ForkJoinPool 源码分析

时间:2021-05-19 22:46:27

ForkJoinPool

ForkJoinPool 是一个运行 ForkJoinTask 任务、支持工作窃取和并行计算的线程池

核心参数+创建实例

    // 工作者线程驻留任务队列索引位
static final int SWIDTH = 16;
// 低 16 位掩码
static final int SMASK = 0xffff;
// 最大并行度:#workers - 1
static final int MAX_CAP = 0x7fff;
// 最大工作队列数、提交队列数
static final int SQMASK = 0x007e; // 工作者线程需要唤醒信号
static final int UNSIGNALLED = 1 << 31; // must be negative
// 防止 ABA 问题的版本号
static final int SS_SEQ = 1 << 16;
// 提交队列锁
static final int QLOCK = 1; // 任务队列拥有所有者【驻留线程】
static final int OWNED = 1;
// 任务是先进先出的
static final int FIFO = 1 << 16;
// 线程池正在关闭
static final int SHUTDOWN = 1 << 18;
// 线程池已经停止
static final int TERMINATED = 1 << 19;
// 线程池正在停止
static final int STOP = 1 << 31; // must be negative
// 任务队列处于静止状态
static final int QUIET = 1 << 30;
// 任务队列处于静止状态 && 驻留线程已经阻塞、需要唤醒信号
static final int DORMANT = QUIET | UNSIGNALLED; /**
* 工作者线程能在驻留的工作队列中一次性处理的最大任务数
*/
static final int POLL_LIMIT = 1 << 10; /**
* 默认的工作者线程工厂
*/
public static final ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory; /**
* Common (static) pool. 通用 ForkJoinPool
*/
static final ForkJoinPool common; /**
* 通用线程池的并行度
*/
static final int COMMON_PARALLELISM; /**
* 通用线程池最大补偿的工作线程数
*/
private static final int COMMON_MAX_SPARES; /**
* 线程池序列号,用于生成工作者线程名称
*/
private static int poolNumberSequence; /**
* 默认的工作者线程超时时间,以毫秒为单位,即 60 秒
*/
private static final long DEFAULT_KEEPALIVE = 60_000L; /**
* Undershoot tolerance for idle timeouts【超时微调】
*/
private static final long TIMEOUT_SLOP = 20L; private static final int DEFAULT_COMMON_MAX_SPARES = 256; private static final int SEED_INCREMENT = 0x9e3779b9; /**
* 64 位控制变量的组成,从高到低每 16 位一个单元
* RC: 正在运行的工作者线程数 - parallelism
* TC: 正在运行的工作者线程数 + 阻塞等待的工作者线程数 - parallelism
* SS: 版本号 + 最近阻塞的线程状态
* ID: 最近阻塞的工作者线程所在的工作队列索引
*
* ac 为负数:表示运行中的工作者线程不够
* tc 为负数:表示总的工作者线程不够
* sp != 0:表示有工作者线程在阻塞等待
*/
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK; private static final int RC_SHIFT = 48;
// 单个工作者线程计数,用于 RC
private static final long RC_UNIT = 0x0001L << RC_SHIFT;
// 控制变量 48-64 位掩码
private static final long RC_MASK = 0xffffL << RC_SHIFT; private static final int TC_SHIFT = 32;
// 单个工作者线程计数,用于 TC
private static final long TC_UNIT = 0x0001L << TC_SHIFT;
// 控制变量 32-48 位掩码
private static final long TC_MASK = 0xffffL << TC_SHIFT;
// 尝试增加一个工作者线程,工作者线程数 < parallelism
private static final long ADD_WORKER = 0x0001L << TC_SHIFT + 15; // sign // 工作者线程的窃取任务总数
volatile long stealCount;
// 工作者线程的空闲存活时间
final long keepAlive;
// 下一个工作队列索引
int indexSeed;
// 最大、最小线程数
final int bounds;
/**
* runstate:第 18、19、30、31 位
* queue mode:第 16 位
* parallelism:1-15 位,最大并行度为 1<<15-1
*/
volatile int mode;
// 已注册的工作队列
WorkQueue[] workQueues;
// 工作者线程名称前缀,同时作为创建工作队列时的 synchronized 锁
final String workerNamePrefix;
// 创建工作者线程的工厂
final ForkJoinWorkerThreadFactory factory;
// 异常处理器
final UncaughtExceptionHandler ueh;
// 线程池饱和断言
final Predicate<? super ForkJoinPool> saturate;
// 核心控制变量
@jdk.internal.vm.annotation.Contended("fjpctl")
volatile long ctl; public ForkJoinPool() {
/**
* 1)并行度为当前 JVM 的 CPU 总数 Runtime.getRuntime().availableProcessors()
* 2)使用默认的线程工厂
* 3)无异常处理器
* 4)工作队列的任务处理方式为 FILO
* 提交队列的任务处理方式为 FIFO,工作窃取
* 5)默认的核心工作者线程数为 0
* 6)默认的最大工作者线程数为 32767
* 7)默认工作者线程空闲超时时间为 60 秒
*/
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false,
0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
} /**
* @param parallelism 并行度【影响任务队列的长度和工作者线程数】
*/
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
} /**
* @param parallelism 并行度
* @param factory 工作者线程工厂
* @param handler 异常处理器
* @param asyncMode 任务处理模式
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(parallelism, factory, handler, asyncMode,
0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
} /**
* @param parallelism ForkJoinPool 的并行度
* @param factory 工作者线程工厂
* @param handler 每个工作者线程的异常处理器
* @param asyncMode 工作队列的任务处理模式,默认是 false【FILO】,
* 消息模式下可以指定为 FIFO
* @param corePoolSize 核心工作者线程数,默认等于 parallelism
* @param maximumPoolSize 最大工作者线程数
* @param minimumRunnable 最小可用工作者线程数
* @param saturate 当线程池尝试创建 > maximumPoolSize 的工作者线程时,目标任务将被拒绝,
* 如果饱和断言返回 true,则该任务将继续执行
* @param keepAliveTime 工作线程的空闲超时时间
* @param unit keepAliveTime 的时间单位
* @since 9
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode,
int corePoolSize,
int maximumPoolSize,
int minimumRunnable,
Predicate<? super ForkJoinPool> saturate,
long keepAliveTime,
TimeUnit unit) {
// check, encode, pack parameters
if (parallelism <= 0 || parallelism > MAX_CAP ||
maximumPoolSize < parallelism || keepAliveTime <= 0L) {
throw new IllegalArgumentException();
}
if (factory == null) {
throw new NullPointerException();
}
// 计算超时时间
final long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
// 核心工作者线程数
final int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
// 计算 ctl
final long c = (long)-corep << TC_SHIFT & TC_MASK |
(long)-parallelism << RC_SHIFT & RC_MASK;
// 计算 mode
final int m = parallelism | (asyncMode ? FIFO : 0);
final int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
// 最小可用工作者线程数
final int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
final int b = minAvail - parallelism & SMASK | maxSpares << SWIDTH;
int n = parallelism > 1 ? parallelism - 1 : 1; // at least 2 slots
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
n = n + 1 << 1; // power of two, including space for submission queues workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
workQueues = new WorkQueue[n];
this.factory = factory;
ueh = handler;
this.saturate = saturate;
keepAlive = ms;
bounds = b;
mode = m;
ctl = c;
checkPermission();
}

提交任务

    /**
* 往线程池中提交一个 Runnable 任务
*/
@Override
@SuppressWarnings("unchecked")
public ForkJoinTask<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
// 如果 task 不是 ForkJoinTask 子类实例,则执行适配
return externalSubmit(task instanceof ForkJoinTask<?>
? (ForkJoinTask<Void>) task // avoid re-wrap
: new ForkJoinTask.AdaptedRunnableAction(task));
} private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
Thread t; ForkJoinWorkerThread w; WorkQueue q;
if (task == null) {
throw new NullPointerException();
}
/**
* 1)如果任务提交线程是 ForkJoinWorkerThread 实例 &&
* 工作者线程关联的 ForkJoinPool 就是当前线程池 &&
* 工作者线程驻留的任务队列不为 null
* 则优先提交到自己的任务队列
*/
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
(w = (ForkJoinWorkerThread)t).pool == this &&
(q = w.workQueue) != null) {
q.push(task);
// 2)将任务提交到共享提交队列
} else {
externalPush(task);
}
return task;
} ForkJoinTask#
/**
* 将 Runnable 任务适配为 ForkJoinTask 任务
*/
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
implements RunnableFuture<Void> {
/**
* 实际运行的任务
*/
final Runnable runnable;
AdaptedRunnableAction(Runnable runnable) {
if (runnable == null) {
throw new NullPointerException();
}
this.runnable = runnable;
}
@Override
public Void getRawResult() { return null; }
@Override
public void setRawResult(Void v) { }
// 运行 Runnable 任务
@Override
public boolean exec() { runnable.run(); return true; }
@Override
public void run() { invoke(); }
@Override
public String toString() {
return super.toString() + "[Wrapped task = " + runnable + "]";
}
private static final long serialVersionUID = 5232453952276885070L;
} /**
* 将一个 ForkJoinTask 提交到一个提交队列中
*/
final void externalPush(ForkJoinTask<?> task) {
int r; // initialize caller's probe
// 如果调用线程的探测值为 0
if ((r = ThreadLocalRandom.getProbe()) == 0) {
// 初始化线程局部随机数的探测值
ThreadLocalRandom.localInit();
/**
* 读取探测值【同一个线程只要不调用 advanceProbe 方法,探测值是不变的,
* 同一个线程提交任务时只要不出现竞争,就会将任务提交到相同的提交队列中】
*/
r = ThreadLocalRandom.getProbe();
}
for (;;) {
// 读取 mode
final int md = mode;
int n;
// 读取工作队列
WorkQueue[] ws = workQueues;
// 线程池已经设置了 SHUTDOWN 标识 || 工作队列为空,则拒绝提交任务
if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0) {
throw new RejectedExecutionException();
} else {
WorkQueue q;
// push 是否添加成功,grow 是否需要进行扩容
boolean push = false, grow = false;
// 1)基于探测值掩码定位到指定索引的任务队列,如果该任务队列还未创建
if ((q = ws[n - 1 & r & SQMASK]) == null) {
// 读取工作者名称前缀
final Object lock = workerNamePrefix;
/**
* 基于探测值、队列静止标识、非 FIFO 标识、无所有者标识
* 生成队列 ID
*/
final int qid = (r | QUIET) & ~(FIFO | OWNED);
// 创建提交队列
q = new WorkQueue(this, null);
// 写入队列 ID
q.id = qid;
// 队列设置为静止状态
q.source = QUIET;
// 锁定队列
q.phase = QLOCK; // lock queue
if (lock != null) {
// 同步锁定队列
synchronized (lock) { // lock pool to install
int i;
/**
* 再次判断指定目标索引下的任务队列是否还为空,
* 出现竞争时可能被其他线程提前写入
*/
if ((ws = workQueues) != null &&
(n = ws.length) > 0 &&
ws[i = qid & n - 1 & SQMASK] == null) {
// 写入新建队列
ws[i] = q;
// 新建队列后需要扩容,并提交任务
push = grow = true;
}
}
}
}
// 2)提交队列已经存在,则尝试获得共享锁
else if (q.tryLockSharedQueue()) {
// 获取锁成功,读取 base 和 top 值
final int b = q.base, s = q.top;
int al, d; ForkJoinTask<?>[] a;
// 1)提交队列的任务数组不为空 && 任务数组未满
if ((a = q.array) != null && (al = a.length) > 0 &&
al - 1 + (d = b - s) > 0) {
// 基于 top 值定位数组索引后写入任务
a[al - 1 & s] = task;
// 递增 top 值并写入
q.top = s + 1; // relaxed writes OK here
// 释放共享锁
q.phase = 0;
// 队列中有多于 1 个任务
if (d < 0 && q.base - s < -1)
{
break; // no signal needed
}
// 数组已满则需要进行扩容
} else {
grow = true;
}
// 添加任务成功
push = true;
}
if (push) {
// 如果需要执行扩容
if (grow) {
try {
// 执行任务队列的扩容
q.growArray();
// 读取 top 值
final int s = q.top;
int al; ForkJoinTask<?>[] a;
if ((a = q.array) != null && (al = a.length) > 0) {
// 写入任务
a[al - 1 & s] = task;
// 递增 top 值并写入
q.top = s + 1;
}
} finally {
// 释放共享锁
q.phase = 0;
}
}
// 添加任务成功,则尝试添加或唤醒工作者线程
signalWork();
break;
} else {
// 出现线程竞争时,重新生成探测值并进行重试
r = ThreadLocalRandom.advanceProbe(r);
}
}
}
} static final class WorkQueue {
/**
* 任务数组的初始化容量为 8192
*/
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
/**
* 任务数组的最大容量
*/
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
/**
* 值为负数:
* 1)(任务队列的 ID +累计版本号) | UNSIGNALLED
* 2)DORMANT:任务队列处于静止状态 && 驻留线程已经阻塞、需要唤醒信号
* 0:队列未锁定
* 1:队列被锁定
*/
volatile int phase;
// 队列进入静止状态时的控制变量值,可能包含前一个静止的任务队列信息
int stackPred;
// 工作者线程窃取的任务数
int nsteals;
int id; // index, mode, tag
// 上一次窃取的工作队列 ID 或哨兵值
volatile int source;
// 执行 poll 操作的索引
volatile int base;
// 执行 push 或 pop 操作的索引
int top;
// 底层存储 ForkJoinTask 的数组
ForkJoinTask<?>[] array;
// 任务队列所在的线程池
final ForkJoinPool pool;
// 工作队列驻留的工作者线程,共享提交队列为 null
final ForkJoinWorkerThread owner; /**
* 尝试锁定共享提交队列
*/
boolean tryLockSharedQueue() {
return PHASE.compareAndSet(this, 0, QLOCK);
} /**
* 初始化队列或执行双倍扩容
*/
ForkJoinTask<?>[] growArray() {
// 读取旧队列
final ForkJoinTask<?>[] oldA = array;
// 读取旧 size
final int oldSize = oldA != null ? oldA.length : 0;
// 计算新 size,初始化容量为 8192
final int size = oldSize > 0 ? oldSize << 1 : INITIAL_QUEUE_CAPACITY;
// size 小于 8192 或大于 64M
if (size < INITIAL_QUEUE_CAPACITY || size > MAXIMUM_QUEUE_CAPACITY) {
// 扩容失败则抛出 RejectedExecutionException 异常,任务被拒绝
throw new RejectedExecutionException("Queue capacity exceeded");
}
int oldMask, t, b;
// 基于新的 size 创建 ForkJoinTask 数组
final ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
// 旧数组中有任务存在,则执行迁移操作
if (oldA != null && (oldMask = oldSize - 1) > 0 &&
(t = top) - (b = base) > 0) {
// 计算新任务数组的 mask
final int mask = size - 1;
do { // emulate poll from old array, push to new array
// 从 base 开始迁移任务
final int index = b & oldMask;
// 读取任务
final ForkJoinTask<?> x = (ForkJoinTask<?>)
QA.getAcquire(oldA, index);
// 将旧数组中对应的 slot 置为 null
if (x != null &&
QA.compareAndSet(oldA, index, x, null)) {
// 写入新数组
a[b & mask] = x;
}
// 循环迁移
} while (++b != t);
VarHandle.releaseFence();
}
// 返回新数组
return a;
}
} /**
* 尝试创建或唤醒一个工作者
*/
final void signalWork() {
for (;;) {
long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
// 1)活跃工作者线程数已经 >= 并行度
if ((c = ctl) >= 0L) {
break;
// 2)核心工作者线程未满 && 无空闲工作者线程
} else if ((sp = (int)c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) {
// 尝试增加一个工作者线程
tryAddWorker(c);
}
break;
}
// 3)workQueues 为 null 表示线程池未启动或已经终止
else if ((ws = workQueues) == null) {
break; // unstarted/terminated
// 4)线程池已经终止
} else if (ws.length <= (i = sp & SMASK)) {
break; // terminated
// 5)线程池正在终止,目标工作队列已经被回收
} else if ((v = ws[i]) == null) {
break; // terminating
// 6)核心工作者线程未满 && 有工作者线程静止或已经阻塞
} else {
// 读取最近静止的工作队列 ID 及其工作队列索引
final int np = sp & ~UNSIGNALLED;
// 读取工作队列的 phase 值
final int vp = v.phase;
/**
* 读取在这个工作队列静止或工作者线程阻塞前,
* 上一个静止的工作者队列或阻塞的工作者线程所处的控制变量
*/
final long nc = v.stackPred & SP_MASK | UC_MASK & c + RC_UNIT;
// 读取工作队列驻留线程
final Thread vt = v.owner;
// 如果当前工作队列是最近静止的或其工作者线程是最近阻塞的,则尝试恢复为静止之前的控制变量
if (sp == vp && CTL.compareAndSet(this, c, nc)) {
// 写入 phase 值
v.phase = np;
// 如果 source 为 DORMANT【工作者线程阻塞前一刻写入】
if (v.source < 0) {
// 唤醒阻塞的工作者线程
LockSupport.unpark(vt);
}
break;
}
}
}
} /**
* 尝试增加一个工作者线程
* Tries to add one worker, incrementing ctl counts before doing
* so, relying on createWorker to back out on failure.
*/
private void tryAddWorker(long c) {
do {
// 活跃工作者线程数和总的工作者线程数,递增 1
final long nc = RC_MASK & c + RC_UNIT |
TC_MASK & c + TC_UNIT;
// 如果控制变量未更新 && 原子更新当前控制变量成功
if (ctl == c && CTL.compareAndSet(this, c, nc)) {
// 创建一个新的工作者线程
createWorker();
break;
}
// 出现竞争,则重新判断并重试
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
} /**
* 尝试创建并启动一个新的工作者线程
*/
private boolean createWorker() {
// 读取工作者线程工厂
final ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// 创建一个新的工作者线程
if (fac != null && (wt = fac.newThread(this)) != null) {
// 启动工作线程
wt.start();
return true;
}
} catch (final Throwable rex) {
ex = rex;
}
// 创建失败或启动失败,则注销当前工作者
deregisterWorker(wt, ex);
return false;
} final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
int phase = 0;
// 目标工作者线程不为 null && 任务队列不为 null
if (wt != null && (w = wt.workQueue) != null) {
final Object lock = workerNamePrefix;
// 计算窃取任务数
final long ns = w.nsteals & 0xffffffffL;
// 计算工作队列索引
final int idx = w.id & SMASK;
if (lock != null) {
WorkQueue[] ws; // remove index from array
synchronized (lock) {
// 将工作者线程驻留的任务队列回收
if ((ws = workQueues) != null && ws.length > idx &&
ws[idx] == w) {
ws[idx] = null;
}
// 累积总的窃取任务数
stealCount += ns;
}
}
// 读取工作队列 phase 值
phase = w.phase;
}
// 工作队列不是静止状态
if (phase != QUIET) { // else pre-adjusted
long c; // decrement counts
// 活跃工作者线程数和总工作者线程数递减 1
do {} while (!CTL.weakCompareAndSet
(this, c = ctl, RC_MASK & c - RC_UNIT |
TC_MASK & c - TC_UNIT |
SP_MASK & c));
}
// 如果工作者线程驻留的工作队列不为 null
if (w != null)
{
// 取消所有的任务
w.cancelAll(); // cancel remaining tasks
}
/**
* 终止线程池失败 && 工作队列不为 null && 任务数组不为 null
*/
if (!tryTerminate(false, false) && // possibly replace worker
w != null && w.array != null) {
// 尝试创建或唤醒一个工作者线程
signalWork();
}
// 1)如果不是异常终止,则删除过时的异常信息
if (ex == null) {
ForkJoinTask.helpExpungeStaleExceptions();
} else {
// 2)重新抛出异常
ForkJoinTask.rethrow(ex);
}
} * @param now
* true 表示无条件终止,
* false 表示等到线程池无任务或无活跃工作者线程之后终止
* @param enable true 可能在下次终止
* @return true if terminating or terminated
*/
private boolean tryTerminate(boolean now, boolean enable) {
int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED // 线程池状态不是 SHUTDOWN
while (((md = mode) & SHUTDOWN) == 0) {
/**
* enable 为 false 和通用线程池不允许终止
*/
if (!enable || this == common) {
return false;
} else {
// 设置线程池状态为 SHUTDOWN
MODE.compareAndSet(this, md, md | SHUTDOWN);
}
} while (((md = mode) & STOP) == 0) { // try to initiate termination
// 如果不是立刻终止
if (!now) { // check if quiescent & empty
for (long oldSum = 0L;;) { // repeat until stable
boolean running = false;
long checkSum = ctl;
final WorkQueue[] ws = workQueues;
// 1)有活跃的工作者线程
if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0) {
running = true;
// 2)工作队列不为 null
} else if (ws != null) {
WorkQueue w; int b;
// 遍历所有的工作队列,看是否有未完成的任务
for (int i = 0; i < ws.length; ++i) {
if ((w = ws[i]) != null) {
checkSum += (b = w.base) + w.id;
if (b != w.top ||
(i & 1) == 1 && w.source >= 0) {
// 工作队列中有任务未完成
running = true;
break;
}
}
}
}
// 1)线程池已经停止,则直接退出
if (((md = mode) & STOP) != 0) {
break; // already triggered
// 2)有活跃工作者线程或有任务未完成,则返回 false
} else if (running) {
return false;
} else if (workQueues == ws && oldSum == (oldSum = checkSum)) {
break;
}
}
}
// 设置线程池状态为 STOP
if ((md & STOP) == 0) {
MODE.compareAndSet(this, md, md | STOP);
}
} // 线程池状态不是 TERMINATED
while (((md = mode) & TERMINATED) == 0) { // help terminate others
for (long oldSum = 0L;;) { // repeat until stable
WorkQueue[] ws; WorkQueue w;
long checkSum = ctl;
// workQueues 不为空
if ((ws = workQueues) != null) {
for (final WorkQueue element : ws) {
// 当前任务队列不为 null
if ((w = element) != null) {
final ForkJoinWorkerThread wt = w.owner;
// 取消所有的任务
w.cancelAll(); // clear queues
// 如果是工作队列
if (wt != null) {
try { // unblock join or park
// 中断工作者线程
wt.interrupt();
} catch (final Throwable ignore) {
}
}
// 累加校验和
checkSum += w.base + w.id;
}
}
}
if (((md = mode) & TERMINATED) != 0 ||
workQueues == ws && oldSum == (oldSum = checkSum)) {
break;
}
}
// 1)线程池已经终止
if ((md & TERMINATED) != 0) {
break;
// 2)还有工作者线程未中断
} else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0) {
break;
// 3)设置线程池状态为 TERMINATED
} else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
synchronized (this) {
notifyAll(); // for awaitTermination
}
break;
}
}
// 终止成功返回 true
return true;
} WorkQueue# /**
* 工作者线程将任务提交到自己驻留的工作队列中
*/
void push(ForkJoinTask<?> task) {
// 读取 top 索引
final int s = top; ForkJoinTask<?>[] a; int al, d;
/**
* 1)保存任务的 ForkJoinTask 数组不为 null && 长度 > 0
*/
if ((a = array) != null && (al = a.length) > 0) {
// 基于旧的 top 索引计算任务存放的索引
final int index = al - 1 & s;
final ForkJoinPool p = pool;
// 递增 top 索引
top = s + 1;
// 将任务存储到 ForkJoinTask 数组中
QA.setRelease(a, index, task);
// 1)如果是工作队列的第一个任务 && pool 不为 null
if ((d = base - s) == 0 && p != null) {
VarHandle.fullFence();
// 尝试新增或唤醒工作者线程
p.signalWork();
}
// 2)如果任务队列已满,则执行扩容
else if (d + al == 1) {
growArray();
}
}
}

