高并发第十单:J.U.C AQS(AbstractQueuedSynchronizer) 组件:CountDownLatch. CyclicBarrier .Semaphore

时间:2021-08-12 21:20:18

这里有一篇介绍AQS的文章 非常好: Java并发之AQS详解

AQS全名:AbstractQueuedSynchronizer,是并发容器J.U.C(java.lang.concurrent)下locks包内的一个类。它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向列表。

AQS主要利用硬件原语指令(CAS compare-and-swap),来实现轻量级多线程同步机制,并且不会引起CPU上文切换和调度,同时提供内存可见性和原子化更新保证(线程安全的三要素:原子性、可见性、顺序性)。

AQS的本质上是一个同步器/阻塞锁的基础框架,其作用主要是提供加锁、释放锁,并在内部维护一个FIFO等待队列,用于存储由于锁竞争而阻塞的线程。

高并发第十单:J.U.C  AQS(AbstractQueuedSynchronizer) 组件:CountDownLatch. CyclicBarrier .Semaphore

Sync queue:同步队列,是一个双向列表。包括head节点和tail节点。head节点主要用作后续的调度。

Condition queue:非必须,单向列表。当程序中存在cindition的时候才会存在此列表。

高并发第十单:J.U.C  AQS(AbstractQueuedSynchronizer) 组件:CountDownLatch. CyclicBarrier .Semaphore

AQS全名:AbstractQueuedSynchronizer,是并发容器J.U.C(java.lang.concurrent)下locks包内的一个类。它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向列表。

 它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

这里就不详细去说AQS了.因为开头的那个文章已经说得很清楚了.介绍以下他的实现类吧

1.CountDownLatch(计数器)

详解Java CountDownLatch源码解析(上)    Java CountDownLatch源码解析(下)

CountDownLatch是在java1.5被引入的,它都存在于java.util.concurrent包下。CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

高并发第十单:J.U.C  AQS(AbstractQueuedSynchronizer) 组件:CountDownLatch. CyclicBarrier .Semaphore

主要在实时系统中的使用场景

  1. 实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
  2. 开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了。
  3. 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。
  4. 有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行 (最常用的)

构造方法:

构造一个用给定计数初始化的 CountDownLatch(int count)

普通方法:

void await()            使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。

boolean await(long timeout, TimeUnit unit)            可以设置等待的时间,如果超过此时间,计数器还未清零,则不继续等待

void countDown()            递减锁存器的计数,如果计数到达零,则释放所有等待的线程

long getCount()            返回当前计数

看个例子

public class CountDownLatchDemo {

    private static final int THREAD_COUNT_NUM = 6;
private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT_NUM); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 6; i++) {
int index = i;
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "第" + index + "个任务完成!"
+ Thread.currentThread().getName());
// 模拟完成一个任务,随机模拟不同的寻找时间
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "完成:" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
// 每完成一个任务,需要等待的任务数减1
// countDownLatch.countDown(); }, "我是线程:" + i + ":").start();
;
}
// 等待检查,即上述7个线程执行完毕之后,执行await后边的代码
// countDownLatch.await();
System.out.println("所有任务完成!" + System.currentTimeMillis()); }
} 结果:

  

所有任务完成!1537617901498
我是线程:3:第3个任务完成!我是线程:3:
我是线程:0:第0个任务完成!我是线程:0:
我是线程:1:第1个任务完成!我是线程:1:
我是线程:2:第2个任务完成!我是线程:2:
我是线程:4:第4个任务完成!我是线程:4:
我是线程:5:第5个任务完成!我是线程:5:

我是线程:2:完成:1537617904499
我是线程:1:完成:1537617904499
我是线程:0:完成:1537617904499
我是线程:3:完成:1537617904499
我是线程:4:完成:1537617904499
我是线程:5:完成:1537617904499

 

加上:

public class CountDownLatchDemo {

