背景(来自注释):
Phaser作为一个可复用的同步屏障来使用,在功能上相似于CyclicBarrier和CountDownLatch,但是支持更灵活的使用场景。
Registration.不像其他的屏障,在一个Phaser上注册的同步对象个数是可变的。任务能够在任意时刻被注册(使用类似于register方法/bulkRegister方法,或者在构造的一开始注册),以及能够在到达之后可选择地撤销(使用arriveAndDeregister方法)。像大多数基本的同步工具一样,注册和撤销只会影响内部的计数,他们不会在内部保留对注册对象的跟踪,所以任务不能通过查询得知自身是否被注册。(然而,你能够引入类似的跟踪策略通过扩展这个类)。
Synchronization.类似于CyclicBarrier,一个Phaser能够重复用于等待。方法arriveAndAwaitAdvance有类似于CyclicBarrier.await的效果。phaser的每一代都伴随有一个阶段数。这个阶段数开始为0,当所有注册对象到达之后会递增,重新调整为0当达到Integer.MAX_VALUE时。阶段(phase)的使用可以让独立的几个行动抵达在phaser上等待其他的行动,通过两种类型的方法能够让已注册对象达到这一点:
- Arrival.arrive和arriveAndDeregister记录到达。这些方法不会阻塞,但是返回一个伴随的阶段数;也就是这一次到达所使用的阶段数。当最后一个注册对象抵达之后,一个可选的行动将会被执行以及阶段数会递增。被注册对象执行的方法会触发阶段数的递增,已经通过覆盖方法onAdvance,能够控制Phaser的结束。覆盖这个方法类似于通过提供一个屏障行动给CyclicBarrier,但是更灵活。
- Waiting.方法awaitAdvance需要一个参数指示一个抵达阶段数,以及返回当阶段数抵达一个不一样的阶段数。不同于CyclicBarrier的是,awaitAdvance会在中断之后继续等待。中断和超时的版本同时提供,但是中断和超时不会改变phaser的状态。如果需要,你可以执行这些异常的伴随处理器,通过调用forceTermination。Phaser也可以被在ForkJoinPool中执行的任务使用,这样可以确保充足的并行性,当其他任务在一个阶段上阻塞时。
Tiering. Phasers能够被分层(通过构造树结构)从而降低竞争。一个拥有大量参与者的Phaser能够通过构造拥有同一个公共父Phaser的结构来降低严重的同步竞争开销。这样能够极大地增加吞吐量,即使它带来每个操作更大的开销。
在一个分层phasers的树结构中,注册和撤销孩子phasers以及它们的父phaser是被自动管理的。当一个子phaser的注册数变为非0时(通过Phaser(Phaser,int)、constructor、register、bulkRegister),子phaser被父phaser注册。当注册数变为0,子phaser会从父phaser中撤销。
Monitoring.同步方法只能够被已注册的对象调用,phaser的当前状态能够被每一个调用者监视。在任何给定的时刻拥有{getRegisteredParties}个参与者,以及{getArriveParties}个对象已经抵达当前的阶段{getPhase}。当剩余的{getUnarrivedParties}个对象到达时,这个阶段数递增。但是这些方法返回的只是瞬时数据,所以在同步控制中不是非常有用。方法{toString}返回当前监视的这些状态的一个快照。
使用场景:
Phaser可以代替CountDownLatch来构建一个服务于多个对象只执行一次的行动。
这个典型的应用惯例是注册-->执行任务-->撤销,例如:
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}
// allow threads to start and deregister self
phaser.arriveAndDeregister();
}
可以通过覆盖onAdvance来实现让多个线程把某些任务执行固定的次数。
void startTasks(List<Runnable> tasks, final int iterations) {
final Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties == 0;
}
};
phaser.register();
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
}.start();
}
phaser.arriveAndDeregister(); // deregister self, don't wait
}
如果主任务需要等待phaser的结束,那么它可以不断地注册自己从而执行一个类似循环:
phaser.register();
while (!phaser.isTerminated())
phaser.arriveAndAwaitAdvance()
void awaitPhase(Phaser phaser, int phase) {
int p = phaser.register(); // assumes caller not already registered
while (p < phase) {
if (phaser.isTerminated())
// ... deal with unexpected termination
else
p = phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
}
创建一个集合的任务可以使用一个树结构的phasers,你能够使用以下的代码,假如一个Task类有一个构造函数接受一个Phaser从而进行注册。在调用build(new Task[n], 0, n, new Phaser())后,这些任务能够被执行,
比如提交给一个线程池:
void build(Task[] tasks, int lo, int hi, Phaser ph) {最好的{TASKS_PER_PHASER}依赖于期待的同步速率。很小的数字比如4适合于执行非常小的任务(所以速率高),或者成百上千适合于使用大任务。
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
build(tasks, i, j, new Phaser(ph));
}
} else {
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(ph);
// assumes new Task(ph) performs ph.register()
}
}
注意:我们的这个实现将参与者的上限定为65535.尝试注册更多的参与者会导致IllegalStateException异常。当然,你可以通过使用分层Phaser的方法去满足任意数量的参与者。
算法:
当前类(Phaser)实现了一个X10"clocks“的扩展。
主要的64位状态变量(state),被划分为4个域:
- unarrived--没有抵达屏障的参与者个数(bits 0-15)
- parties --需要等待的参与者个数 (bits 16-37)
- phase --屏障的阶段数 (bits 32-62)
- terminated --1为结束 (bits 63/sign)
实现:
public Phaser(Phaser parent, int parties) {这个构造函数的作用为,根据父Phaser和参与方个数创建一个Phaser。当给出的父phaser不为null以及给出的参与方个数大于0时,这个子phaser会注册到父phaser中。
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}
详情如下:
- 首先判断将要注册的个数(parties>>>PARTIES_SHIFT!=0),如果成立那么说明参与方太多,直接抛出异常。
- 设置当前phaser的父phaser以及阶段数phase为0,假如父phaser不为null,那么取得父phaser的根root,设置当前root以及两个队列evenQ和oddQ,当注册数不为0时,在父phaser中注册当前phaser。
- 否则只需要设置自身为root以及初始化evenQ和oddQ。
- 设置状态变量state:0-15位作为unarrived个数(若parties为0则用1),16-31位作为parties的值,32-62位作为phase(阶段数)的值,最高位63作为phaser是否结束的标志(1为结束)。
public int register() {
return doRegister(1);
}
private int doRegister(int registrations) { // adjustment to state long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (;;) { long s = (parent == null) ? state : reconcileState(); int counts = (int)s; int parties = counts >>> PARTIES_SHIFT; int unarrived = counts & UNARRIVED_MASK; if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) break; if (counts != EMPTY) { // not 1st registration if (parent == null || reconcileState() == s) { if (unarrived == 0) // wait out advance root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) break; } } else if (parent == null) { // 1st root registration long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; } else { synchronized (this) { // 1st sub registration if (state == s) { // recheck under lock phase = parent.doRegister(1); if (phase < 0) break; // finish registration whenever parent registration // succeeded, even when racing with termination, // since these are part of the same "transaction". while (!UNSAFE.compareAndSwapLong (this, stateOffset, s, ((long)phase << PHASE_SHIFT) | adjust)) { s = state; phase = (int)(root.state >>> PHASE_SHIFT); // assert (int)s == EMPTY; } break; } } } } return phase; }
- 调用doRegister(1),1为注册个数。
- doRegister中,首先获得需要递增的值adjust,获得父phaser,然后进入循环。
- 首先调整state(状态值)(reconcileState),因为阶段数要以root中的为准,所以reconcileState方法会取得root中阶段数组合当前的paties和unarrived值成为新的state状态。
- 之后假如phase小于0,既phaser已经结束,返回负数。
- 假如需要注册个数过大,爬出异常。
- 假如当前counts不为空(parties或者unarrived不为空),那么当parent不为空或者状态数state稳定(阶段数不变)时,假如发现unarrived为0则等待(被状态数改变之后唤醒然后重试),否则尝试调整当前的state状态值。
- 假如counts为空并且parent为空,则尝试直接改变当前的状态值state(即为调整parties和unarrived值)。
- 假如counts为空并且parent不为空,那么说明不仅需要改变当前的state,还需要去父phaser中注册,这里使用内置同步块的方式,当phase小于0时返回,在成功调整state之后跳出返回阶段值。
public int arriveAndAwaitAdvance() {
// Specialization of doArrive+awaitAdvance eliminating some reads/paths
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
if (root != this)
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT); // terminated
releaseWaiters(phase);
return nextPhase;
}
}
}
这个方法在参与者注册之后被调用的,作为是声明自己抵达改phaser并等待其他参与者抵达。等价于awaitAdvance(arrive()),如果你想要使用可中断或者超时的方式来等待,那么你可以使用相似的方法通过另外的形式来调用awaitAdvance来达到目的。如果你还需要再抵达之后撤销,那么可以使用awaitAdvance(arriveAndDeregister())。这个方法不能被没有注册的参与方调用,否则可能抛出IllegalStateException异常。
详情如下:
- 首先取得跟phaser,进入循环。
- 可能调整并取得最新的状态数state(可能会更新phase)。
- 取得阶段数(phase),小于0则返回(结束)。
- 取得counts(该值为当前parties和unarrived的或操作,若为EMPTY说明当前phaser上无任何参与方)。
- 取得unarrived值,小于或等于0抛出IllegalStateException异常(说明有未注册的一方调用了此方法)。
- 试着CAS改变state的值(仅unarrived的部分),失败重试。
- 成功之后,根据unarrived判断,假如>1说明不是最后一个抵达的参与方,所以调用internalAwaitAdvance方法在root上等待。
- 否则,说明当前是改phaser的最后一个抵达的参与方,那么假如root!=this(当前phaser不是根),就调用parent(父phaser)的arriveAndAwaitAdvance方法(同名方法)。
- 否则当前phaser就为根,那么保留注册数n(s & PARTIES_MASK ),根据可扩展的onAdvance方法、以及当前注册数以及递增之后的阶段数(phase)来CAS构造新的状态值(state)。这一步若失败说明有外部的终止调用,直接返回后32位(最高位1)即可。
- 成功之后返回新的状态值。
/**
* Main implementation for methods arrive and arriveAndDeregister.
* Manually tuned to speed up and minimize race windows for the
* common case of just decrementing unarrived field.
*
* @param adjust value to subtract from state;
* ONE_ARRIVAL for arrive,
* ONE_DEREGISTER for arriveAndDeregister
*/
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 1) {
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
}
else if (nextUnarrived == 0) { // propagate deregistration
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
doArrive主要被arrive和arriveAndDeregister来调用,与arriveAndAwaitAdvance的代码非常类似(所以不给出详情),主要的区别在于:
- doArrive中可能会有撤销操作,而arriveAndAwaitAdvacne中只是抵达操作,所以当nextUnarrived(PARTIES)的值为0时,需要对父phaser也发起传递的撤销操作。
- arriveAndAwaitAdvance中只需要调用父phaser的同名方法,但是doArrive中会根据nextUnarrived的值来区分调用父phaser的同名方法时的参数
存在的问题: