Java Thread&Concurrency(5): 深入理解Phaser实现原理

时间:2022-12-16 07:01:30

背景(来自注释):


Phaser作为一个可复用的同步屏障来使用,在功能上相似于CyclicBarrier和CountDownLatch,但是支持更灵活的使用场景。

Registration.不像其他的屏障,在一个Phaser上注册的同步对象个数是可变的。任务能够在任意时刻被注册(使用类似于register方法/bulkRegister方法,或者在构造的一开始注册),以及能够在到达之后可选择地撤销(使用arriveAndDeregister方法)。像大多数基本的同步工具一样,注册和撤销只会影响内部的计数,他们不会在内部保留对注册对象的跟踪,所以任务不能通过查询得知自身是否被注册。(然而,你能够引入类似的跟踪策略通过扩展这个类)。

Synchronization.类似于CyclicBarrier,一个Phaser能够重复用于等待。方法arriveAndAwaitAdvance有类似于CyclicBarrier.await的效果。phaser的每一代都伴随有一个阶段数。这个阶段数开始为0,当所有注册对象到达之后会递增,重新调整为0当达到Integer.MAX_VALUE时。阶段(phase)的使用可以让独立的几个行动抵达在phaser上等待其他的行动,通过两种类型的方法能够让已注册对象达到这一点:

  • Arrival.arrive和arriveAndDeregister记录到达。这些方法不会阻塞,但是返回一个伴随的阶段数;也就是这一次到达所使用的阶段数。当最后一个注册对象抵达之后,一个可选的行动将会被执行以及阶段数会递增。被注册对象执行的方法会触发阶段数的递增,已经通过覆盖方法onAdvance,能够控制Phaser的结束。覆盖这个方法类似于通过提供一个屏障行动给CyclicBarrier,但是更灵活。
  • Waiting.方法awaitAdvance需要一个参数指示一个抵达阶段数,以及返回当阶段数抵达一个不一样的阶段数。不同于CyclicBarrier的是,awaitAdvance会在中断之后继续等待。中断和超时的版本同时提供,但是中断和超时不会改变phaser的状态。如果需要,你可以执行这些异常的伴随处理器,通过调用forceTermination。Phaser也可以被在ForkJoinPool中执行的任务使用,这样可以确保充足的并行性,当其他任务在一个阶段上阻塞时。

Termination.一个phaser能够进入termination状态,这个状态可以通过isTerminated来检查。在结束之后,所有同步方法立刻返回不需要再等待阶段改变,返回一个负数。类似的,尝试在结束之后注册对象不会有任何影响。结束会在一个调用onAdvance返回true之后被触发。默认的实现返回true说明撤销操作已经使得注册者个数为0。 正如在下面所说,可以通过覆盖这个方法从而让阶段数抵达一个阈值时结束phaser,从而实现固定的迭代操作。方法forceTermination也可以用作突然释放等待线程以使他们结束。

Tiering.  Phasers能够被分层(通过构造树结构)从而降低竞争。一个拥有大量参与者的Phaser能够通过构造拥有同一个公共父Phaser的结构来降低严重的同步竞争开销。这样能够极大地增加吞吐量,即使它带来每个操作更大的开销。

在一个分层phasers的树结构中,注册和撤销孩子phasers以及它们的父phaser是被自动管理的。当一个子phaser的注册数变为非0时(通过Phaser(Phaser,int)、constructor、register、bulkRegister),子phaser被父phaser注册。当注册数变为0,子phaser会从父phaser中撤销。

Monitoring.同步方法只能够被已注册的对象调用,phaser的当前状态能够被每一个调用者监视。在任何给定的时刻拥有{getRegisteredParties}个参与者,以及{getArriveParties}个对象已经抵达当前的阶段{getPhase}。当剩余的{getUnarrivedParties}个对象到达时,这个阶段数递增。但是这些方法返回的只是瞬时数据,所以在同步控制中不是非常有用。方法{toString}返回当前监视的这些状态的一个快照。


使用场景:

Phaser可以代替CountDownLatch来构建一个服务于多个对象只执行一次的行动。

这个典型的应用惯例是注册-->执行任务-->撤销,例如:

  void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}

// allow threads to start and deregister self
phaser.arriveAndDeregister();
}

可以通过覆盖onAdvance来实现让多个线程把某些任务执行固定的次数。

  void startTasks(List<Runnable> tasks, final int iterations) {
final Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties == 0;
}
};
phaser.register();
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
}.start();
}
phaser.arriveAndDeregister(); // deregister self, don't wait
}

如果主任务需要等待phaser的结束,那么它可以不断地注册自己从而执行一个类似循环:

    phaser.register();
while (!phaser.isTerminated())
phaser.arriveAndAwaitAdvance()


你可以等待特定的不超过Integer.MAX_VALUE的阶段数。例如:

  void awaitPhase(Phaser phaser, int phase) {
int p = phaser.register(); // assumes caller not already registered
while (p < phase) {
if (phaser.isTerminated())
// ... deal with unexpected termination
else
p = phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
}

创建一个集合的任务可以使用一个树结构的phasers,你能够使用以下的代码,假如一个Task类有一个构造函数接受一个Phaser从而进行注册。在调用build(new Task[n], 0, n, new Phaser())后,这些任务能够被执行,

比如提交给一个线程池:

  void build(Task[] tasks, int lo, int hi, Phaser ph) {
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
build(tasks, i, j, new Phaser(ph));
}
} else {
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(ph);
// assumes new Task(ph) performs ph.register()
}
}
最好的{TASKS_PER_PHASER}依赖于期待的同步速率。很小的数字比如4适合于执行非常小的任务(所以速率高),或者成百上千适合于使用大任务。

注意:我们的这个实现将参与者的上限定为65535.尝试注册更多的参与者会导致IllegalStateException异常。当然,你可以通过使用分层Phaser的方法去满足任意数量的参与者。


算法:

当前类(Phaser)实现了一个X10"clocks“的扩展。

主要的64位状态变量(state),被划分为4个域:

  • unarrived--没有抵达屏障的参与者个数(bits 0-15)
  • parties   --需要等待的参与者个数      (bits 16-37)
  • phase    --屏障的阶段数                   (bits 32-62)
  • terminated --1为结束                        (bits 63/sign)
为了能够高效地维护状态的原子性,这些值被打包放进单个64位原子变量。通过简单的方式编码/解码状态变量以及竞争过程短暂,来获得高性能。
所有状态都是通过CAS操作,除了一开始的对于子phaser的注册(拥有一个非null的父phaser),在这种少见的场景中,我们在第一次注册时使用内置同步方式来加锁。
子phaser的阶段数允许滞后于它的祖先知道它能够被使用--查看方法reconcileState。


实现:

    public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}
这个构造函数的作用为,根据父Phaser和参与方个数创建一个Phaser。当给出的父phaser不为null以及给出的参与方个数大于0时,这个子phaser会注册到父phaser中。

详情如下:

  • 首先判断将要注册的个数(parties>>>PARTIES_SHIFT!=0),如果成立那么说明参与方太多,直接抛出异常。
  • 设置当前phaser的父phaser以及阶段数phase为0,假如父phaser不为null,那么取得父phaser的根root,设置当前root以及两个队列evenQ和oddQ,当注册数不为0时,在父phaser中注册当前phaser。
  • 否则只需要设置自身为root以及初始化evenQ和oddQ。
  • 设置状态变量state:0-15位作为unarrived个数(若parties为0则用1),16-31位作为parties的值,32-62位作为phase(阶段数)的值,最高位63作为phaser是否结束的标志(1为结束)。
我们接着再来看register(注册)操作:

    public int register() {
return doRegister(1);
}
    private int doRegister(int registrations) {        // adjustment to state        long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;        final Phaser parent = this.parent;        int phase;        for (;;) {            long s = (parent == null) ? state : reconcileState();            int counts = (int)s;            int parties = counts >>> PARTIES_SHIFT;            int unarrived = counts & UNARRIVED_MASK;            if (registrations > MAX_PARTIES - parties)                throw new IllegalStateException(badRegister(s));            phase = (int)(s >>> PHASE_SHIFT);            if (phase < 0)                break;            if (counts != EMPTY) {                  // not 1st registration                if (parent == null || reconcileState() == s) {                    if (unarrived == 0)             // wait out advance                        root.internalAwaitAdvance(phase, null);                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,                                                       s, s + adjust))                        break;                }            }            else if (parent == null) {              // 1st root registration                long next = ((long)phase << PHASE_SHIFT) | adjust;                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))                    break;            }            else {                synchronized (this) {               // 1st sub registration                    if (state == s) {               // recheck under lock                        phase = parent.doRegister(1);                        if (phase < 0)                            break;                        // finish registration whenever parent registration                        // succeeded, even when racing with termination,                        // since these are part of the same "transaction".                        while (!UNSAFE.compareAndSwapLong                               (this, stateOffset, s,                                ((long)phase << PHASE_SHIFT) | adjust)) {                            s = state;                            phase = (int)(root.state >>> PHASE_SHIFT);                            // assert (int)s == EMPTY;                        }                        break;                    }                }            }        }        return phase;    }

这个方法的作用是为phaser增加一个新的参与方,假如此时unarrived为0,并且onAdvance方法正在被调用,这个方法会阻塞直到phaser的那部分工作完成。如果这个phaser有一个父phaser,并且这个子phaser没有注册过对象,那么这个子phaser也会在它的父phaser上面注册。如果当前phaser已经结束,那么尝试去注册不会有任何影响并且一个负数会返回。
这个方法返回当前这个注册完成时的阶段数。如果注册过多参与者会抛出IllegalStateException异常。
详情如下:
  • 调用doRegister(1),1为注册个数。
  • doRegister中,首先获得需要递增的值adjust,获得父phaser,然后进入循环。
  • 首先调整state(状态值)(reconcileState),因为阶段数要以root中的为准,所以reconcileState方法会取得root中阶段数组合当前的paties和unarrived值成为新的state状态。
  • 之后假如phase小于0,既phaser已经结束,返回负数。
  • 假如需要注册个数过大,爬出异常。
  • 假如当前counts不为空(parties或者unarrived不为空),那么当parent不为空或者状态数state稳定(阶段数不变)时,假如发现unarrived为0则等待(被状态数改变之后唤醒然后重试),否则尝试调整当前的state状态值。
  • 假如counts为空并且parent为空,则尝试直接改变当前的状态值state(即为调整parties和unarrived值)。
  • 假如counts为空并且parent不为空,那么说明不仅需要改变当前的state,还需要去父phaser中注册,这里使用内置同步块的方式,当phase小于0时返回,在成功调整state之后跳出返回阶段值。

我们接着再来看arriveAndAwaitAdvance(抵达并等待)操作:

    public int arriveAndAwaitAdvance() {
// Specialization of doArrive+awaitAdvance eliminating some reads/paths
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
if (root != this)
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT); // terminated
releaseWaiters(phase);
return nextPhase;
}
}
}

