简介
闭锁是一种同步工具类,可以延迟线程的进度,知道其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能够通过,当达到结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以确保某些活动知道其他活动都完成之后才继续执行。
CountDownLatch是一种灵活的闭锁实现,它允许一个或多个线程等待一组事件的产生。闭锁状态包括一个计数器,该计数器初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法会一直阻塞知道计数器为0,或者等待中的线程中断,或者等待超时。
CountDownLatch源码详解
CountDownLatch类图如下:
从类图中可以看出,CountDownLatch内部依赖Sync实现,而Sync继承自AQS。CountDownLatch仅提供了一个构造方法:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
Sync是CountDownLatch的静态内部类,其定义也比较简单,如下所示:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // 设置同步状态值 setState(count); } int getCount() { // 获取同步状态值 return getState(); } // 共享式获取同步状态 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 共享式释放同步状态 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
await()方法
CountDownLatch提供了await()方法来使当前线程一直等待,直到计数器的值减为0,或者线程被中断,该方法定义如下:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
await()方法调用了AQS的共享式相应中断获取同步状态的方法,acquireSharedInterruptibly(int),如下所示:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
Sync类重写了tryAcquireShared(int)方法:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
可以看到,只有当计数器(即同步状态)值为0时,才返回1,即当前线程获取到了同步状态,在这里表示等待线程可以继续执行,若计数器值不是0,则当前线程会调用doAcquireSharedInterruptibly(int)方法,一直自旋去尝试获取同步状态:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
countDown()方法
CountDownLatch提供了countDown()方法递减计数器的值,如果计数到达0,则释放所有等待的线程,该方法定义如下:
public void countDown() { sync.releaseShared(1); }
countDown()方法调用了AQS的releaseShared(int)方法来释放共享锁同步状态:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
Sync类重写了releaseShared(int)方法:
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 获取同步状态 int c = getState(); // 同步状态为0,则直接返回 if (c == 0) return false; // 计算并更新同步状态 int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
应用程序示例
我们看一个CountDownLatch的应用示例:
public class CountDownLatchTest { // 自定义工作线程 private static class Worker extends Thread { private CountDownLatch countDownLatch; public Worker(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { super.run(); try { countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "开始执行"); // 工作线程开始处理,这里用Thread.sleep()来模拟业务处理 Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "执行完毕"); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 3; i++) { System.out.println("创建工作线程" + i); Worker worker = new Worker(countDownLatch); worker.start(); } // 工作线程需要等待主线程准备操作完毕才可以执行,这里用Thread.sleep()来模拟准备操作 Thread.sleep(1000); System.out.println("主线程准备完毕"); countDownLatch.countDown(); } }
运行结果(不唯一):
创建工作线程0 创建工作线程1 创建工作线程2 主线程准备完毕 Thread-0开始执行 Thread-2开始执行 Thread-1开始执行 Thread-0执行完毕 Thread-2执行完毕 Thread-1执行完毕
在上述代码中,我们自定义的工作线程必须要等主线程准备完毕才可以执行,我们可以使用CountDownLatch类来帮助我们完成。从程序的执行结果中也可以看出,3个工作线程确实是在主线程准备完毕后才开始执行。
相关博客
AbstractQueuedSynchronizer同步队列详解
AbstractQueuedSynchronizer共享式同步状态获取与释放
参考资料
方腾飞:《Java并发编程的艺术》
Doug Lea:《Java并发编程实战》