1. 简介
JUC中的CyclicBarrier提供了一种多线程间的同步机制,可以让多个线程在barrier等待其它线程到达barrier。正如其名CyclicBarrier含义就是可以循环使用的屏障。
2. 源码解读
2.1 数据结构
2.1.1 Generation
在CyclicBarrier中用Generation来代表每一轮的Cyclibarrier的运行状况。
private static class Generation {
// broken表示挂否。
boolean broken = false;
}
private Generation generation = new Generation();
在任意时刻只有一个genration实例是真正代表当前这一轮的运行状况,其他实例都是跑完或者跑挂的。
2.1.2 barrierCommand
CyclicBarrier允许我们通过构造方法设置一个Runnable对象,用来在所有线程都到达barrier时执行。
2.1.3 其它
parties表示线程数,在parties个线程都调用await方法后,barrier才算是被通过(tripped)了。
count表示还剩下未到达barrier(未调用await)的线程数量,count会在新的一轮开启或者当前这一轮跑挂时重置为parties。
CyclicBarrier中的trip用于实现线程间的等待与唤醒的通信,而lock则为CyclicBarrier中的变量(generation和count)提供可见性保证,为临界区的操作提供保护。
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private int count;
2.2 await方法
下面分析await方法的源码。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 如果已经跑挂了,抛出BrokenBarrierException。
if (g.broken)
throw new BrokenBarrierException();
// 检查中断标志位。
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
// 最后一个到达barrier的线程。
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 开启下一轮。
nextGeneration();
return 0;
} finally {
// 如果action执行时发生了,也会break掉barrier。
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
/*
* 对于其它(不是最后一个)线程,会在trip条件下等待被唤醒。情况有以下几类:
* 1. 所有线程都到达barrier,并成功执行了barrierAction。
* 2. 有线程执行了breakBarrier方法。
* 3. 线程本身被中断。
* 4. 超时(如果调用的带时间限制的await)。
*/
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
/*
* g == generation && !g.broken说明此时当前这一轮还没结束,并且没有其它线程执行过breakBarrier方法。
* 这种情况会执行breakBarrier置generation的broken标识为true并唤醒其它线程,之后继续抛出InterruptedException。
*/
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
/*
* 如果g != generation,此时这一轮已经结束,后面返回index作为到达barrier的次序;
* 如果g.broken说明之前已经有其它线程执行了breakBarrier方法,后面会抛出BrokenBarrierException。
*/
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
// 这一轮已经结束,则返回到达屏障的次序,0表示最后一个,parties-1表示第一个。
if (g != generation)
return index;
// 判断是否超时。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
// 保证调用时持有锁。
private void nextGeneration() {
// 唤醒其它在trip条件下等待的线程。
trip.signalAll();
// 重置count。
count = parties;
// 开启下一轮。
generation = new Generation();
}
// 保证调用时持有锁。
private void breakBarrier() {
generation.broken = true;
// 重置count。
count = parties;
// 唤醒其它在trip条件下等待的线程。
trip.signalAll();
},
2.3 其它方法
CyclicBarrier其它还提供了例如getParties, isBroken, getNumberWaiting, reset等方法,都比较简单。
其中除了getParties由于parties被final修饰不可变,其余方法都会先去获得互斥锁。
/**
* 获取当前这一轮是否已经broken。
*/
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
/**
* 重置barrier到初始状态,所有还在等待中的线程最终会抛出BrokenBarrierException。
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
/**
* 获得当前在barrier中等待的线程数。
*/
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
3. 总结
总的来说CyclicBarrier的源码还是比较简洁易懂的,通过锁和条件,实现了在barrier上同步的功能。
常常会拿CyclicBarrier和CountdownLatch比较,CountdownLatch的的计数器到0就完事了,没法再重置恢复。而CyclicBarrier的计数器可以通过正常的一轮同步重置,也可以通过reset方法强制重置。CountdownLatch每个调用await的线程会被阻塞直到其它线程通过countDown方法将计数器减到0;而CyclicBarrier则是有parties-1个线程调用await会阻塞直到最后一个线程调用await方法。此外CyclicBarrier还可以设置一个barrierAction,相当于一个hook,这也是CountdownLatch不具有的。