这个方法在参与者注册之后被调用的,作为是声明自己抵达改phaser并等待其他参与者抵达。等价于awaitAdvance(arrive()),如果你想要使用可中断或者超时的方式来等待,那么你可以使用相似的方法通过另外的形式来调用awaitAdvance来达到目的。如果你还需要再抵达之后撤销,那么可以使用awaitAdvance(arriveAndDeregister())。这个方法不能被没有注册的参与方调用,否则可能抛出IllegalStateException异常。

详情如下:

  • 首先取得跟phaser,进入循环。
  • 可能调整并取得最新的状态数state(可能会更新phase)。
  • 取得阶段数(phase),小于0则返回(结束)。
  • 取得counts(该值为当前parties和unarrived的或操作,若为EMPTY说明当前phaser上无任何参与方)。
  • 取得unarrived值,小于或等于0抛出IllegalStateException异常(说明有未注册的一方调用了此方法)。
  • 试着CAS改变state的值(仅unarrived的部分),失败重试。
  • 成功之后,根据unarrived判断,假如>1说明不是最后一个抵达的参与方,所以调用internalAwaitAdvance方法在root上等待。
  • 否则,说明当前是改phaser的最后一个抵达的参与方,那么假如root!=this(当前phaser不是根),就调用parent(父phaser)的arriveAndAwaitAdvance方法(同名方法)。
  • 否则当前phaser就为根,那么保留注册数n(s & PARTIES_MASK ),根据可扩展的onAdvance方法、以及当前注册数以及递增之后的阶段数(phase)来CAS构造新的状态值(state)。这一步若失败说明有外部的终止调用,直接返回后32位(最高位1)即可。
  • 成功之后返回新的状态值。

    /**
     * Main implementation for methods arrive and arriveAndDeregister.
     * Manually tuned to speed up and minimize race windows for the
     * common case of just decrementing unarrived field.
     *
     * @param adjust value to subtract from state;
     *               ONE_ARRIVAL for arrive,
     *               ONE_DEREGISTER for arriveAndDeregister
     */
    private int doArrive(int adjust) {
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase < 0)
                return phase;
            int counts = (int)s;
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
                if (unarrived == 1) {
                    long n = s & PARTIES_MASK;  // base of next state
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    if (root == this) {
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        n |= (long)nextPhase << PHASE_SHIFT;
                        UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                        releaseWaiters(phase);
                    }
                    else if (nextUnarrived == 0) { // propagate deregistration
                        phase = parent.doArrive(ONE_DEREGISTER);
                        UNSAFE.compareAndSwapLong(this, stateOffset,
                                                  s, s | EMPTY);
                    }
                    else
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                return phase;
            }
        }
    }

doArrive主要被arrive和arriveAndDeregister来调用,与arriveAndAwaitAdvance的代码非常类似(所以不给出详情),主要的区别在于:

  • doArrive中可能会有撤销操作,而arriveAndAwaitAdvacne中只是抵达操作,所以当nextUnarrived(PARTIES)的值为0时,需要对父phaser也发起传递的撤销操作。
  • arriveAndAwaitAdvance中只需要调用父phaser的同名方法,但是doArrive中会根据nextUnarrived的值来区分调用父phaser的同名方法时的参数

存在的问题: