聊聊 CountDownLatch 闭锁源码分析

时间:2022-10-07 14:27:05

聊聊 CountDownLatch 闭锁源码分析

功能简介

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态【CPJ 3.4.2】。闭锁的作用相当于一扇门∶ 在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如∶

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示"资源R已经被初始化",而所有需要 R 的操作都必须先在这个闭锁上等待。
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S 时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖 S 的服务才能继续执行。
  • 等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。

聊聊 CountDownLatch 闭锁源码分析

CountDownLatch.jpg

CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而 await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

使用案例

TestHarness 中给出了闭锁的两种常见用法。TestHarness 创建一定数量的线程,利用它们并发地执行指定的任务。它使用两个闭锁,分别表示"起始门(Starting Gate)"和"结束门(Ending Gate)"。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是将调用结束门的 countDown 方法减1,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间。

  1. public class TestHarness { 
  2.  
  3.     public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { 
  4.         final CountDownLatch startGate = new CountDownLatch(1); 
  5.         final CountDownLatch endGate = new CountDownLatch(nThreads); 
  6.  
  7.         for (int i = 0; i < nThreads; i++) { 
  8.             Thread t = new Thread(() -> { 
  9.                 try { 
  10.                     startGate.await(); 
  11.                     try { 
  12.                         task.run(); 
  13.                     } finally { 
  14.                         endGate.countDown(); 
  15.                     } 
  16.  
  17.                 } catch (InterruptedException ignored) { 
  18.  
  19.                 } 
  20.             }); 
  21.             t.start(); 
  22.         } 
  23.  
  24.         long start = System.nanoTime(); 
  25.         startGate.countDown(); 
  26.         endGate.await(); 
  27.         long end = System.nanoTime(); 
  28.         return end - start; 
  29.     } 
  30.  
  31.     public static void main(String[] args) throws InterruptedException { 
  32.         TestHarness testHarness = new TestHarness(); 
  33.         AtomicInteger num = new AtomicInteger(0); 
  34.         long time = testHarness.timeTasks(10, () -> System.out.println(num.incrementAndGet())); 
  35.         System.out.println("cost time: " + time + "ms"); 
  36.     } 
  37.  
  38. //输出结果 
  39. 10 
  40. cost time: 2960900ms 

为什么要在 TestHarness 中使用闭锁,而不是在线程创建后就立即启动? 或许,我们希望测试 n 个线程并发执行某个任务时需要的时间。如果在创建线程后立即启动它们,那么先启动的线程将"领先"后启动的线程,并且活跃线程数量会随着时间的推移而增加或减少,竞争程度也在不断发生变化。启动门将使得主线程能够实时释放所有工作线程,而结束门则使主线程能够等待最后一个线程执行完成,而不是顺序地等待每个线程执行完成。

使用总结

CountDownLatch 是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch 使用完毕后,它不能再次被使用。

源码分析

代码分析

CountDownLatch 在底层还是采用 AbstractQueuedSynchronizer 实现。

  1. CountDownLatch startGate = **new **CountDownLatch(1); 

我们先看它的构造方法, 创建了一个 sync 对象。

  1. public CountDownLatch(int count) { 
  2.     if (count < 0) throw new IllegalArgumentException("count < 0"); 
  3.     this.sync = new Sync(count); 

Sync 是 AbstractQueuedSynchronizer 的一个实现, 按照字面意思我们可以猜到它是公平方式实现。

  1. private static final class Sync extends AbstractQueuedSynchronizer { 
  2.     private static final long serialVersionUID = 4982264981922014374L; 
  3.  
  4.     // 构造方法 
  5.     Sync(int count) { 
  6.         setState(count); 
  7.     } 
  8.  
  9.     // 获取资源数 
  10.     int getCount() { 
  11.         return getState(); 
  12.     } 
  13.  
  14.     // 获取锁 
  15.     protected int tryAcquireShared(int acquires) { 
  16.         return (getState() == 0) ? 1 : -1; 
  17.     } 
  18.  
  19.     // 释放锁 
  20.     protected boolean tryReleaseShared(int releases) { 
  21.         // Decrement count; signal when transition to zero 
  22.         for (;;) { 
  23.             int c = getState(); 
  24.             if (c == 0) 
  25.                 return false
  26.             int nextc = c-1; 
  27.             // CAS 解锁 
  28.             if (compareAndSetState(c, nextc)) 
  29.                 return nextc == 0; 
  30.         } 
  31.     } 

在 await 方法中如果存在计算值, 那么当前线程将进入 AQS 队列生成 Node 节点, 线程进入阻塞状态。

  1. public void await() throws InterruptedException { 
  2.     sync.acquireSharedInterruptibly(1); 

其实主要是获取共享锁。

  1. public final void acquireSharedInterruptibly(int arg) 
  2.     throws InterruptedException { 
  3.     if (Thread.interrupted()) 
  4.         throw new InterruptedException(); 
  5.     if (tryAcquireShared(arg) < 0) 
  6.         doAcquireSharedInterruptibly(arg); 

CountDownLatch.Sync 实现了 tryAcquireShared 方法 ,如果 getState() == 0 返回 1 , 否则返回 -1. 也就是说创建 CountDownLatch 实例后再执行 await 方法将继续调用 doAcquireSharedInterruptibly(arg);

  1. // 是否可获取共享锁 
  2. protected int tryAcquireShared(int acquires) { 
  3.     return (getState() == 0) ? 1 : -1; 
  4.  
  5.  
  6. // 尝试获取锁, 或者入队 
  7. private void doAcquireSharedInterruptibly(int arg) 
  8.     throws InterruptedException { 
  9.     final Node node = addWaiter(Node.SHARED); 
  10.     boolean failed = true
  11.     try { 
  12.         for (;;) { 
  13.             final Node p = node.predecessor(); 
  14.             if (p == head) { 
  15.                 int r = tryAcquireShared(arg); 
  16.                 if (r >= 0) { 
  17.                     setHeadAndPropagate(node, r); 
  18.                     p.next = null; // help GC 
  19.                     failed = false
  20.                     return
  21.                 } 
  22.             } 
  23.             if (shouldParkAfterFailedAcquire(p, node) && 
  24.                 parkAndCheckInterrupt()) 
  25.                 throw new InterruptedException(); 
  26.         } 
  27.     } finally { 
  28.         if (failed) 
  29.             cancelAcquire(node); 
  30.     } 

在 countDown 方法如果存在等待的线程, 将对其进行唤醒. 或者减少 CountDownLatch 资源数。

  1. public void countDown() { 
  2.     sync.releaseShared(1); 

通过 releaseShared 对共享锁进行解锁。

  1. public final boolean releaseShared(int arg) { 
  2.     if (tryReleaseShared(arg)) { 
  3.         doReleaseShared(); 
  4.         return true
  5.     } 
  6.     return false

最终会调用 doReleaseShared 唤醒 AQS 中的头节点。

  1. private void doReleaseShared() { 
  2.     /* 
  3.          * Ensure that a release propagates, even if there are other 
  4.          * in-progress acquires/releases.  This proceeds in the usual 
  5.          * way of trying to unparkSuccessor of head if it needs 
  6.          * signal. But if it does not, status is set to PROPAGATE to 
  7.          * ensure that upon release, propagation continues. 
  8.          * Additionally, we must loop in case a new node is added 
  9.          * while we are doing this. Also, unlike other uses of 
  10.          * unparkSuccessor, we need to know if CAS to reset status 
  11.          * fails, if so rechecking. 
  12.          */ 
  13.     for (;;) { 
  14.         Node h = head; 
  15.         if (h != null && h != tail) { 
  16.             int ws = h.waitStatus; 
  17.             if (ws == Node.SIGNAL) { 
  18.                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
  19.                     continue;            // loop to recheck cases 
  20.                 unparkSuccessor(h); 
  21.             } 
  22.             else if (ws == 0 && 
  23.                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
  24.                 continue;                // loop on failed CAS 
  25.         } 
  26.         if (h == head)                   // loop if head changed 
  27.             break; 
  28.     } 

详细流程如下图:

源码流程图

聊聊 CountDownLatch 闭锁源码分析

CountDownLatch 闭锁源码分析.png

参考资料

《Java 并发编程实战》

https://www.cnblogs.com/Lee_xy_z/p/10470181.html

原文链接:https://mp.weixin.qq.com/s/7rn6NCPqIcGiDs3cVuVm2g