Java并发编程之CountDownLatch

时间:2022-05-14 20:51:48

CyclicBarrier所描述的是“允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务”。而CountDownlatch和它也有一点点相似之处:CountDownlatch所描述的是“在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待”。在JDK API中是这样阐述的:

用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier

CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await

虽然,CountDownlatchCyclicBarrier有那么点相似,但是他们还是存在一些区别的:

1、CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。

2、 CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier



Java并发编程之CountDownLatch


CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。


                                                                      Java并发编程之CountDownLatch


Java并发编程之CountDownLatch


CountDownLatch结构如下:

Java并发编程之CountDownLatch

从上图中可以看出CountDownLatch依赖Sync,其实CountDownLatch内部采用的是共享锁来实现的(内部Sync的实现可以看出)。它的构造函数如下:

CountDownLatch(int count):构造一个用给定计数初始化的 CountDownLatch

 
  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }
以下源代码可以证明, CountDownLatch 内部是采用共享锁来实现的:

 
  1. private static final class Sync extends AbstractQueuedSynchronizer {
  2. private static final long serialVersionUID = 4982264981922014374L;
  3.  
  4. Sync(int count) {
  5. setState(count);
  6. }
  7.  
  8. int getCount() {
  9. return getState();
  10. }
  11.  
  12. protected int tryAcquireShared(int acquires) {
  13. return (getState() == 0) ? 1 : -1;
  14. }
  15.  
  16. protected boolean tryReleaseShared(int releases) {
  17. // Decrement count; signal when transition to zero
  18. for (;;) {
  19. int c = getState();
  20. if (c == 0)
  21. return false;
  22. int nextc = c-1;
  23. if (compareAndSetState(c, nextc))
  24. return nextc == 0;
  25. }
  26. }
  27. }

CountDownLatch提供了await方法来实现:

await():使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。

await(long timeout, TimeUnit unit): 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。

 
  1. public void await() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }
await 内部调用 sync acquireSharedInterruptibly 方法:

 
  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. if (tryAcquireShared(arg) < 0)
  6. doAcquireSharedInterruptibly(arg);
  7. }

acquireSharedInterruptibly()的作用是获取共享锁。如果在获取共享锁过程中线程中断则抛出InterruptedException异常。否则通过tryAcquireShared方法来尝试获取共享锁。如果成功直接返回,否则调用doAcquireSharedInterruptibly方法。

tryAcquireShared源码:

 
  1. protected int tryAcquireShared(int acquires) {
  2. return (getState() == 0) ? 1 : -1;
  3. }
tryAcquireShared 方法被 CountDownLatch 重写,他的主要作用是尝试着获取锁。 getState == 0 表示锁处于可获取状态返回 1 否则返回 -1 ;当 tryAcquireShared 返回-1获取锁失败,调用 doAcquireSharedInterruptibly 获取锁:

 
  1. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  2. //创建当前线程(共享锁)Node节点
  3. final Node node = addWaiter(Node.SHARED);
  4. boolean failed = true;
  5. try {
  6. for (;;) {
  7. //获取当前节点的前继节点
  8. final Node p = node.predecessor();
  9. //如果当前节点为CLH列头,则尝试获取锁
  10. if (p == head) {
  11. //获取锁
  12. int r = tryAcquireShared(arg);
  13. if (r >= 0) {
  14. setHeadAndPropagate(node, r);
  15. p.next = null; // help GC
  16. failed = false;
  17. return;
  18. }
  19. }
  20. //如果当前节点不是CLH列头,当前线程一直等待,直到获取锁为止
  21. if (shouldParkAfterFailedAcquire(p, node) &&
  22. parkAndCheckInterrupt())
  23. throw new InterruptedException();
  24. }
  25. } finally {
  26. if (failed)
  27. cancelAcquire(node);
  28. }
  29. }
CountDownLatch ,除了提供 await 方法外,还提供了 countDown() countDown 所描述的是“递减锁存器的计数,如果计数到达零,则释放所有等待的线程。”,源码如下:

 
  1. public void countDown() {
  2. sync.releaseShared(1);
  3. }
countDown 内部调用 releaseShared 方法来释放线程:

 
  1. public final boolean releaseShared(int arg) {
  2. //尝试释放线程,如果释放释放则调用doReleaseShared()
  3. if (tryReleaseShared(arg)) {
  4. doReleaseShared();
  5. return true;
  6. }
  7. return false;
  8. }