工作者线程的执行逻辑

public class ForkJoinWorkerThread extends Thread {
// 工作者线程所属的线程池
final ForkJoinPool pool;
// 工作者驻留的任务队列
final ForkJoinPool.WorkQueue workQueue; /**
* 创建一个在目标 pool 中执行任务的 ForkJoinWorkerThread 实例
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
super("aForkJoinWorkerThread");
this.pool = pool;
// 将当前工作者线程注册到线程池中
this.workQueue = pool.registerWorker(this);
} /**
* 工作线程启动前的钩子函数
*/
protected void onStart() {
} /**
* 工作者线程退出后的钩子函数
*/
protected void onTermination(Throwable exception) {
} /**
* 运行工作者线程
*/
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
// 前置钩子
onStart();
// 运行工作队列
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
// 后置钩子
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
// 注销工作者
pool.deregisterWorker(this, exception);
}
}
}
} /**
* 工作者线程执行完驻留队列的任务后,会触发一次 afterTopLevelExec 回调
*/
void afterTopLevelExec() {
}
}
ForkJoinPool#
/**
* 工作者线程的核心运行逻辑
*/
final void runWorker(WorkQueue w) {
WorkQueue[] ws;
// 执行任务数组的初始化
w.growArray(); // allocate queue
// 计算随机窃取任务的任务队列索引
int r = w.id ^ ThreadLocalRandom.nextSecondarySeed();
if (r == 0) {
r = 1;
}
int lastSignalId = 0; // avoid unneeded signals
// 循环处理
while ((ws = workQueues) != null) {
boolean nonempty = false; // scan
/**
* n:length
* m:mask
* b:base
* i:index
* a:array
* q:WorkQueue
* al:array length
*/
for (int n = ws.length, j = n, m = n - 1; j > 0; --j) {
WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
/**
* 1)基于随机索引定位的任务队列不为 null &&
* 目标任务队列中有任务需要处理 &&
* 目标任务队列的任务数组不为空
*/
if ((i = r & m) >= 0 && i < n && // always true
(q = ws[i]) != null && (b = q.base) - q.top < 0 &&
(a = q.array) != null && (al = a.length) > 0) {
// 被窃取任务队列的 ID
final int qid = q.id; // (never zero)
// 计算任务索引,窃取任务是从 base 开始的
final int index = al - 1 & b;
// 读取目标任务
final ForkJoinTask<?> t = (ForkJoinTask<?>)
QA.getAcquire(a, index);
// 任务被当前线程获取,未出现竞争
if (t != null && b++ == q.base &&
QA.compareAndSet(a, index, t, null)) {
/**
* 当前任务队列里还有任务待处理 &&
* 第一次窃取该任务队列里的任务
*/
if ((q.base = b) - q.top < 0 && qid != lastSignalId)
{
// 尝试增加或唤醒一个工作者线程来帮忙处理任务
signalWork(); // propagate signal
}
// 写入上一次窃取的任务队列 ID
w.source = lastSignalId = qid;
// 执行目标任务
t.doExec();
// 1)当前工作队列如果是 FIFO 模式
if ((w.id & FIFO) != 0) {
// 则从 base 位置开始处理自己的任务
w.localPollAndExec(POLL_LIMIT);
} else {
// 则从 top 位置开始处理自己的任务
w.localPopAndExec(POLL_LIMIT);
}
// 读取工作队列驻留线程
final ForkJoinWorkerThread thread = w.owner;
// 递增窃取任务数
++w.nsteals;
// 重置窃取任务队列 ID
w.source = 0; // now idle
if (thread != null) {
// 驻留线程不为 null,则执行 afterTopLevelExec 回调
thread.afterTopLevelExec();
}
}
// 处理完一个任务之后,再次尝试窃取该任务队列里的任务
nonempty = true;
}
// 2)如果目标任务队列里的任务已经处理完毕,则退出此次扫描【一次只处理一个任务队列】
else if (nonempty) {
break;
// 3)定位到的任务队列无任务可处理,则扫描下一个任务队列
} else {
++r;
}
} // 1)如果成功处理完一个任务队列里的任务,则重新进行定位
if (nonempty) { // move (xorshift)
r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
}
// 2)扫描了所有的任务队列都没有任务可处理
else {
int phase;
// 重置 lastSignalId
lastSignalId = 0; // clear for next scan
// 1)如果队列还未进入静止状态
if ((phase = w.phase) >= 0) { // enqueue
// 写入队列 ID 及其工作队列索引
final int np = w.phase = phase + SS_SEQ | UNSIGNALLED;
long c, nc;
do {
// 记录先前的控制变量到 stackPred 中
w.stackPred = (int)(c = ctl);
// 活跃工作者数递减 1,同时写入 np
nc = c - RC_UNIT & UC_MASK | SP_MASK & np;
// 将当前队列的状态写入控制变量中,写入成功后尝试执行最后一次扫描
} while (!CTL.weakCompareAndSet(this, c, nc));
}
else { // already queued
// 读取 stackPred
final int pred = w.stackPred;
// 工作队列置为静止 && 需要唤醒信号
w.source = DORMANT; // enable signal
for (int steps = 0;;) {
int md, rc; long c;
if (w.phase >= 0) {
w.source = 0;
break;
}
// 2)线程池正在停止,则当前工作者需要退出
else if ((md = mode) < 0) {
return;
// 3)线程池正在关闭,则当前工作者需要退出
} else if ((rc = (md & SMASK) + // possibly quiescent
(int)((c = ctl) >> RC_SHIFT)) <= 0 &&
(md & SHUTDOWN) != 0 &&
tryTerminate(false, false)) {
return; // help terminate
// 4)在多次阻塞之间清空中断状态
} else if ((++steps & 1) == 0) {
Thread.interrupted(); // clear between parks
/**
* 5)当前已经无活跃工作者线程 &&
* 已经有工作队列静止 &&
* 当前队列是最近静止的工作队列
*/
} else if (rc <= 0 && pred != 0 && phase == (int)c) {
// 计算截止时间
final long d = keepAlive + System.currentTimeMillis();
// 阻塞到截止时间
LockSupport.parkUntil(this, d);
// 阻塞过程中一直无任务提交
if (ctl == c &&
d - System.currentTimeMillis() <= TIMEOUT_SLOP) {
// 递减总工作者线程数
final long nc = UC_MASK & c - TC_UNIT |
SP_MASK & pred;
// 更新控制变量
if (CTL.compareAndSet(this, c, nc)) {
// 将工作队列设置为静止状态
w.phase = QUIET;
// 当前工作者退出工作
return; // drop on timeout
}
}
// 6)阻塞当前工作者等待唤醒,ForkJoinPool 保证至少会有一个工作者线程不会退出
} else {
LockSupport.park(this);
}
}
}
}
}
} /**
* 将工作者线程 wt 注册到当前线程池中
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
// 设置为守护线程
wt.setDaemon(true);
// 如果存在异常处理器
if ((handler = ueh) != null) {
// 写入异常处理器
wt.setUncaughtExceptionHandler(handler);
}
// 为工作者线程创建一个工作队列
final WorkQueue w = new WorkQueue(this, wt);
int tid = 0; // for thread name
// 线程池是否为 FIFO 模式
final int fifo = mode & FIFO;
// 读取工作者线程名称前缀
final String prefix = workerNamePrefix;
if (prefix != null) {
synchronized (prefix) {
final WorkQueue[] ws = workQueues; int n;
// 计算索引种子
final int s = indexSeed += SEED_INCREMENT;
if (ws != null && (n = ws.length) > 1) {
// 计算掩码
final int m = n - 1;
// 基于索引种子、掩码计算队列 ID
tid = s & m;
// 计算奇数索引值
int i = m & (s << 1 | 1); // odd-numbered indices
// 查找空闲的 slot
for (int probes = n >>> 1;;) { // find empty slot
WorkQueue q;
// 1)当前索引定位的任务队列为 null || 任务队列为静止状态
if ((q = ws[i]) == null || q.phase == QUIET) {
break;
// 2)所有的奇数索引位都已被占用,则需要进行扩容
} else if (--probes == 0) {
i = n | 1; // resize below
break;
// 3)计算下一个奇数索引位
} else {
i = i + 2 & m;
}
} // 写入工作队列索引、模式等
final int id = i | fifo | s & ~(SMASK | FIFO | DORMANT);
// 写入队列 ID
w.phase = w.id = id; // now publishable
// 如果索引 i 所在的 slot 为空或工作队列为静止状态
if (i < n) {
// 写入工作队列
ws[i] = w;
} else { // expand array
// 执行 WorkQueue 的扩容
final int an = n << 1;
// 双倍扩容
final WorkQueue[] as = new WorkQueue[an];
// 写入工作队列
as[i] = w;
final int am = an - 1;
for (int j = 0; j < n; ++j) {
WorkQueue v; // copy external queue
// 迁移旧 workQueues 中的任务队列
if ((v = ws[j]) != null) {
as[v.id & am & SQMASK] = v;
}
if (++j >= n) {
break;
}
as[j] = ws[j]; // copy worker
}
// 写入 workQueues
workQueues = as;
}
}
}
// 设置工作者线程名称
wt.setName(prefix.concat(Integer.toString(tid)));
}
return w;
}

ForkJoinTask.fork/join/invoke

  • fork:将任务提交到 ForkJoinPool 中异步执行
    /**
* 1)如果当前线程是 ForkJoinPool 工作者线程,则将其提交到驻留的工作队列中。
* 2)否则将当前 ForkJoinTask 任务提交到 common 池中
*/
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
((ForkJoinWorkerThread)t).workQueue.push(this);
} else {
ForkJoinPool.common.externalPush(this);
}
return this;
}
  • join
    /**
* 阻塞等待当前 ForkJoinTask 执行完成并返回结果,
* 任务执行过程中可以抛出 RuntimeException 或 Error 异常。
* 任务执行线程可以是当前线程或其他工作者线程。
*/
public final V join() {
int s;
// 1)阻塞等待任务执行完成,如果是异常完成,则将抛出 RuntimeException 或 Error。
if (((s = doJoin()) & ABNORMAL) != 0) {
reportException(s);
}
// 2)执行成功则返回原始结果
return getRawResult();
} private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
/**
* 1)如果任务已经完成,则返回其状态
* 2)如果当前线程是 ForkJoinWorkerThread &&
* 则尝试从驻留工作队列顶部拉取此任务 &&
* 在当前线程中执行此任务 &&
* 执行成功则返回任务状态
* 3)如果当前线程是 ForkJoinWorkerThread,但是拉取任务失败,
* 则表示【目标任务不在顶部、或其他的工作者线程窃取了此任务在执行】,则等待任务完成。
* 4)如果当前线程不是 ForkJoinWorkerThread,则阻塞等待任务完成。
*/
return (s = status) < 0 ? s :
(t = Thread.currentThread()) instanceof ForkJoinWorkerThread ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
} ForkJoinPool#WorkQueue#
/**
* 只有当目标任务 task 是工作队列顶部的第一个任务时,才将此任务移除,并返回 true,
* 否则返回 false。
*/
boolean tryUnpush(ForkJoinTask<?> task) {
// 读取 base
final int b = base;
// 读取 top
int s = top, al; ForkJoinTask<?>[] a;
// 任务数组不为空 && 有任务待处理
if ((a = array) != null && b != s && (al = a.length) > 0) {
// 计算读取索引
final int index = al - 1 & --s;
// 如果顶部任务就是当前任务 task,则将 slot 置为 null
if (QA.compareAndSet(a, index, task, null)) {
// 更新 top 值,并返回 true
top = s;
VarHandle.releaseFence();
return true;
}
}
return false;
} /**
* 窃取任务的主要执行方法
*/
final int doExec() {
int s; boolean completed;
// 任务未完成
if ((s = status) >= 0) {
try {
// 立即执行任务
completed = exec();
} catch (final Throwable rex) {
completed = false;
// 设置异常状态
s = setExceptionalCompletion(rex);
}
// 如果正常完成
if (completed) {
// 设置完成状态
s = setDone();
}
}
return s;
} /**
* 记录异常信息,触发 internalPropagateException 钩子函数
*/
private int setExceptionalCompletion(Throwable ex) {
final int s = recordExceptionalCompletion(ex);
if ((s & THROWN) != 0) {
internalPropagateException(ex);
}
return s;
} final int recordExceptionalCompletion(Throwable ex) {
int s;
// 任务未完成
if ((s = status) >= 0) {
// 计算此 ForkJoinTask 的哈希值
final int h = System.identityHashCode(this);
// 读取异常表的锁
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
expungeStaleExceptions();
// 读取异常表
final ExceptionNode[] t = exceptionTable;
// 计算索引
final int i = h & t.length - 1;
// 遍历单向链表
for (ExceptionNode e = t[i]; ; e = e.next) {
/**
* 1)目标 slot 为 null
* 2)已经到达链表尾部
*/
if (e == null) {
// 则将此异常加入异常表
t[i] = new ExceptionNode(this, ex, t[i],
exceptionTableRefQueue);
break;
}
// 如果已经加入了,则直接退出
if (e.get() == this) {
break;
}
}
} finally {
lock.unlock();
}
// 设置任务状态
s = abnormalCompletion(DONE | ABNORMAL | THROWN);
}
return s;
} /**
* 尝试将当前 ForkJoinTask 标记为由于取消或异常而完成
*/
private int abnormalCompletion(int completion) {
for (int s, ns;;) {
// 任务已经完成,则返回
if ((s = status) < 0) {
return s;
// 更新任务状态
} else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) {
// 如果有线程阻塞依赖该任务完成,则唤醒所有的阻塞线程
if ((s & SIGNAL) != 0) {
synchronized (this) { notifyAll(); }
}
return ns;
}
}
} /**
* 将任务状态设置为 DONE,如果有其他线程在阻塞等待该任务完成,则唤醒所有阻塞的线程
*/
private int setDone() {
int s;
/**
* 1)将任务状态设置为 DONE
* 2)如果有其他线程在阻塞等待该任务完成,则唤醒所有阻塞的线程
*/
if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0) {
synchronized (this) { notifyAll(); }
}
return s | DONE;
} ForkJoinPool#WorkQueue#
/**
* 从工作队列的 top 位置开始循环扫描目标任务 task,如果找到则将其移除并执行
*/
void tryRemoveAndExec(ForkJoinTask<?> task) {
ForkJoinTask<?>[] wa; int s, wal;
// 此工作队列中有任务待处理
if (base - (s = top) < 0 && // traverse from top
(wa = array) != null && (wal = wa.length) > 0) {
// 从工作队列的 top 位置开始循环扫描目标任务 task,如果找到则将其移除并执行
for (int m = wal - 1, ns = s - 1, i = ns; ; --i) {
final int index = i & m;
final ForkJoinTask<?> t = (ForkJoinTask<?>)
QA.get(wa, index);
// 1)已经没有更多的任务待扫描
if (t == null) {
break;
// 2)当前任务就是目标任务 task
} else if (t == task) {
// 移除该任务
if (QA.compareAndSet(wa, index, t, null)) {
top = ns; // 将已扫描的任务集体下移一个位置
for (int j = i; j != ns; ++j) {
ForkJoinTask<?> f;
final int pindex = j + 1 & m;
f = (ForkJoinTask<?>)QA.get(wa, pindex);
QA.setVolatile(wa, pindex, null);
final int jindex = j & m;
QA.setRelease(wa, jindex, f);
}
VarHandle.releaseFence();
// 执行目标任务
t.doExec();
}
break;
}
}
}
} final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
/**
* 工作队列不为 null && 任务不为 null
* 1)task 不是 CountedCompleter 任务
* 2)task 是 CountedCompleter,则尝试窃取和执行目标计算中的任务,直到其完成或无法找到任务为止
*/
if (w != null && task != null &&
(!(task instanceof CountedCompleter) ||
(s = w.localHelpCC((CountedCompleter<?>)task, 0)) >= 0)) {
// 尝试从工作队列中移除并运行此任务
w.tryRemoveAndExec(task);
// 读取上次窃取的任务队列ID和当前队列的ID
final int src = w.source, id = w.id;
// 读取任务状态
s = task.status;
// 任务未完成
while (s >= 0) {
WorkQueue[] ws;
boolean nonempty = false;
// 获取随机奇数索引
final int r = ThreadLocalRandom.nextSecondarySeed() | 1; // odd indices
if ((ws = workQueues) != null) { // scan for matching id
for (int n = ws.length, m = n - 1, j = -n; j < n; j += 2) {
WorkQueue q; int i, b, al; ForkJoinTask<?>[] a;
/**
* 目标索引 i 定位到的工作队列不为 null &&
* 此工作队列最近窃取了当前工作队列的任务 &&
* 此工作队列有任务待处理 &&
* 则帮助其处理任务
*/
if ((i = r + j & m) >= 0 && i < n &&
(q = ws[i]) != null && q.source == id &&
(b = q.base) - q.top < 0 &&
(a = q.array) != null && (al = a.length) > 0) {
// 窃取任务的队列ID
final int qid = q.id;
// 从 base 位置开始窃取
final int index = al - 1 & b;
// 读取任务
final ForkJoinTask<?> t = (ForkJoinTask<?>)
QA.getAcquire(a, index);
/**
* 目标任务不为 null &&
* 没有其他队列并发窃取此任务 &&
* 则尝试将此任务从目标工作队列中移除
*/
if (t != null && b++ == q.base && id == q.source &&
QA.compareAndSet(a, index, t, null)) {
// 窃取成功,则更新 base 值
q.base = b;
// 记录最近窃取任务的任务队列 ID
w.source = qid;
// 执行目标任务
t.doExec();
// 回写上次窃取任务的队列ID
w.source = src;
}
nonempty = true;
// 窃取并处理完一个任务,则退出循环
break;
}
}
}
// 1)目标任务已经完成,则返回
if ((s = task.status) < 0) {
break;
// 2)一个任务都没有窃取到
} else if (!nonempty) {
long ms, ns; int block;
// 1)非超时模式
if (deadline == 0L) {
ms = 0L; // untimed
// 2)如果已经超时,则返回
} else if ((ns = deadline - System.nanoTime()) <= 0L) {
break; // timeout
// 3)未超时,但是剩余时间 < 1 毫秒,则将其设置为 1 毫秒
} else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
{
ms = 1L; // avoid 0 for timed wait
}
// 尝试增加一个补偿工作者来处理任务
if ((block = tryCompensate(w)) != 0) {
// 阻塞等待
task.internalWait(ms);
// 如果添加成功,则递增活跃工作者数
CTL.getAndAdd(this, block > 0 ? RC_UNIT : 0L);
}
s = task.status;
}
}
}
return s;
} /**
* 如果任务未完成,则阻塞等待
*/
final void internalWait(long timeout) {
/**
* 将(旧 status 值 | SIGNAL) 的值写入 status 中,并返回旧值
* 如果任务未完成
*/
if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) {
synchronized (this) {
// 1)如果任务未完成,则阻塞等待
if (status >= 0) {
try { wait(timeout); } catch (final InterruptedException ie) { }
// 2)如果任务已经完成,则唤醒阻塞在此任务上的所有线程
} else {
notifyAll();
}
}
}
} /**
* 阻塞一个非工作者线程,直到任务完成
*/
private int externalAwaitDone() {
int s = tryExternalHelp();
// 任务未完成 && 写入唤醒标记
if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
boolean interrupted = false;
synchronized (this) {
for (;;) {
// 1)任务未完成
if ((s = status) >= 0) {
try {
// 阻塞等待任务完成
wait(0L);
// 工作者线程被中断,可能是线程池终止
} catch (final InterruptedException ie) {
interrupted = true;
}
}
// 2)任务完成则唤醒在此任务上阻塞等待的线程
else {
notifyAll();
break;
}
}
}
// 工作者线程被设置了中断标记
if (interrupted) {
// 则中断此工作者线程
Thread.currentThread().interrupt();
}
}
// 返回任务状态
return s;
} private int tryExternalHelp() {
int s;
/**
* 1)当前任务已经完成,则返回其状态
* 2)任务未完成,此任务是 CountedCompleter,则执行 externalHelpComplete
* 3)任务未完成,此任务是 ForkJoinTask,则执行 tryExternalUnpush && 拉取任务成功,则执行它
*/
return (s = status) < 0 ? s:
this instanceof CountedCompleter ?
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ?
doExec() : 0;
} final boolean tryExternalUnpush(ForkJoinTask<?> task) {
// 读取线程测试值
final int r = ThreadLocalRandom.getProbe();
WorkQueue[] ws; WorkQueue w; int n;
// 定位到的共享队列不为 null,则尝试从目标共享队列中移除此任务
return (ws = workQueues) != null &&
(n = ws.length) > 0 &&
(w = ws[n - 1 & r & SQMASK]) != null &&
w.trySharedUnpush(task);
} ForkJoinPool#WorkQueue
boolean trySharedUnpush(ForkJoinTask<?> task) {
boolean popped = false;
final int s = top - 1;
int al; ForkJoinTask<?>[] a;
// 任务数组不为空
if ((a = array) != null && (al = a.length) > 0) {
// 计算目标索引
final int index = al - 1 & s;
// 读取任务
final ForkJoinTask<?> t = (ForkJoinTask<?>) QA.get(a, index);
// 读取的任务就是目标任务 task && 尝试锁定共享队列
if (t == task &&
PHASE.compareAndSet(this, 0, QLOCK)) {
// 锁定成功 && 确认没有出现竞争 && 将目标 slot 置为 null
if (top == s + 1 && array == a &&
QA.compareAndSet(a, index, task, null)) {
// 成功弹出任务
popped = true;
// 更新 top 值
top = s;
}
// 释放共享锁
PHASE.setRelease(this, 0);
}
}
return popped;
}
  • invoke
    /**
* 立即在当前线程中执行此任务,等待任务执行完毕并返回结果,
* 或抛出 RuntimeException 或 Error 异常。
*/
public final V invoke() {
int s;
if (((s = doInvoke()) & ABNORMAL) != 0) {
reportException(s);
}
return getRawResult();
} private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
(t = Thread.currentThread()) instanceof ForkJoinWorkerThread ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}