CyclicBarrier
所描述的是“允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务”。而CountDownlatch
和它也有一点点相似之处:CountDownlatch
所描述的是“在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待”。在JDK API中是这样阐述的:
用给定的计数 初始化 CountDownLatch
。由于调用了 countDown()
方法,所以在当前计数到达零之前,await
方法会一直受阻塞。之后,会释放所有等待的线程,await
的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier
。
CountDownLatch
是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch
用作一个简单的开/关锁存器,或入口:在通过调用 countDown()
的线程打开入口前,所有调用 await
的线程都一直在入口处等待。用 N 初始化的 CountDownLatch
可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。
CountDownLatch
的一个有用特性是,它不要求调用 countDown
方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await
。
虽然,CountDownlatch
与CyclicBarrier
有那么点相似,但是他们还是存在一些区别的:
1、CountDownLatch
的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier
则是允许N个线程相互等待。
2、 CountDownLatch
的计数器无法被重置;CyclicBarrier
的计数器可以被重置后使用,因此它被称为是循环的barrier
。
CountDownLatch
是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。
CountDownLatch
结构如下:
从上图中可以看出CountDownLatch
依赖Sync
,其实CountDownLatch
内部采用的是共享锁来实现的(内部Sync的实现可以看出)。它的构造函数如下:
CountDownLatch(int count)
:构造一个用给定计数初始化的 CountDownLatch
。
- public CountDownLatch(int count) {
- if (count < 0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count);
- }
CountDownLatch
内部是采用共享锁来实现的:
- private static final class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 4982264981922014374L;
- Sync(int count) {
- setState(count);
- }
- int getCount() {
- return getState();
- }
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
- }
CountDownLatch
提供了await方法来实现:
await()
:使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
await(long timeout, TimeUnit unit)
: 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
await
内部调用
sync
的
acquireSharedInterruptibly
方法:
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
acquireSharedInterruptibly()
的作用是获取共享锁。如果在获取共享锁过程中线程中断则抛出InterruptedException
异常。否则通过tryAcquireShared
方法来尝试获取共享锁。如果成功直接返回,否则调用doAcquireSharedInterruptibly
方法。
tryAcquireShared
源码:
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
tryAcquireShared
方法被
CountDownLatch
重写,他的主要作用是尝试着获取锁。
getState == 0
表示锁处于可获取状态返回
1
否则返回
-1
;当
tryAcquireShared
返回-1获取锁失败,调用
doAcquireSharedInterruptibly
获取锁:
- private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
- //创建当前线程(共享锁)Node节点
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- for (;;) {
- //获取当前节点的前继节点
- final Node p = node.predecessor();
- //如果当前节点为CLH列头,则尝试获取锁
- if (p == head) {
- //获取锁
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- //如果当前节点不是CLH列头,当前线程一直等待,直到获取锁为止
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
CountDownLatch
,除了提供
await
方法外,还提供了
countDown()
,
countDown
所描述的是“递减锁存器的计数,如果计数到达零,则释放所有等待的线程。”,源码如下:
- public void countDown() {
- sync.releaseShared(1);
- }
countDown
内部调用
releaseShared
方法来释放线程:
- public final boolean releaseShared(int arg) {
- //尝试释放线程,如果释放释放则调用doReleaseShared()
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
tryReleaseShared
,同时被
CountDownLatch
重写了:
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- for (;;) {
- //获取锁状态
- int c = getState();
- //c == 0 直接返回,释放锁成功
- if (c == 0)
- return false;
- //计算新“锁计数器”
- int nextc = c-1;
- //更新锁状态(计数器)
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
有10个运动员赛跑,开跑之前,裁判需要等待10个运动员都准备好才能发令,并且10个运动员准备好之后也都需要等待裁判发令
才能开跑。
- public class CountDownLatchTest {
- public static void main(String[] args) throws InterruptedException {
- // 用于判断发令之前运动员是否已经完全进入准备状态,需要等待5个运动员,所以参数为10
- final CountDownLatch runner = new CountDownLatch(10);
- // 用于判断裁判是否已经发令,只需要等待一个裁判,所以参数为1
- final CountDownLatch referee = new CountDownLatch(1);
- ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 1; i <= 10; i++) {
- final int offset = i;
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
- System.out.println("运动员【" + offset+ "】准备完毕" + Thread.currentThread().getName());
- runner.countDown();
- referee.await();
- System.out.println("运动员【" + offset+ "】开跑。。。" + Thread.currentThread().getName());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
- runner.await();
- referee.countDown();
- System.out.println("裁判:所有运动员准备完毕,开始...");
- executor.shutdown();
- }
- }
- 运动员【3】准备完毕pool-1-thread-3
- 运动员【6】准备完毕pool-1-thread-6
- 运动员【1】准备完毕pool-1-thread-1
- 运动员【4】准备完毕pool-1-thread-4
- 运动员【2】准备完毕pool-1-thread-2
- 运动员【5】准备完毕pool-1-thread-5
- 运动员【7】准备完毕pool-1-thread-7
- 运动员【9】准备完毕pool-1-thread-9
- 运动员【10】准备完毕pool-1-thread-10
- 运动员【8】准备完毕pool-1-thread-8
- 裁判:所有运动员准备完毕,开始...
- 运动员【3】开跑。。。pool-1-thread-3
- 运动员【6】开跑。。。pool-1-thread-6
- 运动员【1】开跑。。。pool-1-thread-1
- 运动员【4】开跑。。。pool-1-thread-4
- 运动员【2】开跑。。。pool-1-thread-2
- 运动员【5】开跑。。。pool-1-thread-5
- 运动员【7】开跑。。。pool-1-thread-7
- 运动员【9】开跑。。。pool-1-thread-9
- 运动员【10】开跑。。。pool-1-thread-10
- 运动员【8】开跑。。。pool-1-thread-8
可以看到,一切都是如此地完美,运动员准备好了之后裁判才发令,裁判发令之后运动员才开跑。
CountDownLatch
内部通过“共享锁”实现。在创建CountDownLatch
时,需要传递一个int
类型的count
参数,该count参数为“锁状态”的初始值,该值表示着该“共享锁”可以同时被多少线程获取。当某个线程调用await
方法时,首先判断锁的状态是否处于可获取状态(其条件就是count==0?
),如果共享锁可获取则获取共享锁,否则一直处于等待直到获取为止。当线程调用countDown
方法时,计数器count – 1
。当在创建CountDownLatch
时初始化的count
参数,必须要有count
线程调用countDown
方法才会使计数器count
等于0
,锁才会释放,前面等待的线程才会继续运行。
看了各种资料和书,大家一致的意见都是CountDownLatch
是计数器,只能使用一次,而CyclicBarrier
的计数器提供reset功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从jdk作者设计的目的来看,javadoc是这么描述它们的:
CountDownLatch
: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
CyclicBarrier
: A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
从javadoc的描述可以得出:
CountDownLatch
:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;-
CyclicBarrier
:多个线程互相等待,直到到达同一个同步点,再继续一起执行。
对于CountDownLatch
来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier
,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。