    private static final int THREAD_COUNT_NUM = 6;
private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT_NUM); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 6; i++) {
int index = i;
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "第" + index + "个任务完成!"
+ Thread.currentThread().getName());
// 模拟完成第i个任务,
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "完成:" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
// 每完成一个任务,需要等待的任务数减1
countDownLatch.countDown(); }, "我是线程:" + i + ":").start();
;
}
// 等待检查,即上述7个线程执行完毕之后,执行await后边的代码
countDownLatch.await();
System.out.println("所有任务完成!" + System.currentTimeMillis()); }
}
结果:

我是线程:2:第2个任务完成!我是线程:2:
我是线程:1:第1个任务完成!我是线程:1:
我是线程:0:第0个任务完成!我是线程:0:
我是线程:3:第3个任务完成!我是线程:3:
我是线程:4:第4个任务完成!我是线程:4:
我是线程:5:第5个任务完成!我是线程:5:
我是线程:1:完成:1537617977385
我是线程:3:完成:1537617977385
我是线程:4:完成:1537617977385
我是线程:0:完成:1537617977385
我是线程:2:完成:1537617977385
我是线程:5:完成:1537617977387
所有任务完成!1537617977388

顺序有可能发生变化.但是  所有任务完成!1537617977388 时间肯定在他们之后,速度快最多一样.肯定不会比他们小

这里有个例子 线程数 大于 锁住数时 会发生什么呢.

public class CountDownLatchDemo {

    private static final int THREAD_COUNT_NUM = 8;
private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT_NUM); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 100; i++) {
int index = i;
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "第" + index + "个任务完成!"
+ Thread.currentThread().getName());
// 模拟收集第i个龙珠,随机模拟不同的寻找时间
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "完成:" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
// 每收集到一颗龙珠,需要等待的颗数减1
countDownLatch.countDown(); }, "我是线程:" + i + ":").start();
;
}
// 等待检查,即上述7个线程执行完毕之后,执行await后边的代码
countDownLatch.await();
System.out.println("所有任务完成!" + System.currentTimeMillis()); }
} 结果
...............
所有任务完成!1537618986091
............
我是线程:39:完成:1537618986097
我是线程:37:完成:1537618986097
我是线程:38:完成:1537618986097
我是线程:35:完成:1537618986096

所以更加证明了.

CountDownLatch(THREAD_COUNT_NUM); 最多锁住 THREAD_COUNT_NUM 个的线程,其他的线程就按原来的顺序运行了

这个就直接证明了  在await()处,让所有的任务完成了 才能继续主线程

优点:

CountDownLatch的优点毋庸置疑,对使用者而言,你只需要传入一个int型变量控制任务数量即可,至于同步队列的出队入队维护,state变量值的维护对使用者都是透明的,使用方便。

缺点:

CountDownLatch设置了state后就不能更改,也不能循环使用。

2.CyclicBarrier

高并发第十单:J.U.C  AQS(AbstractQueuedSynchronizer) 组件:CountDownLatch. CyclicBarrier .Semaphore

既然说了 CountDownLatch设置了state后就不能更改,也不能循环使用。那就来个可以循环使用的

举个例子:有四个游戏玩家玩游戏,游戏有三个关卡,每个关卡必须要所有玩家都到达后才能允许通过。其实这个场景里的玩家中如果有玩家A先到了关卡1,他必须等到其他所有玩家都到达关卡1时才能通过,也就是说线程之间需要相互等待。这和CountDownLatch的应用场景有区别,CountDownLatch里的线程是到了运行的目标后继续干自己的其他事情,而这里的线程需要等待其他线程后才能继续完成下面的工作。

案例一: 一起等待

public class CyclicBarrierDemo {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) {

        for (int i = 0; i < 10; i++) {
int index = i;
new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println(String.format("我是第%s启动了", index));
barrier.await();
System.out.println(String.format("我是第%s完成了", index));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} }, "我是第" + index + "个线程:").start();
} }
} 结果:
我是第6启动了
.......
我是第0启动了
.......
我是第4完成了了

全部启动,然后一起等待,再继续完成任务

//案例二 最多等待时间

    private static void test2() {
for (int i = 0; i < 10; i++) {
int index = i;
new Thread(() -> {
try {
System.out.println(String.format("我是第%s启动了", index));
// 最多阻塞时间
barrier.await(2000, TimeUnit.MILLISECONDS);
System.out.println(String.format("我是第%s完成了", index));
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
e.printStackTrace();
barrier.reset();
} }).start();
} }