tryReleaseShared ,同时被 CountDownLatch 重写了:

 
  1. protected boolean tryReleaseShared(int releases) {
  2. // Decrement count; signal when transition to zero
  3. for (;;) {
  4. //获取锁状态
  5. int c = getState();
  6. //c == 0 直接返回,释放锁成功
  7. if (c == 0)
  8. return false;
  9. //计算新“锁计数器”
  10. int nextc = c-1;
  11. //更新锁状态(计数器)
  12. if (compareAndSetState(c, nextc))
  13. return nextc == 0;
  14. }
  15. }

Java并发编程之CountDownLatch


有10个运动员赛跑,开跑之前,裁判需要等待10个运动员都准备好才能发令,并且10个运动员准备好之后也都需要等待裁判发令

才能开跑。

 
  1. public class CountDownLatchTest {
  2.  
  3. public static void main(String[] args) throws InterruptedException {
  4. // 用于判断发令之前运动员是否已经完全进入准备状态,需要等待5个运动员,所以参数为10
  5. final CountDownLatch runner = new CountDownLatch(10);
  6. // 用于判断裁判是否已经发令,只需要等待一个裁判,所以参数为1
  7. final CountDownLatch referee = new CountDownLatch(1);
  8.  
  9. ExecutorService executor = Executors.newCachedThreadPool();
  10.  
  11. for (int i = 1; i <= 10; i++) {
  12. final int offset = i;
  13. executor.execute(new Runnable() {
  14. @Override
  15. public void run() {
  16. try {
  17. TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
  18. System.out.println("运动员【" + offset+ "】准备完毕" + Thread.currentThread().getName());
  19. runner.countDown();
  20. referee.await();
  21. System.out.println("运动员【" + offset+ "】开跑。。。" + Thread.currentThread().getName());
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26.  
  27. });
  28. }
  29.  
  30. runner.await();
  31. referee.countDown();
  32. System.out.println("裁判:所有运动员准备完毕,开始...");
  33.  
  34. executor.shutdown();
  35. }
  36.  
  37. }
 
  1. 运动员【3】准备完毕pool-1-thread-3
  2. 运动员【6】准备完毕pool-1-thread-6
  3. 运动员【1】准备完毕pool-1-thread-1
  4. 运动员【4】准备完毕pool-1-thread-4
  5. 运动员【2】准备完毕pool-1-thread-2
  6. 运动员【5】准备完毕pool-1-thread-5
  7. 运动员【7】准备完毕pool-1-thread-7
  8. 运动员【9】准备完毕pool-1-thread-9
  9. 运动员【10】准备完毕pool-1-thread-10
  10. 运动员【8】准备完毕pool-1-thread-8
  11. 裁判:所有运动员准备完毕,开始...
  12. 运动员【3】开跑。。。pool-1-thread-3
  13. 运动员【6】开跑。。。pool-1-thread-6
  14. 运动员【1】开跑。。。pool-1-thread-1
  15. 运动员【4】开跑。。。pool-1-thread-4
  16. 运动员【2】开跑。。。pool-1-thread-2
  17. 运动员【5】开跑。。。pool-1-thread-5
  18. 运动员【7】开跑。。。pool-1-thread-7
  19. 运动员【9】开跑。。。pool-1-thread-9
  20. 运动员【10】开跑。。。pool-1-thread-10
  21. 运动员【8】开跑。。。pool-1-thread-8

可以看到,一切都是如此地完美,运动员准备好了之后裁判才发令,裁判发令之后运动员才开跑。


Java并发编程之CountDownLatch

CountDownLatch内部通过“共享锁”实现。在创建CountDownLatch时,需要传递一个int类型的count参数,该count参数为“锁状态”的初始值,该值表示着该“共享锁”可以同时被多少线程获取。当某个线程调用await方法时,首先判断锁的状态是否处于可获取状态(其条件就是count==0?),如果共享锁可获取则获取共享锁,否则一直处于等待直到获取为止。当线程调用countDown方法时,计数器count – 1。当在创建CountDownLatch时初始化的count参数,必须要有count线程调用countDown方法才会使计数器count等于0,锁才会释放,前面等待的线程才会继续运行。


Java并发编程之CountDownLatch

看了各种资料和书,大家一致的意见都是CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供reset功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从jdk作者设计的目的来看,javadoc是这么描述它们的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

CyclicBarrier: A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

从javadoc的描述可以得出:

  • CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;

  • CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行。

对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。