简介
ForkJoin 框架,另一种风格的线程池(相比于ThreadPoolExecutor),采用分治算法,工作密取策略,极大地提高了并行性。对于那种大任务分割小任务的场景(分治)尤其有用。
框架图
几个角色
- ForkJoinTask: 有3个实现,分别是RecursiveTask,RecursiveAction,CountedCompleter.
- RecursiveTask: 可以递归执行的ForkJoinTask。
- RecursiveAction: 无返回值的RecursiveTask。
- CountedCompleter: 任务执行完成后,触发执行自定义钩子函数。
- ForkJoinWorkerThread: 运行 ForkJoinTask 任务的工作线程。
- WorkQueue: 任务队列,支持LIFO(last-in-first-out)的push和pop操作(top端),和FIFO(first-in-first-out)的poll操作(base端),队栈二相性。
-
WorkQueue[]: ForkJoinPool 中的任务分为两种,一种是本地提交的任务Submission task,通过execute()、submit()、invoke()等方法提交的任务;另外一种是工作线程fork出的子任务Worker task.
两种任务都会存放在WorkQueue数组中,Submission task存放在WorkQueue数组的偶数索引位置,Worker task存放在奇数索引位置。工作线程只会分配在奇数索引的工作队列。
基本算法
protected Long compute() {
if (end - start <= THRESHOLD) {
return justCompute();
} else {
left.fork();
right.fork();
return right.join() + left.join();
}
}
源码解析
数据结构
ForkJoinWorkerThreadFactory
线程工厂接口,用于创建工作线程ForkJoinWorkerThread,默认实现是DefaultForkJoinWorkerThreadFactory.
WorkQueue
work-stealing 模式的双端任务队列(内部是数组实现,ForkJoinTask<?>[] array)。
工作线程调用fork()方法将分解的任务入队(栈),处于top端(栈顶),工作线程处理自己工作队列的任务时,从栈顶取任务。
- 工作线程也会窃取别的队列的任务,从base端获取。
内部属性
static final int INITIAL_QUEUE_CAPACITY = 1 << 13; // 初始队列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 最大队列容量
volatile int scanState; // 扫描状态, <0: inactive; 奇数:scanning; 偶数:running
int stackPred; // 前任池(WorkQueue[])索引,由此构成一个栈
int nsteals; // 偷取的任务个数
int hint; // 记录偷取者的索引
int config; // pool index | mode
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // 栈底/队列头
int top; // 栈顶/队列尾
ForkJoinTask<?>[] array; // 任务数组
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // 当前工作队列的工作线程,共享模式下为null
volatile Thread parker; // 调用park阻塞期间为owner,其他情况为null
volatile ForkJoinTask<?> currentJoin; // 记录当前join来的任务
volatile ForkJoinTask<?> currentSteal; // 记录从其他工作队列偷取过来的任务
ForkJoinTask
有3个实现,分别是RecursiveTask,RecursiveAction,CountedCompleter.
RecursiveTask: 可以递归执行的ForkJoinTask。
RecursiveAction: 无返回值的RecursiveTask。
CountedCompleter: 任务执行完成后,触发执行自定义钩子函数。
volatile int status; // 任务状态
static final int DONE_MASK = 0xf0000000; // 任务完成掩码
static final int NORMAL = 0xf0000000; // 正常,负数
static final int CANCELLED = 0xc0000000; // 取消,< NORMAL
static final int EXCEPTIONAL = 0x80000000; // 异常,< CANCELLED
static final int SIGNAL = 0x00010000; // 通知状态, >= 1 << 16
static final int SMASK = 0x0000ffff; // 低位掩码
EmptyTask
核心参数
static final int SMASK = 0xffff; // 低16位掩码,最大索引位
static final int MAX_CAP = 0x7fff; // 工作线程最大容量
static final int EVENMASK = 0xfffe; // 偶数低位掩码
static final int SQMASK = 0x007e; // 最多64个偶数槽位(0x007e = 0111 1110,有效的是中间6个1的位置,111111 = 63,再加上000000(0槽位),总共64个) static final int SCANNING = 1; // 标记正在scan任务
static final int INACTIVE = 1 << 31; // 未活动状态
static final int SS_SEQ = 1 << 16; // 版本号(防止CAS的ABA问题) static final int MODE_MASK = 0xffff << 16; // int高16位掩码
static final int LIFO_QUEUE = 0; // LIFO模式
static final int FIFO_QUEUE = 1 << 16; // FIFO模式
static final int SHARED_QUEUE = 1 << 31; // 共享模式 public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; // 线程创建工厂 private static final RuntimePermission modifyThreadPermission; // 修改(启动或kill)线程所需要的权限 static final ForkJoinPool common; // 公共线程池 static final int commonParallelism; // 并行度 private static int commonMaxSpares; // 备用线程数 private static int poolNumberSequence; // 线程名称相关 private static final synchronized int nextPoolId() {
return ++poolNumberSequence;
} private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms private static final int DEFAULT_COMMON_MAX_SPARES = 256; // 默认备用线程数 private static final int SPINS = 0; // 自旋,暂未使用 private static final int SEED_INCREMENT = 0x9e3779b9; //indexSeed的增量 private static final long SP_MASK = 0xffffffffL; // 低32位掩码
private static final long UC_MASK = ~SP_MASK; // 高32位掩码 private static final int AC_SHIFT = 48; // 活跃线程shift
private static final long AC_UNIT = 0x0001L << AC_SHIFT; // 活跃线程增量单位
private static final long AC_MASK = 0xffffL << AC_SHIFT; // 活跃线程掩码 private static final int TC_SHIFT = 32; // 工作线程shift
private static final long TC_UNIT = 0x0001L << TC_SHIFT; // 工作线程增量单位
private static final long TC_MASK = 0xffffL << TC_SHIFT; // 工作线程掩码
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // 创建工作线程的标记 // 线程池的状态
private static final int RSLOCK = 1; // 锁定
private static final int RSIGNAL = 1 << 1; // 通知
private static final int STARTED = 1 << 2; // 开始
private static final int STOP = 1 << 29; // 停止
private static final int TERMINATED = 1 << 30; // 终止
private static final int SHUTDOWN = 1 << 31; // 关闭 volatile long ctl; // 主控参数
volatile int runState; // 线程池运行状态
final int config; // 并行度 | 模式
int indexSeed; // 用于生成工作线程索引
volatile WorkQueue[] workQueues; // 池
final ForkJoinWorkerThreadFactory factory; // 线程工厂
final UncaughtExceptionHandler ueh; // 每个工作线程的异常信息
final String workerNamePrefix; // 用于创建工作线程的名称
volatile AtomicLong stealCounter; // 偷取任务总数,也可作为同步监视器
ctl
字段ctl是ForkJoinPool的核心状态,它是一个64位的long类型数值,包含4个16位子字段:
AC: 活动的工作线程数量减去目标并行度(目标并行度:最大的工作线程数量,所以AC一般是负值,等于0时,说明活动线程已经达到饱和了)
TC: 总的工作线程数量总数减去目标并行度(TC一般也是负值,等于0时,说明总的工作线程已经饱和,并且,AC一般小于等于TC)
SS: 栈顶工作线程状态和版本数(每一个线程在挂起时都会持有前一个等待线程所在工作队列的索引,由此构成一个等待的工作线程栈,栈顶是最新等待的线程,第一位表示状态1.不活动 0.活动,后15表示版本号,标识ID的版本-最后16位)。
ID: 栈顶工作线程所在工作队列的池索引。
这样设计的好处是,通过观察AC或TC的符号(正负)就可以判断(活动|总)工作线程是否达到并行度。令sp = (int)ctl, sp取ctl的后32位,即SS|ID,如果sp非0,则可知有等待的工作线程。
runState
- STARTED 1
- STOP 1 << 1
- TERMINATED 1<<2
- SHUTDOWN 1<<29
- RSLOCK 1<<30
- RSIGNAL 1<<31
runState记录了线程池的运行状态,特别地,除了SHUTDOWN是负数外,其他值都是正数,RSLOCK和RSIGNAL是跟锁相关。
关键方法
提交任务
execute(ForkJoinTask<?>)/submit(ForkJoinTask<?>)/invoke(ForkJoinTask<?>)
public void execute(ForkJoinTask<?> task) { // 只提交任务
if (task == null)
throw new NullPointerException();
externalPush(task);
} public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { // 提交并立刻返回任务,ForkJoinTask实现了Future,支持异步取消等操作
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
} public <T> T invoke(ForkJoinTask<T> task) { // 提交任务,并等待返回执行结果
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
这三个方法都调用了externalPush(ForkJoinTask<?> task)方法,均属于外部提交,置于偶数索引工作队列。
externalPush(ForkJoinTask<?> task)
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws;
WorkQueue q;
int m;
int r = ThreadLocalRandom.getProbe(); // 探针值,用于计算WorkQueue槽位索引
int rs = runState;
if ((ws = workQueues) != null // 线程池不为空
&& (m = (ws.length - 1)) >= 0 // 线程池长度大于0
&& (q = ws[m & r & SQMASK]) != null // 获取偶数槽位的WorkQueue, m & r & SQMASK, m是全1的掩码,r是随机值,SQMASK(0x7E)偶数,与之与也是偶数
&& r != 0 // 探针值不为0
&& rs > 0 // 线程池状态大于0,SHUTDOWN < 0
&& U.compareAndSwapInt(q, QLOCK, 0, 1)) { // 0 -> 1获得锁
ForkJoinTask<?>[] a;
int am, n, s;
if ((a = q.array) != null // 任务队列(数组)不为空
&& (am = a.length - 1) > (n = (s = q.top) - q.base)) { // 且数组长度大于任务个数,不需要扩容
int j = ((am & s) << ASHIFT) + ABASE; // 计算任务索引位置
U.putOrderedObject(a, j, task); // 任务入队
U.putOrderedInt(q, QTOP, s + 1); // top加1
U.putIntVolatile(q, QLOCK, 0); // 1 -> 0解锁
if (n <= 1) // 之前任务个数小于等于1(刚巧又被别的线程偷走),那么此槽位上的线程有可能等待,如果大家都没任务,可能都在等待
signalWork(ws, q); // 唤醒可能存在的等待线程
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0); // 任务入队失败,解锁,走完整的添加任务方法
}
externalSubmit(task); // 如果不满足if条件,则走完整的添加任务方法
}
externalSubmit(ForkJoinTask<?> task)
private void externalSubmit(ForkJoinTask<?> task) {
int r; // 初始化当前线程的探针值,后续用于计算WorkQueue的索引
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws;
WorkQueue q;
int rs, m, k;
boolean move = false;
if ((rs = runState) < 0) { // 如果线程池已经关闭,则去帮助关闭它
tryTerminate(false, false);
throw new RejectedExecutionException();
} else if ((rs & STARTED) == 0 || ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { // 初始华工作队列池
int ns = 0;
rs = lockRunState(); // 加锁
try {
if ((rs & STARTED) == 0) { // 加锁后再次判断线程池的状态,不重复初始化
U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // 初始化stealCounter
int p = config & SMASK;
int n = (p > 1) ? p - 1 : 1; // 保证n是2的幂次方
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED; // 标记已经启动
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns); // 释放锁
}
} else if ((q = ws[k = r & m & SQMASK]) != null) { // 获取随机偶数槽位的WorkQueue
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { // 锁定当前WorkQueue
ForkJoinTask<?>[] a = q.array; // 任务队列
int s = q.top; // 栈顶/队列尾部
boolean submitted = false; // 标记是否成功提交任务
try {
if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { // 任务队列(a)若为空,growArray()会初始化
int j = (((a.length - 1) & s) << ASHIFT) + ABASE; // 计算内存地址
U.putOrderedObject(a, j, task); // push任务
U.putOrderedInt(q, QTOP, s + 1); // 更新top值
submitted = true; // 标记任务提交成功
}
} finally {
U.compareAndSwapInt(q, QLOCK, 1, 0); // 解锁
}
if (submitted) {
signalWork(ws, q); // 唤醒挂起的线程
return;
}
}
move = true; // 操作失败,需要换个槽位,更新探针值
} else if (((rs = runState) & RSLOCK) == 0) { // q为空,则需要创建工作队列,在线程池没有锁定的情况下进行,不然之前换其他槽位
q = new WorkQueue(this, null); // 初始化工作队列
q.hint = r; // 记录偷取者的索引,初始为随机值
q.config = k | SHARED_QUEUE; // 索引 | 共享模式
q.scanState = INACTIVE; // 未激活
rs = lockRunState(); // 加锁
if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null)
ws[k] = q; // 将工作队列焊到池的槽位上
unlockRunState(rs, rs & ~RSLOCK); // 释放锁
} else
move = true;
if (move) // 更新探针值
r = ThreadLocalRandom.advanceProbe(r);
}
}
包含3部分
- 如果线程池还没初始化,则初始化线程池,长度是2的幂次方,期间需锁定runState
- 如果选中的槽位为空(没有工作队列入驻),则初始桦一个工作队列(共享模式的,因为是外部提交,偶数索引),初始是为激活状态,还没投入使用,设置到池里时需锁定runState
- 如果选中的槽位不为空,则获得任务队列(数组),尝试将任务提交进去,成功则唤醒可能沉睡的线程,并返回,如果失败(可能别的线程抢先一步),则转移槽位。
signalWork(WorkQueue[] ws, WorkQueue q)
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c;
int sp, i;
WorkQueue v;
Thread p;
while ((c = ctl) < 0L) { // 活跃线程未达到并行度,激活或创建
if ((sp = (int) c) == 0) { // 没有空闲的线程
if ((c & ADD_WORKER) != 0L) // (c & ADD_WORKER) != 0L,说明TC的最高位为1,为负值,而TC = 总的线程数 - 并行度 < 0,表示总的线程数 < 并行度,说明工作线程的个数还很少
tryAddWorker(c); // 尝试添加线程
break; // 退出
}
if (ws == null) // 未开始或已停止
break;
if (ws.length <= (i = sp & SMASK)) // 空闲线程栈顶端线程的所属工作队列索引(正常来讲,应该小于WorkQueue[]的长度的)
break;
if ((v = ws[i]) == null) // 正在终止,deregisterWorker方法可使对应槽位置空
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // 作为下一个scanState待更新的值(增加了版本号,并且调整为激活状态)
int d = sp - v.scanState; // 如果d为0,则说明scanState还为更新过,然后才考虑CAS ctl
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); // 下一个ctl的值,AC + 1 | 上一个等待线程的索引
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { // CAS ctl
v.scanState = vs; // 更新scanState
if ((p = v.parker) != null) // 如果线程阻塞了,唤醒它
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // 没有任务,直接退出
break;
}
}
创建或唤醒一个工作线程 。
这里有一个疑问
if (ws.length <= (i = sp & SMASK)) 表示已终止,ws的类型是WorkQueue[],取自workQueues,是一早被初始化的,长度是2的幂次方,后续对它的操作是扩容,但没有对它进行缩减操作,所以这个if语句应该永远不为真,不过,这并不影响理解,也不排除这种情况存在的可能性,也是一种保证,毕竟索引大小不能大于数组长度。
tryAddWorker(long c)
private void tryAddWorker(long c) {
boolean add = false;
do {
long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); // 下一个ctl的值,AC和TC都加1
if (ctl == c) { // ctl没被其他线程改变
int rs, stop;
if ((stop = (rs = lockRunState()) & STOP) == 0) // 检查线程池是否停止
add = U.compareAndSwapLong(this, CTL, c, nc); // 更新ctl
unlockRunState(rs, rs & ~RSLOCK); // 解锁
if (stop != 0) // 已经停止,退出
break;
if (add) { // 更新ctl成功,创建线程
createWorker();
break;
}
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int) c == 0); // 重新获取ctl,并且没有达到最大线程数,而且,没有空闲的线程
}
createWorker()
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (fac != null && (wt = fac.newThread(this)) != null) { // 调用线程工厂创建线程,会去注册线程
wt.start(); // 启动线程
return true;
}
} catch (Throwable rex) {
ex = rex;
}
deregisterWorker(wt, ex); // 创建失败,注销线程,更新ctl
return false;
}
ForkJoinWorkerThread 的构造方法里会调用registerWorker(ForkJoinWorkerThread wt)方法。
registerWorker(ForkJoinWorkerThread wt)
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
wt.setDaemon(true); // 设置为守护线程
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt); // 创建工作队列
int i = 0; // 绑定线程池索引(WorkQueue[])
int mode = config & MODE_MASK;
int rs = lockRunState(); // 锁定
try {
WorkQueue[] ws;
int n;
if ((ws = workQueues) != null && (n = ws.length) > 0) {
int s = indexSeed += SEED_INCREMENT; // 用于生成索引
int m = n - 1;
i = ((s << 1) | 1) & m; // 奇数
if (ws[i] != null) { // 碰撞了,选取别的槽位,找不到就扩容
int probes = 0; // 记录是否遍历一遍
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; // 步长约长度的一半
while (ws[i = (i + step) & m] != null) { // 一直寻找,每次增加步长
if (++probes >= n) { // 如果遍历一遍,就扩容
workQueues = ws = Arrays.copyOf(ws, n <<= 1); // 扩容,每次扩大一倍长度
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // 初始时为随机数
w.config = i | mode;
w.scanState = i; // 等于索引值
ws[i] = w; // 注册
}
} finally {
unlockRunState(rs, rs & ~RSLOCK); // 解锁
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); // 设置名字
return w;
}
注册线程,为该线程创建一个工作队列,并注册到线程池中的奇数索引上,如果找了一圈没找到,则扩容。
deregisterWorker(ForkJoinWorkerThread wt, Throwable ex)
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
if (wt != null && (w = wt.workQueue) != null) {
WorkQueue[] ws; // 从WorkQueue[]中起初WorkQueue(wt对应的)
int idx = w.config & SMASK; // 取得索引
int rs = lockRunState(); // 加锁
if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
ws[idx] = null; // 注销
unlockRunState(rs, rs & ~RSLOCK); // 解锁
}
long c;
do {
} while (!U.compareAndSwapLong(this, CTL, c = ctl,
((AC_MASK & (c - AC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)))); // AC和TC都减1
if (w != null) {
w.qlock = -1; // 标记终止
w.transferStealCount(this);
w.cancelAll(); // 取消剩下的任务
}
for (;;) { // 可能的替换操作
WorkQueue[] ws;
int m, sp;
if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0
|| (ws = workQueues) == null || (m = ws.length - 1) < 0) // 如果线程池已经终止,跳出循环
break;
if ((sp = (int) (c = ctl)) != 0) { // 如果有空闲线程,则唤醒一个线程替代已经注销的线程
if (tryRelease(c, ws[sp & m], AC_UNIT)) // 唤醒线程
break;
} else if (ex != null && (c & ADD_WORKER) != 0L) { // 如果没有空闲线程,并且没有达到并行度,创建一个
tryAddWorker(c);
break;
} else
break;
}
if (ex == null) // 异常处理
ForkJoinTask.helpExpungeStaleExceptions();
else
ForkJoinTask.rethrow(ex);
}
从线程池里注销线程,ws[idx] = null; // 注销,更新CTL,根据线程池的状态决定是否找一个自己的替代者(如果有空闲线程,则唤醒一个,否则,创建一个新的工作线程)
runWorker(WorkQueue w)
final void runWorker(WorkQueue w) {
w.growArray();
int seed = w.hint;
int r = (seed == 0) ? 1 : seed;
for (ForkJoinTask<?> t;;) {
if ((t = scan(w, r)) != null) // 扫描任务
w.runTask(t); // 执行任务
else if (!awaitWork(w, r)) // 没有任务执行,等待
break;
r ^= r << 13;
r ^= r >>> 17;
r ^= r << 5; // 更新随机值
}
}
- 扫描任务,扫描到一个任务,则执行此任务
- 如果没有扫描到任务,则调用awaitWork方法,返回true则继续参与扫描工作,否则任由其停止
scan(WorkQueue w, int r)
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws;
int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState;
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { // 初始起点为r,随机数
WorkQueue q;
ForkJoinTask<?>[] a;
ForkJoinTask<?> t;
int b, n;
long c;
if ((q = ws[k]) != null) { // 如果k槽位不为空,尝试从该任务队列里取任务
if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // 有任务
long i = (((a.length - 1) & b) << ASHIFT) + ABASE; // 内存地址
if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { // 获取i地址的内容(任务)
if (ss >= 0) { // 如果active,更新array数组i索引处为空,表示任务已经取走,并更新base的值
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
if (n < -1) // 通知其他线程
signalWork(ws, q);
return t;
}
} else if (oldSum == 0 && // 如果inactive,尝试active
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int) c], AC_UNIT); // 激活栈顶工作线程对应的工作队列(任务队列,inactive -> active)
}
if (ss < 0) // 获取任务失败,重来,更新ss为最新的scanState
ss = w.scanState;
r ^= r << 1; // 更新随机值
r ^= r >>> 3;
r ^= r << 10;
origin = k = r & m; // 重新扫描
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
if ((k = (k + 1) & m) == origin) { // 最后没有扫描到任务,准备inactive此工作队列
if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // 已经inactive,跳出
break;
int ns = ss | INACTIVE; // 尝试inactive
long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); // AC减1,更新ctl
w.stackPred = (int) c; // 保存原栈顶工作队列索引
U.putInt(w, QSCANSTATE, ns); // 设置scanState
if (U.compareAndSwapLong(this, CTL, c, nc)) // CAS ctl
ss = ns; // ss取最新的scanState
else
w.scanState = ss; // CAS ctl失败,设置回scanState
}
checkSum = 0;
}
}
}
return null;
}
从随机的索引开始扫描任务,如果拿到任务,唤醒其他线程;如果没有拿到,并且已经扫描一圈了,尝试inactive自己所在的工作队列。
awaitWork(WorkQueue w, int r)
private boolean awaitWork(WorkQueue w, int r) {
if (w == null || w.qlock < 0) // w已经终止,返回false,不再扫描任务
return false;
for (int pred = w.stackPred, spins = SPINS, ss;;) {
if ((ss = w.scanState) >= 0) // 如果已经active,跳出,返回true,继续扫描任务
break;
else if (spins > 0) { // 如果spins > 0,自旋等待
r ^= r << 6;
r ^= r >>> 21;
r ^= r << 7;
if (r >= 0 && --spins == 0) { // 随机消耗自旋次数
WorkQueue v;
WorkQueue[] ws;
int s, j;
AtomicLong sc;
if (pred != 0 // 除了自己,还有等待的线程-工作队列
&& (ws = workQueues) != null // 线程池还在
&& (j = pred & SMASK) < ws.length // 前任索引还在池范围内
&& (v = ws[j]) != null // 前任任务队列还在
&& (v.parker == null || v.scanState >= 0)) // 前任线程已经唤醒,且工作队列已经激活
spins = SPINS; // 上面的一系列判断表明,很快就有任务了,先不park,继续自旋
}
} else if (w.qlock < 0) // 自旋之后,再次检查工作队列是否终止,若是,退出扫描
return false;
else if (!Thread.interrupted()) { // 如果线程中断了,清除中断标记,不考虑park,否则进入该分支
long c, prevctl, parkTime, deadline;
int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK); // 计算活跃线程的个数
if ((ac <= 0 && tryTerminate(false, false)) || (runState & STOP) != 0) // 线程池正在终止,退出扫描
return false;
if (ac <= 0 && ss == (int) c) { // 自己是栈顶等待者
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); // 设置为前一次的ctl
int t = (short) (c >>> TC_SHIFT); // 总的线程数
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) // 总线程数过多,直接退出扫描
return false;
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); // 计算等待时间
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
} else
prevctl = parkTime = deadline = 0L;
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // 加锁
w.parker = wt; // 设置parker,准备阻塞
if (w.scanState < 0 && ctl == c) // 阻塞前再次检查状态
U.park(false, parkTime);
U.putOrderedObject(w, QPARKER, null); // 唤醒后,置空parker
U.putObject(wt, PARKBLOCKER, null); // 解锁
if (w.scanState >= 0) // 已激活,跳出继续扫描
break;
if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L
&& U.compareAndSwapLong(this, CTL, c, prevctl)) // 超时,未等到任务,跳出,不再执行扫描任务,削减工作线程
return false; // shrink pool
}
}
return true;
}
若为扫描到任务,则会执行此方法。
- 首先会自旋等待,自旋过程中如果发现前任线程已经唤醒,且工作队列已经激活,说明已经有任务了,接着自旋等待
- 若发现工作队列已经终止,则返回false,表示自己退出扫描工作
- 如果线程中断了,清除中断标记,不考虑park,否则,进行下面一系列操作
- 如果发现当前线程数量过多,则直接返回false,自己不再参与扫描工作
- 计算阻塞时间,准备阻塞自己
- 阻塞前或唤醒后,都会判断线程池的状态,工作队列的状态,以判断自己是否继续参与扫描工作
fork()
public final ForkJoinTask<V> fork() { // 从top端压入任务
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this); // 当前工作线程所在的工作队列
else
ForkJoinPool.common.externalPush(this); // 外部提交任务
return this;
}
join()
public final V join() { // 调用doJoin()
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
doJoin()
private int doJoin() {
int s;
Thread t;
ForkJoinWorkerThread wt;
ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s
: ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
? (w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s
: wt.pool.awaitJoin(w, this, 0L)
: externalAwaitDone();
}
转换成下面的形式更好看点
private int doJoin() {
int s;
Thread t;
ForkJoinWorkerThread wt;
ForkJoinPool.WorkQueue w; if((s = status) < 0) { // 已经有结果,直接返回
return s;
}else {
if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { // 如果是工作线程
if((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // 尝试从本工作队列里取出等待的任务
&& (s = doExec()) < 0) { // 如果取出了任务,则去执行它,并返回结果
return s;
}else {
return wt.pool.awaitJoin(w, this, 0L); // 否则,可能有别的线程把这个任务偷走了,执行内部等待方法
}
}else { // 如果是外部线程,执行外部等待方法
return externalAwaitDone();
}
} }
externalAwaitDone()
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ? // 如果是CountedCompleter类型的任务,执行externalHelpComplete方法
ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>) this, 0)
: ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); // 否则,执行tryExternalUnpush方法,成功则执行任务,否则,返回0
if (s >= 0 && (s = status) >= 0) { // 如果任务没有结束,则等待
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { // 将status设置为SIGNAL,等着被notify
synchronized (this) {
if (status >= 0) { // 任务未结束
try {
wait(0L); // 等待
} catch (InterruptedException ie) {
interrupted = true; // 记录中断标记
}
} else
notifyAll(); // 任务已经结束,通知等待的线程
}
}
} while ((s = status) >= 0); // 任务未结束,就一直等待
if (interrupted)
Thread.currentThread().interrupt(); // 补上中断
}
return s;
}
tryExternalUnpush(ForkJoinTask<?> task)
final boolean tryExternalUnpush(ForkJoinTask<?> task) {
WorkQueue[] ws;
WorkQueue w;
ForkJoinTask<?>[] a;
int m, s;
int r = ThreadLocalRandom.getProbe();
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && (w = ws[m & r & SQMASK]) != null
&& (a = w.array) != null && (s = w.top) != w.base) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {
if (w.top == s && w.array == a && U.getObject(a, j) == task
&& U.compareAndSwapObject(a, j, task, null)) { // 如果task在任务队列的top位置,则返回true; 否则,返回false
U.putOrderedInt(w, QTOP, s - 1);
U.putOrderedInt(w, QLOCK, 0);
return true;
}
U.compareAndSwapInt(w, QLOCK, 1, 0);
}
}
return false;
}
tryUnpush(ForkJoinTask<?> t)
final boolean tryUnpush(ForkJoinTask<?> t) { // 任务t在array的top位时,返回true; 否则,返回false
ForkJoinTask<?>[] a;
int s;
if ((a = array) != null && (s = top) != base
&& U.compareAndSwapObject(a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
U.putOrderedInt(this, QTOP, s);
return true;
}
return false;
}
doExec()
final int doExec() {
int s;
boolean completed;
if ((s = status) >= 0) {
try {
completed = exec(); // 执行具体的任务
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline)
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
ForkJoinTask<?> prevJoin = w.currentJoin; // 记录上一个join的任务
U.putOrderedObject(w, QCURRENTJOIN, task); // 设置task为当前join的任务
CountedCompleter<?> cc = (task instanceof CountedCompleter) ? (CountedCompleter<?>) task : null;
for (;;) {
if ((s = task.status) < 0) // 任务已经结束,跳出循环
break;
if (cc != null)
helpComplete(w, cc, 0); // CountedCompleter类型的任务调用helpComplete()方法
else if (w.base == w.top || w.tryRemoveAndExec(task)) // 任务队列为空或执行失败(任务被别的线程偷走了),帮助偷取者执行该任务
helpStealer(w, task);
if ((s = task.status) < 0) // 任务已经结束,跳出循环
break;
long ms, ns;
if (deadline == 0L) // 任务等待时间
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L) // 超时退出
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
if (tryCompensate(w)) { // 尝试补偿策略(找一个替代者执行任务,自己在这儿等)
task.internalWait(ms); // 补偿成功,等待指定时间
U.getAndAddLong(this, CTL, AC_UNIT); // 活跃线程加1
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin); // 设置回前一个join的任务
}
return s;
}
tryRemoveAndExec(ForkJoinTask<?> task)
final boolean tryRemoveAndExec(ForkJoinTask<?> task) { // 该方法返回true, 代表可以去偷取任务执行了;否则,继续等待
ForkJoinTask<?>[] a;
int m, s, b, n;
if ((a = array) != null && (m = a.length - 1) >= 0 && task != null) {
while ((n = (s = top) - (b = base)) > 0) { // 从top开始,遍历任务队列,查找是否存在给定task
for (ForkJoinTask<?> t;;) {
long j = ((--s & m) << ASHIFT) + ABASE; // 计算任务内存地址,s递减
if ((t = (ForkJoinTask<?>) U.getObject(a, j)) == null) // 如果任务为空,
return s + 1 == top; // 如果top位为空,说明任务队列已经空了,返回true
else if (t == task) { // 找到给定任务task
boolean removed = false;
if (s + 1 == top) { // 任务在top位置,直接弹出pop
if (U.compareAndSwapObject(a, j, task, null)) {
U.putOrderedInt(this, QTOP, s);
removed = true;
}
} else if (base == b) // 如果不是在top,使用EmptyTask填补此位置
removed = U.compareAndSwapObject(a, j, task, new EmptyTask());
if (removed)
task.doExec(); // 执行任务
break; // 跳出,继续检查task.status,判断任务是否结束
} else if (t.status < 0 && s + 1 == top) { // 任务结束(取消),且目前检查的是top
if (U.compareAndSwapObject(a, j, t, null)) // 检查task是否在top位,如果是并置空top位,更新top为s(top - 1)
U.putOrderedInt(this, QTOP, s);
break; // 任务取消了
}
if (--n == 0) // 遍历结束
return false;
}
if (task.status < 0) // 任务结束
return false;
}
}
return true;
}
从top位置开始向下遍历任务,如果找到给定任务,把它从当前Worker的任务队列中移除并执行,移除的位置使用EmptyTask代替。如果任务队列为空或者任务未执行完毕返回true;任务执行完毕返回false.
helpStealer(WorkQueue w, ForkJoinTask<?> task)
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) { // 帮助偷取者(偷取自己任务的线程)执行任务
WorkQueue[] ws = workQueues;
int oldSum = 0, checkSum, m;
if (ws != null && (m = ws.length - 1) >= 0 && w != null && task != null) {
do { // restart point
checkSum = 0; // for stability check
ForkJoinTask<?> subtask;
WorkQueue j = w, v; // v是子任务的偷取者
descent: for (subtask = task; subtask.status >= 0;) { // 从目标任务开始,记录当前Join的任务,也包括偷取者当前Join的任务,递归帮助
for (int h = j.hint | 1, k = 0, i;; k += 2) { // 每次跳2个,遍历奇数位索引
if (k > m) // 如果遍历一遍没有找到偷取者,跳出循环
break descent;
if ((v = ws[i = (h + k) & m]) != null) {
if (v.currentSteal == subtask) { // 定位到偷取者,更新hint为偷取者索引,方便下次定位
j.hint = i;
break;
}
checkSum += v.base; // 若没有定位到,则累加工作队列的base值,继续遍历
}
}
for (;;) { // 帮助偷取者执行任务
ForkJoinTask<?>[] a; // 偷取者的任务数组
int b;
checkSum += (b = v.base); // 累加偷取者的base值
ForkJoinTask<?> next = v.currentJoin; // 记录偷取者Join的任务
if (subtask.status < 0 || j.currentJoin != subtask || v.currentSteal != subtask) // subtask结束,或者数据不一致了(j.currentJoin != subtask || v.currentSteal != subtask)
break descent; // 跳出外层循环重来
if (b - v.top >= 0 || (a = v.array) == null) { // 偷取者的任务列表为空
if ((subtask = next) == null) // 偷取者的Join任务为空,跳出外层循环
break descent;
j = v; // 否则,j取v的值(j指向被偷者,v指向偷取者),且subtask指向next Join任务
break; // 继续遍历,寻找偷取者的偷取者(递归)
}
int i = (((a.length - 1) & b) << ASHIFT) + ABASE; // 偷取者的base内存地址
ForkJoinTask<?> t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i)); // 获取base位置任务
if (v.base == b) {
if (t == null) // 任务为空,跳出外层循环(可能被别的线程拿走了)
break descent;
if (U.compareAndSwapObject(a, i, t, null)) { // poll(从base位置取出任务)
v.base = b + 1; // 更新base的值
ForkJoinTask<?> ps = w.currentSteal; // 记录调用者之前偷取的任务
int top = w.top; // 记录调用者的top值
do {
U.putOrderedObject(w, QCURRENTSTEAL, t); // 更新currentSteal为刚刚偷取到的任务
t.doExec(); // 指向任务
} while (task.status >= 0 && w.top != top && (t = w.pop()) != null); // 如果任务未结束,且自己任务队列不为空,优先处理自己队列里的任务
U.putOrderedObject(w, QCURRENTSTEAL, ps); // 把之前偷取的任务设置回currentSteal
if (w.base != w.top) // 自己队列来新任务了,直接返回
return;
}
}
}
}
} while (task.status >= 0 && oldSum != (oldSum = checkSum)); // Join的任务未结束,且任务在流动中,继续帮助执行
}
}
- 每次跳2个槽位,遍历奇数位索引,直到定位到偷取者,并记录偷取者的索引(hint = i),方便下次定位。
- 获取偷取者的任务列表,帮助其执行任务,如果执行过程中发现自己任务列表里有任务,则依次弹出执行。
- 如果偷取者任务队列为空,则帮助其执行Join任务,寻找偷取者的偷取者,如此往复,加快任务执行。
- 如果最后发现自己任务队列不为空(base != top),则退出帮助。
- 最后判断任务task是否结束,如果未结束,且工作队列base和在变动中,说明偷取任务一直在进行,则重复以上操作,加快任务执行。
tryCompensate(WorkQueue w)
private boolean tryCompensate(WorkQueue w) { // 找一个替代者执行任务,自己等待任务结束
boolean canBlock;
WorkQueue[] ws;
long c;
int m, pc, sp;
if (w == null || w.qlock < 0 // 调用者正在终止
|| (ws = workQueues) == null || (m = ws.length - 1) <= 0 // 线程池结束
|| (pc = config & SMASK) == 0) // 并行度为0(不可用)
canBlock = false; // 不可阻塞
else if ((sp = (int) (c = ctl)) != 0) // 如果有空闲线程,释放空闲线程
canBlock = tryRelease(c, ws[sp & m], 0L);
else { // 没有空闲线程,尝试创建一个
int ac = (int) (c >> AC_SHIFT) + pc; // 活跃线程数
int tc = (short) (c >> TC_SHIFT) + pc; // 总的线程数
int nbusy = 0; // 验证饱和度(Running线程数是否等于总的线程数)
for (int i = 0; i <= m; ++i) {
WorkQueue v;
if ((v = ws[((i << 1) | 1) & m]) != null) { // 遍历两遍奇数索引槽位
if ((v.scanState & SCANNING) != 0)
break;
++nbusy;
}
}
if (nbusy != (tc << 1) || ctl != c) // 遍历两遍奇数索引槽位,tc需要乘以2
canBlock = false; // 并不是所有的线程都在干活,或者数据(ctl)失效,不要阻塞
else if (tc >= pc && ac > 1 && w.isEmpty()) { // 总线程数大于并行度 && 活动线程数大于1 && 调用者任务队列为空
long nc = ((AC_MASK & (c - AC_UNIT)) | (~AC_MASK & c)); // AC - 1
canBlock = U.compareAndSwapLong(this, CTL, c, nc);
} else if (tc >= MAX_CAP || (this == common && tc >= pc + commonMaxSpares)) // TC达到最大容量
throw new RejectedExecutionException("Thread limit exceeded replacing blocked worker");
else {
boolean add = false;
int rs;
long nc = ((AC_MASK & c) | (TC_MASK & (c + TC_UNIT))); // 总的线程数加1,活跃线程数不变(补偿)
if (((rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
canBlock = add && createWorker(); // 创建工作线程
}
}
return canBlock;
}
- 调用者队列不为空,并且有空闲工作线程,唤醒空闲线程(tryRelease)
- 线程池未停止,活跃线程数不足,新建一个工作线程(createWorker)
- 工作队列正在停止或线程池停止,总线程数大于并行度 && 活动线程数大于1 && 调用者任务队列为空,不需要补偿
awaitQuiescence(long timeout, TimeUnit unit)
public boolean awaitQuiescence(long timeout, TimeUnit unit) { // 等待所有的任务执行结束
long nanos = unit.toNanos(timeout);
ForkJoinWorkerThread wt;
Thread thread = Thread.currentThread();
if ((thread instanceof ForkJoinWorkerThread) && (wt = (ForkJoinWorkerThread) thread).pool == this) {
helpQuiescePool(wt.workQueue); // 如果是工作线程,帮助执行任务,使其尽快结束
return true;
}
long startTime = System.nanoTime();
WorkQueue[] ws;
int r = 0, m;
boolean found = true;
while (!isQuiescent() && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { // 任务未执行结束
if (!found) { // 未找到任务
if ((System.nanoTime() - startTime) > nanos)
return false; // 超时返回
Thread.yield(); // 让出CPU时间片,让其他线程快点干活
}
found = false;
for (int j = (m + 1) << 2; j >= 0; --j) { // j初始值是4 * ws.length, 然后递减,这是要遍历4次的节奏
ForkJoinTask<?> t;
WorkQueue q;
int b, k;
if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null && (b = q.base) - q.top < 0) {
found = true;
if ((t = q.pollAt(b)) != null) // 从base位置取出任务并执行
t.doExec();
break;
}
}
}
return true;
}
helpQuiescePool(WorkQueue w)
final void helpQuiescePool(WorkQueue w) {
ForkJoinTask<?> ps = w.currentSteal; // 保存当前偷取的任务
for (boolean active = true;;) {
long c;
WorkQueue q;
ForkJoinTask<?> t;
int b;
w.execLocalTasks(); // 首先执行自己队列里的任务
if ((q = findNonEmptyStealQueue()) != null) { // 如果查找到非空WorkQueue
if (!active) { // 当前是inactive
active = true; // 重新设置为active
U.getAndAddLong(this, CTL, AC_UNIT); // AC + 1
}
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { // 任务不为空,从base位置取任务
U.putOrderedObject(w, QCURRENTSTEAL, t); // 记录t为当前偷取的任务
t.doExec(); // 开始干活
if (++w.nsteals < 0) // 增加计数
w.transferStealCount(this); // 将自己的计数添加到线程池的总计数上面去
}
} else if (active) { // 是active,但是没找到任务
long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c); // AC - 1
if ((int) (nc >> AC_SHIFT) + (config & SMASK) <= 0)
break; // AC为0,退出
if (U.compareAndSwapLong(this, CTL, c, nc)) // CAS ctl的值
active = false; // inactive
} else if ((int) ((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 // inactive, AC == 0
&& U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) // AC + 1, 保证AC至少为1
break;
}
U.putOrderedObject(w, QCURRENTSTEAL, ps); // 设置回偷取的任务
}
awaitTermination(long timeout, TimeUnit unit)
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (this == common) {
awaitQuiescence(timeout, unit);
return false;
}
long nanos = unit.toNanos(timeout);
if (isTerminated())
return true;
if (nanos <= 0L)
return false;
long deadline = System.nanoTime() + nanos;
synchronized (this) {
for (;;) { // 等待线程池终止
if (isTerminated())
return true;
if (nanos <= 0L)
return false;
long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
wait(millis > 0L ? millis : 1L);
nanos = deadline - System.nanoTime();
}
}
}
tryTerminate(boolean now, boolean enable)
private boolean tryTerminate(boolean now, boolean enable) {
int rs;
if (this == common) // 公共线程池,不能shutdown
return false;
if ((rs = runState) >= 0) {
if (!enable)
return false;
rs = lockRunState(); // 进入SHUTDOWN阶段
unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
} if ((rs & STOP) == 0) { // 准备进入STOP阶段
if (!now) { // 必要的检查
for (long oldSum = 0L;;) {
WorkQueue[] ws;
WorkQueue w;
int m, b;
long c;
long checkSum = ctl;
if ((int) (checkSum >> AC_SHIFT) + (config & SMASK) > 0)
return false; // 如果有活动的工作线程,还不能停止
if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
break;
for (int i = 0; i <= m; ++i) {
if ((w = ws[i]) != null) {
if ((b = w.base) != w.top || w.scanState >= 0 || w.currentSteal != null) {
tryRelease(c = ctl, ws[m & (int) c], AC_UNIT);
return false; // 有任务在执行,还不能停止
}
checkSum += b; // 累加base值
if ((i & 1) == 0)
w.qlock = -1; // 偶数索引工作队列,qlock = -1, 拦截从外部提交的任务
}
}
if (oldSum == (oldSum = checkSum)) // 稳定了,退出
break;
}
}
if ((runState & STOP) == 0) { // 如果now等于true,立刻进入STOP结点
rs = lockRunState();
unlockRunState(rs, (rs & ~RSLOCK) | STOP);
}
} int pass = 0;
for (long oldSum = 0L;;) {
WorkQueue[] ws;
WorkQueue w;
ForkJoinWorkerThread wt;
int m;
long checkSum = ctl;
if ((short) (checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 || (ws = workQueues) == null
|| (m = ws.length - 1) <= 0) { // 可以终止了
if ((runState & TERMINATED) == 0) {
rs = lockRunState();
unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED); // 进入TERMINATED阶段
synchronized (this) {
notifyAll(); // 唤醒所有线程
}
}
break; // 跳出
}
for (int i = 0; i <= m; ++i) {
if ((w = ws[i]) != null) {
checkSum += w.base;
w.qlock = -1; // 设置不可用
if (pass > 0) {
w.cancelAll(); // 取消所有的任务
if (pass > 1 && (wt = w.owner) != null) {
if (!wt.isInterrupted()) {
try {
wt.interrupt(); // 中断线程
} catch (Throwable ignore) {
}
}
if (w.scanState < 0)
U.unpark(wt); // 唤醒线程
}
}
}
}
if (checkSum != oldSum) { // 不稳定,重来
oldSum = checkSum;
pass = 0;
} else if (pass > 3 && pass > m) // 退出
break;
else if (++pass > 1) { // 出队
long c;
int j = 0, sp;
while (j++ <= m && (sp = (int) (c = ctl)) != 0)
tryRelease(c, ws[sp & m], AC_UNIT);
}
}
return true;
}
SHUTDOWN(!common) -> STOP(无任务执行) -> TERMINATED(over)
fork-join
1.有一个大的任务Task(8), 最终被分解成8个小任务Task(1)
2.将Task(8)加入到任务队列里面(偶数索引,图中未显示),线程A偷取到Task(8),fork了2个Task(4),push到任务队列里面
3.pop出Task(4), fork出2个Task(2),push到任务队列里面
4. pop出Task(2), fork出2个Task(1), push到任务队列里面
5.pop出任务Task(1),此刻已经达到最小粒度,开始执行该任务;与此同时,线程B从底部(base)位置steal走了Task(4)
6.线程B拿到Task(4)之后,fork出了2个Task(2),push到任务队列里面
7.线程A执行完自己的任务后,由于Task(4).join(),索性定位到偷走自己任务的线程B所在的工作队列,帮助其执行任务,整体加快任务进度,帮助的方式也是steal
以上是最简单的一种fork.join方式。
行文至此结束。
尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_fjp.html