// 还有一个额外的方法是  构造是可以多构造一个Runnable,在计数器的值到达设定值后(但在释放所有线程之前),该Runnable运行一次,注,Runnable在每个屏障点只运行一个

private static CyclicBarrier barrier = new CyclicBarrier(1,()->{
System.out.println("优先执行我");
}); for (int i = 0; i < 2; i++) {
int index = i;
new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println(String.format("我是第%s启动了", index));
barrier.await();
System.out.println(String.format("我是第%s完成了", index));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} }, "我是第" + index + "个线程:").start();
} 结果是:
我是第1启动了
我是第0启动了
优先执行自己
优先执行自己
我是第1完成了
我是第0完成了
CyclicBarrier 和 CountDownLatch的比较:
  • CountDownLatch: 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。--> 反正 你执行完  就ok.不能随意放开
  • CyclicBarrier: N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。--> 可以到到某个条件.我放开就行了
  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CountDownLatch:减计数方式,CyclicBarrier:加计数方式

3. Semaphore

信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确、合理的使用公共资源。

比喻:

  Semaphore是一件可以容纳N人的房间,如果人不满就可以进去,如果人满了,就要等待有人出来。对于N=1的情况,称为binary semaphore。一般的用法是,用于限制对于某一资源的同时访问

官方一点就是:

用于保证同一时间并发访问线程的数目。

信号量在操作系统中是很重要的概念,Java并发库里的Semaphore就可以很轻松的完成类似操作系统信号量的控制。

Semaphore可以很容易控制系统中某个资源被同时访问的线程个数。 在数据结构中我们学过链表,链表正常是可以保存无限个节点的,而Semaphore可以实现有限大小的列表。

使用场景:仅能提供有限访问的资源。比如数据库连接

上例子:

// 方式 一 直接获取

// 给出10个资源 ,最多保证10个并发
private static final Semaphore SEMAPHORE = new Semaphore(10);
.......
for (int i = 0; i < 100; i++) {
int index = i;
new Thread(() -> {
try {
SEMAPHORE.acquire();// 获取一个许可
System.out.println(String.format("我是线程:%s", index));// 需要并发控制的内容
Thread.sleep(3000);
SEMAPHORE.release(); // 释放一个许可
} catch (InterruptedException e) {
e.printStackTrace();
} }).start();
} //结果:
很明显的能看到 10个10的执行

// 方式 二 尝试获取许可,获取不到不执行  很多时候相当于只执行设置的并发量一次

private static final Semaphore SEMAPHORE = new Semaphore(10);

for (int i = 0; i < 100; i++) {
int index = i;
new Thread(() -> {
try {
// 尝试获取许可,获取不到不执行
if(SEMAPHORE.tryAcquire() {
System.out.println(String.format("我是线程:%s", index));// 需要并发控制的内容
Thread.sleep(3000);
SEMAPHORE.release(); // 释放一个许可
} } catch (InterruptedException e) {
e.printStackTrace();
} }).start();

// 当时三  有个最长申请时间

private static final Semaphore SEMAPHORE = new Semaphore(10);
for (int i = 0; i < 100; i++) {
int index = i;
new Thread(() -> {
try {
// 尝试获取许可,获取不到不执行 最长申请时间
if(SEMAPHORE.tryAcquire(5000,TimeUnit.MILLISECONDS)) {
System.out.println(String.format("我是线程:%s", index));// 需要并发控制的内容
Thread.sleep(3000);
SEMAPHORE.release(); // 释放一个许可
} } catch (InterruptedException e) {
e.printStackTrace();
} }).start();
}

注意:

1 . 其中  构造方法可以加公平锁   :private static final Semaphore SEMAPHORE = new Semaphore(100,true);

2. SEMAPHORE.tryAcquire()  => 可以增加获取条件量 SEMAPHORE.tryAcquire(10);释放 SEMAPHORE.release(10);