先做总结:
1、CountDownLatch 是什么?
CountDownLatch 允许一个或多个线程等待其他线程(不一定是线程,某个操作)完成之后再执行。
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。
当我们调用一次CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前线程,直到N变成零。
由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。
2、实现原理:
(1)CountDownLatch 的sync属性,继承自AQS
(2)CountDownLatch countDownLatch = new CountDownLatch(5);时将countDownLatch.sync.state设置为5
(3)countDownLatch.await(),检查sync.state!=0时,当前线程park(),放入sync的阻塞队列
(4)countDownLatch.countDown(),sync.state - 1,如果发现sync.state==0了,唤醒sync阻塞队列中的线程
3、CountDownLatch 与 CyclicBarrier区别:
(1)CountDownLatch的计数器只能使用一次,而CyclicBarrier可以重复使用(可以重置)。
(2)CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。
(3)CyclicBarrier针对的是线程,而CountDownLatch针对的是操作(只要调用countDownLatch.countDown()可以)。
一、应用举例
// 老板进入会议室等待5个人全部到达会议室才会开会。所以这里有两个线程老板等待开会线程、员工到达会议室:
class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(5); // Boss线程,等待员工到达开会
static class BossThread extends Thread {
@Override
public void run() {
System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");
try {
countDownLatch.await(); // Boss等待
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println("所有人都已经到齐了,开会吧...");
}
} // 员工到达会议室
static class EmpleoyeeThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ",到达会议室...."); // 员工到达会议室 count - 1
countDownLatch.countDown();
}
} public static void main(String[] args) {
// Boss线程启动
new BossThread().start(); // 员工到达会议室
for (int i = 0; i < countDownLatch.getCount(); i++) {
new EmpleoyeeThread().start();
}
}
}
二、类结构
public class CountDownLatch {
private final Sync sync; // 锁
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L; Sync(int count) {
setState(count);
} int getCount() {
return getState();
} protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
} protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}
三、原理解析
CountDownLatch countDownLatch = new CountDownLatch(5);
public CountDownLatch(int count) {
if (count < 0)
throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
} /**
* CountDownLatch.Sync.Sync(int)
* AQS的state用作count计数
*/
Sync(int count) {
setState(count);
}
countDownLatch.await();
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
} /**
* AbstractQueuedSynchronizer.acquireSharedInterruptibly(int)
* 尝试获取锁,获取不到锁,当前进入同步队列并挂起
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 尝试获取锁
doAcquireSharedInterruptibly(arg); // 获取不到锁,当前进入同步队列并挂起
} /**
* CountDownLatch.Sync.tryAcquireShared(int)
* state/count没有减到0之前不予许拿锁
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
countDownLatch.countDown();
public void countDown() {
sync.releaseShared(1);
} /**
* AbstractQueuedSynchronizer.releaseShared(int)
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 尝试释放锁
doReleaseShared(); // 释放掉锁之后,唤醒同步队列的线程(调用await()的线程)
return true;
}
return false;
} /**
* CountDownLatch.Sync.tryReleaseShared(int)
* countDown一次count/state减一
* 直到count/state减到0,return true,允许释放同步队列里的线程
*/
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
四、CyclicBarrier和CountDownLatch的区别
- CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
- CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。