Java多线程 -- JUC包源码分析11 -- CyclicBarrier源码分析

时间:2022-01-30 07:30:49

在前面的篇章中,讲解了ReentrantLock + Condition,并讲述了2者结合的一个典型应用:ArrayBlockingQueue/LinkedBlockingQueue。

今天讲述2者结合的另一个典型应用:CyclicBarrier。

CyclicBarrier的概念

要介绍CyclicBarrier这个概念,可以从下面这个比喻开始:一个小公司的所有人周六要出去团建,大家提前商定好周六早上9点公司门口集合,然后到了目的地,在分开行动。

在这里,“公司门口集合地“就是一个CyclicBarrier:大家本来是分散的,在某个时间点集中到一块,等所有人到齐,然后到了目的地再分散行动。

具体到代码中,就是

CyclicBarrier cb = new CyclicBarrier(5);

cb.await(); //5个线程,各自分别调await,都阻塞。直到5个线程都掉了await()的那个时间点,5个线程同时解锁,然后各自继续各自的事情!

ReentrantLock + Condition

public class CyclicBarrier {

private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition(); //用于线程之间互相唤醒
private final int parties; //类似于AQS那个state原子变量,也就是总共的线程个数

...
}
    private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) { //响应中断
breakBarrier();
throw new InterruptedException();
}

int index = --count; //每个线程调用1次await(),count--
if (index == 0) { //count减到0的时候,此线程,唤醒其他所有线程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null) //一起唤醒之后,还可以执行一个回调。
command.run();
ranAction = true;
nextGeneration(); //唤醒其他所有线程,同时把count值复原,用于下一次的CyclicBarrier(因为CyclicBarrier可以重复使用)
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

for (;;) { //count > 0,说明人没到齐,阻塞自己
try {
if (!timed)
trip.await(); //关键点:await阻塞自己的同时,会把锁释放掉,这样别的线程就会拿到锁,执行上面的index = count--
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();
if (g != generation) //从阻塞中唤醒,返回
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

整个代码唯一不太好明白的是Generation,这个东西主要是用来复原CyclicBarrier,从而可以下1次重用。不像CountDownLatch,用完1次,就不能再用了。