ThreadPoolExecutor源码解读

时间:2022-08-11 10:29:19

1. 背景与简介

在Java中异步任务的处理,我们通常会使用Executor框架,而ThreadPoolExecutor是JUC为我们提供的线程池实现。

线程池的优点在于规避线程的频繁创建,对线程资源统一管理,在任务到达时能快速响应。

本文从JUC的ThreadPoolExecutor源码出发来剖析线程池的实现原理。

要比较轻松地理解ThreadPoolExecutor源码,最好需要对AbstractQueuedSynchronizer, BlockingQueue, FutureTask等类有比较熟悉的认知

另外也需要对Executor框架本身有基本认识。

关于AbstractQueuedSynchronizer的源码解读,可以参考我的AQS解读

关于FutureTask的源码解读可以参考我的FutureTask解读

线程池的大致处理流程如下图所示,线程池内部有一个阻塞队列作为任务队列用以存储提交的任务,线程池中的工作线程作为消费者从阻塞队列中不断获取任务执行。

ThreadPoolExecutor源码解读

上图截取自《Java并发编程的艺术》

从逻辑角度来说,线程池可以划分出一个核心线程池,在新任务到达时,如果核心线程池未满,会创建新线程来运行任务,即便核心线程池有空闲线程。

如果核心线程池满了,则会将任务加入到任务队列中,最终被工作线程从队列中取出执行。

如果任务队列已满,只要线程数未达到最大线程数限制,会创建一个新线程来运行任务;否则会调用饱和策略来处理该任务。

我们也可以参考下面的线程池执行示意图。

ThreadPoolExecutor源码解读

上图截取自《Java并发编程的艺术》

1.1 线程池参数

这里介绍ThreadPoolExecutor中几个比较关键的变量/参数:

  • corePoolSize 核心线程数:如果线程池中线程数量小于corePoolSize,即便现有线程有空闲也会创建新线程来运行新任务
  • maximumPoolSize 最大线程数:如果线程池中线程数量大于corePoolSize并且任务队列满时会创建新线程来运行新任务
  • keepAliveTime 线程存活时间:若果线程池中线程数量大于corePoolSize,则多余的线程在空闲时间超过keepAliveTime后会退出
  • allowCoreThreadTimeout 核心线程超时控制标志位:用于标识是否keepAliveTime的效果同样作用在核心线程上。

需要注意的是线程池中的实际工作线程数可能会超过maximumPoolSize,因为这个参数是可以通过setMaximumPoolSize方法动态调整的。

下面介绍几种Executors工具类中提供的常见的基于ThreadPoolExecutor构造的参数配置

fixedThreadPool

corePoolSize : n

maximumPoolSize : n

keepAliveTime: 0

workerQueue: LinkedBlockingQueue (*阻塞队列)

singleThreadExecutor

corePoolSize : 1

maximumPoolSize : 1

keepAliveTime: 0

workerQueue : LinkedBlockingQueue (*阻塞队列)

cachedThreadPool

corePoolSize : 0

maximumPoolSize : Integer.MAX

keepAliveTime : 60s

workerQueue : SynchronousQueue (同步队列)

2. 生命周期

线程池的完整生命周期具有如下五个阶段:

  • RUNNING

    这是线程池的初始状态。此状态下线程池会接受新任务并且处理队列中等待的任务。
  • SHUTDOWN

    RUNNING状态下调用shutdown方法后进入此状态。此状态下线程池不接受新任务,但会处理队列中等待的任务。
  • STOP

    RUNNING/SHUTDOWN状态下调用shutdownNow方法后进入此状态。此状态下线程池不接受新任务,也不处理既有等待任务,并且会中断既有运行中的线程。
  • TIDYING

    SHUTDOWN/STOP状态会流转到此状态。此时所有任务都已运行完毕,工作线程数为0,任务队列都为空。从字面角度理解,此时线程池已经清干净了。
  • TERMINATED

    TIDYING状态下,线程池执行完terminated钩子方法后进入此状态,此时线程池已完全终止。

2.1 线程池中生命周期的表示

ctl是线程池中一个核心状态控制变量,它的类型为AtomicInteger。ctl实际上存储了两方面信息:线程数和线程池的状态。

ctl的低29位用于表示线程数,因此范围上界约为5亿;

而高3位用于表示5种生命周期状态,对应的值分别是(注意:在ctl中实际上下面的值左移29位放在高3位存储):

  • RUNNING -1
  • SHUTDOWN 0
  • STOP 1
  • TIDYING 2
  • TERMINATED 3

因此如果我们想要取出线程数就可以用ctl和(1<<29)-1作位与运算,而如果想取出线程池状态的话就用(1<<29)-1取反后和ctl作位与运算。

ThreadPoolExecutor源码解读

图为自制的线程池的生命周期状态流转示意图

3. 源码实现

接下来,分几部分介绍线程池。

  • 工作线程封装: 介绍线程池中的Worker类, runWorker, getTask, processWorkerExit等方法。
  • 提交任务的处理: execute, addWorker, addWorkerFailed等方法。
  • 关闭线程池: shutdown, shutdownNow, tryTerminate, interruptWorkers, interruptIdleWorkers等方法。
  • 饱和策略: 内置四种饱和策略简介。

3.1 工作线程的封装抽象

ThreadPoolExecutor的Worker类是一个非常重要的内部类,它是对工作线程的封装,很有必要花上功夫详细解读。

Worker类内部包含:

  • final Thread thread 对应的工作线程对象
  • Runnable firstTask 初始任务
  • volatile long completedTasks 用于统计完成的任务

    Worker类本身继承了AbstractQueuedSynchronizer,并实现了一个简单的互斥锁Mutex Lock。

3.1.1 runWorker

Worker类实现了Runnable接口,并将run方法的实现委托给了外部类ThreadPoolExecutor的runWorker方法。

/**
* 工作线程运行核心逻辑。
* 简单来说做的事情就是不断从任务队列中拿取任务运行。
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
// 把firstTask设置为null,从GC角度来看,这处代码很重要。
w.firstTask = null;
// 置互斥锁状态为0,此时可以被中断。
w.unlock();
// 用于标记完成任务时是否有异常。
boolean completedAbruptly = true;
try {
// 循环:初始任务(首次)或者从阻塞阻塞队列里拿一个(后续)。
while (task != null || (task = getTask()) != null) {
/*
* 获取互斥锁。
* 在持有互斥锁时,调用线程池shutdown方法不会中断该线程。
* 但是shutdownNow方法无视互斥锁,会中断所有线程。
*/
w.lock();
/*
* 这里if做的事情就是判断是否需要中断当前线程。
* 如果线程池至少处于STOP阶段,当前线程未中断,则中断当前线程;
* 否则清除线程中断位。
*
* if条件中的Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)
* 做的事情说穿了就是清除中断位并确认目前线程池状态没有达到STOP阶段。
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 调用由子类实现的前置处理钩子。
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正的执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 调用由子类实现的后置处理钩子。
afterExecute(task, thrown);
}
} finally {
// 清空task, 计数器+1, 释放互斥锁。
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
/*
* 处理工作线程退出。
* 上面主循环中的前置处理、任务调用、后置处理都是可能会抛出异常的。
*/
processWorkerExit(w, completedAbruptly);
}
}

3.1.2 getTask

/**
* 工作线程从任务队列中拿取任务的核心方法。
* 根据配置决定采用阻塞或是时限获取。
* 在以下几种情况会返回null从而接下来线程会退出(runWorker方法循环结束):
* 1. 当前工作线程数超过了maximumPoolSize(由于maximumPoolSize可以动态调整,这是可能的)。
* 2. 线程池状态为STOP (因为STOP状态不处理任务队列中的任务了)。
* 3. 线程池状态为SHUTDOWN,任务队列为空 (因为SHUTDOWN状态仍然需要处理等待中任务)。
* 4. 根据线程池参数状态以及线程是否空闲超过keepAliveTime决定是否退出当前工作线程。
*/
private Runnable getTask() {
// 上次从任务队列poll任务是否超时。
boolean timedOut = false; for (;;) {
int c = ctl.get();
int rs = runStateOf(c); /*
* 如果线程池状态已经不是RUNNING状态了,则设置ctl的工作线程数-1
* if条件等价于 rs >= STOP || (rs == SHUTDOWN && workQueue.isEmpty())
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
} int wc = workerCountOf(c); /*
* allowCoreThreadTimeOut是用于设置核心线程是否受keepAliveTime影响。
* 在allowCoreThreadTimeOut为true或者工作线程数>corePoolSize情况下,
* 当前工作线程受keepAliveTime影响。
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /*
* 1. 工作线程数>maximumPoolSize,当前工作线程需要退出。
* 2. timed && timedOut == true说明当前线程受keepAliveTime影响且上次获取任务超时。
* 这种情况下只要当前线程不是最后一个工作线程或者任务队列为空,则可以退出。
*
* 换句话说就是,如果队列不为空,则当前线程不能是最后一个工作线程,
* 否则退出了就没线程处理任务了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 设置ctl的workCount减1, CAS失败则需要重试(因为上面if中的条件可能不满足了)。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
} try {
// 根据timed变量的值决定是时限获取或是阻塞获取任务队列中的任务。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// workQueue.take是不会返回null的,因此说明poll超时了。
timedOut = true;
} catch (InterruptedException retry) {
// 在阻塞队列上等待时如果被中断,则清除超时标识重试一次循环。
timedOut = false;
}
}
}

3.1.3 processWorkerExit

在工作线程退出的时候,processWorkerExit方法会被调用。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
/*
* 因为正常退出,workerCount减1这件事情是在getTask拿不到任务的情况下做掉的。
* 所以在有异常的情况下,需要在本方法里给workCount减1。
*/
if (completedAbruptly)
decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 累加completedTaskCount,从工作线程集合移除自己。
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
} // 由于workCount减1,需要调用tryTerminate方法。
tryTerminate(); int c = ctl.get();
// 只要线程池还没达到STOP状态,任务队列中的任务仍然是需要处理的。
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
/*
* 确定在RUNNING或SHUTDOWN状态下最少需要的工作线程数。
*
* 默认情况下,核心线程不受限制时影响,
* 在这种情况下核心线程数量应当是稳定的。
* 否则允许线程池中无线程。
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果任务队列非空,至少需要1个工作线程。
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 无需补偿工作线程。
if (workerCountOf(c) >= min)
return;
}
// 异常退出或者需要补偿一个线程的情况下,加一个空任务工作线程。
addWorker(null, false);
}
}

3.2 提交任务的处理

3.2.1 execute

/**
* execute方法可以说是线程池中最核心的方法,
* 在继承链上层的AbstractExecutorService中将各种接受新任务的方法最终转发给了此方法进行任务处理。
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 分类讨论:
* 1. 如果当前线程数<核心线程数,则会开启一个新线程来执行提交的任务。
*
* 2. 尝试向任务队列中添加任务。这时需要再次检查方法开始到当前时刻这段间隙,
* 线程池是否已经关闭了/线程池中没有工作线程了。
* 如果线程池已经关闭了,需要在任务队列中移除先前提交的任务。
* 如果没有工作线程了,则需要添加一个空任务工作线程用于执行提交的任务。
*
* 3. 如果无法向阻塞队列中添加任务,则尝试创建一个新的线程执行任务。
* 如果失败,回调饱和策略处理任务。
*/
int c = ctl.get();
// 线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 检查线程池是否处于运行状态,并向任务队列中添加任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/*
* 再次检查是否线程池处于运行状态,如果不是则移除任务并回调饱和策略拒绝任务。
* 因为有可能上面if条件读到线程池处于运行状态,而后shutdown/shutdownNow方法被调用,
* 这时候需要把尝试刚才加入任务队列中的任务移除。
*/
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果workerCount为0,需要添加一个工作线程用于执行提交的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 添加一个新的工作线程处理任务。
* 如果失败,则说明线程池已经关闭或者已经饱和了,此时回调饱和策略来拒绝任务。
*/
else if (!addWorker(command, false))
reject(command);
}

3.2.2 addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); /*
* 如果线程池状态至少为STOP,返回false,不接受任务。
* 如果线程池状态为SHUTDOWN,并且firstTask不为null或者任务队列为空,同样不接受任务。
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false; for (;;) {
int wc = workerCountOf(c);
/*
* CAPACITY为(1<<29)-1,这是线程池中线程数真正的上界,绝不允许超过。
* 因为ThreadPoolExecutor设计中是用低29位表示工作线程数的。
*
* 否则根据参数中是否以corePoolSize为上界进行判断,如果超过,则新增worker失败。
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 成功新增workCount,跳出整个循环往下走。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
/*
* 重读总控状态,如果运行状态变了,重试整个大循环。
* 否则说明是workCount发生了变化,重试内层循环。
*/
if (runStateOf(c) != rs)
continue retry;
}
} // 运行到此处时,线程池线程数已经成功+1,下面进行实质操作。 boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 由于获取锁之前线程池状态可能发生了变化,这里需要重新读一次状态。
int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 向工作线程集合添加新worker,更新largestPoolSize。
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 成功增加worker后,启动该worker线程。
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// worker线程如果没有成功启动,回滚worker集合和worker计数器的变化。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

3.2.3 addWorkerFailed

在新增工作线程失败的情况下,调用addWorkerFailed:

  1. 从worker集合删除失败的worker。
  2. workCount减1。
  3. 调用tryTerminate尝试终止线程池。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

3.3 线程池的关闭(shutdown与shutdownNow)

在介绍线程池关闭shutdown与shutdownNow相关源码实现时,需要先分析两个很重要的方法

  • tryTerminate

    线程池的生命周期最终态为TERMINATED,然而TERMINATED状态的演进未必是调用shutdown/shutdownNow能做到,因为TERMINATED状态下,线程池中已经没有工作线程,也没有任务队列。tryTerminate方法里面包含了一个逻辑上的责任链,将线程池状态的演进动作在线程中传播下去。

  • interruptIdleWorkers

    Worker类是线程池对工作线程的封装抽象。它主要做的事情就是不断从任务队列中取任务执行,遇到异常退出。如果在工作线程等待任务(阻塞在阻塞队列)时中断该工作线程,则工作线程会重试一次getTask的循环来获取任务,获取不到就会退出runWorker方法的大循环,从而进入processWorkerExit方法收尾。ThreadPoolExecutor中线程中断的主要方法便是interruptIdleWorkers。可以通过参数控制是否最多中断1个线程。

3.3.1 tryTerminate

这是线程池中一个很重要的方法。它是实现线程池状态从SHUTDOWN或者STOP流转到TIDYING->TERMINATED的桥梁方法。

final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
* 能够进行状态流转的情况是:
* 1. STOP状态
* 2. SHUTDOWN并且任务队列已空。
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/*
* 这时只需要所有工作线程退出即可终止线程池。
* 如果仍然有工作线程,则中断一个空闲的线程。
*
* 一旦空闲线程被终止,则会进入processWorkerExit方法,
* 在processWorkerExit方法中即将退出的工作线程会调用tryTerminate,
* 从而将终止线程池的动作通过这样的机制在线程间传播下去。
*/
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
} final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 这时workerCount已经为0,任务队列也已为空,状态流转到TIDYING。
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 调用terminated()钩子方法。
terminated();
} finally {
// 将线程池状态拨到TERMINATED。
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒所有在线程池终止条件上等待的线程。
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// 线程池状态流转CAS失败的话重试循环。
}
}

3.3.2 interruptIdleWorkers

/**
* 参数中的onlyOne表示至多只中断一个工作线程。
* 在tryTerminate方法读取到目前线程池离可以进入终止状态只剩下workCount降为0时,
* 会调用interruptIdeleWorkers(true)。因为有可能此时其他所有线程都阻塞在任务队列上,
* 只要中断任意一个线程,通过processWorkerExit -> tryTerminate ->interruptIdleWorkers,
* 可以使线程中断+退出传播下去。
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
/*
* 这里加全局锁的一个很重要的目的是使这个方法串行化执行。
* 否则在线程池关闭阶段,退出的线程会通过tryTerminate调用到此方法,
* 并发尝试中断还未中断的线程,引发一场中断风暴。
*/
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 工作线程在处理任务阶段是被互斥锁保护的,从而这里不会中断到。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 最多中断一个。
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

3.3.3 shutdown

shutdown方法关闭线程池是有序优雅的,线程池进入SHUTDOWN状态后不会接受新任务,但是任务队列中已有的任务会继续处理。

shutdown方法会中断所有未处理任务的空闲线程。

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 状态切换到SHUTDOWN。
advanceRunState(SHUTDOWN);
// 中断所有空闲线程,或者说在任务队列上阻塞的线程。
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试终止线程池(状态流转至TERMINATED)。
tryTerminate();
} private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

3.3.4 shutdownNow

shutdownNow方法关闭线程池相比shutdown就暴力了一点,会中断所有线程,哪怕线程正在执行任务。

线程池进入STOP状态后,不接受新的任务,也不会处理任务队列中已有的任务。

但需要注意的是,即便shutdownNow即便会中断正在执行任务的线程,不代表你的任务一定会挂:如果提交的任务里面的代码没有对线程中断敏感的逻辑的话,线程中断也不会发生什么。

public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 状态切换到STOP
advanceRunState(STOP);
// 与SHUTDOWN不同的是,直接中断所有线程。
interruptWorkers();
// 将任务队列中的任务收集到tasks。
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终止线程池(状态流转至TERMINATED)。
tryTerminate();
return tasks;
} /**
* 此方法只会被shutdownNow方法调用,用于中断所有工作线程。
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
} void interruptIfStarted() {
Thread t;
/*
* 这里中断已经执行过初次unlock的工作线程(参考runWorker方法逻辑),
* 因为如果还没有走到初次unlcok那一步的工作线程,一定会读到线程池状态至少为STOP从而退出。
*/
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
} /**
* 将任务队列中的任务dump出来。
* 这个方法执行完以后,任务队列其实可能还会有残留的任务。
* 比方说:我们的任务队列用LinkedBlockingQueue,事件顺序如下:
* 时刻1: 线程池状态为RUNNGING,线程A执行ThreadPoolExecutor#execute方法的
* if (isRunning(c) && workQueue.offer(command))
* isRunning(c)返回true,此时还未执行offer操作。
* 时刻2: 线程B执行shutdownNow,切换线程池状态到STOP,接下来执行完drainQueue方法。
* 时刻3: 线程A开始执行offer操作,往任务队列中添加了任务。
*
* 对于这种情况,确实drainQueue没有按照doc描述返回所有未执行的任务,
* 但实际上在ThreadPoolExecutor#execute方法中,向任务队列中添加完任务后有个再次检查线程池状态的步骤。
* 此时线程A一定能够读取到线程池状态已经不是RUNNING了,在将任务从队列中移除后会使用饱和策略处理任务。
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

3.4 饱和策略

当线程池由于状态或者参数配置等原因无法执行任务时,会通过reject方法调用内置的RejectedExecutionHandler(Java并发编程实战将其译为饱和策略)处理任务。

ThreadPoolExecutor中内置四种饱和策略,并且可以通过setRejectedExecutionHandler来动态调整。

下面简单介绍一下四种饱和策略:

  • CallerRunsPolicy

    调用线程池提交任务的线程自己运行提交的任务,前提是线程池仍然处于RUNNING状态,否则任务会被静默丢弃。
  • AbortPolicy

    抛出RejectedExecutionException异常,这是线程池默认的饱和策略。
  • DiscardPolicy

    静默丢弃任务。
  • DiscardOldestPolicy

    丢弃任务队列中首部的任务,重新执行任务。

四种默认饱和策略的实现都比较简单,就不对代码作介绍了。

4. 思考与总结

线程池的大致套路读懂并不是很难,包括代码中方法、语句的作用都不难读懂。难点在于读懂整体的设计精华、每一行代码为什么这么写。

下面自问自答一些读源码过程中的思考与总结。

4.1 mainLock的作用

线程池中用于保存工作线程的是一个HashSet,还有一些统计的字段比如largestPoolSize用于统计线程池中出现过的最大线程数,completedTaskCount用于统计完成的任务数。

这些东西的更新与读取都会被mainLock保护。这里很容易有个问题,为什么不用并发容器来保存工作线程?Doug Lea在源码的doc里的描述大意是:用锁可以串行化interruptIdleWorkers方法,避免关闭线程池时大量线程并发中断其他线程。另外在shutdown/shutdownNow时由于需要遍历工作线程集合来检查权限,在检查完权限后会中断工作线程。加上锁也可以保证在检查权限与中断线程过程中,工作线程集合元素不变。

4.2 Worker为什么要实现Mutex锁

Worker类继承AQS实现了一个简单不可重入的互斥锁,在执行用户提交任务的开始时需要获取锁,任务结束时需要释放锁。锁在这里最主要的目的是为了保证被别的线程中断时处于空闲状态,即没有在执行任务。当然如果shutdownNow方法被调用时,所有的线程都会被中断不管是否处于空闲状态。

很自然会想到为什么不能复用ReentrantLock组合在里面呢?实际上这里不能用ReentrantLock,因为不能允许工作线程能够多次获取锁。

我通过翻阅Doug Lea的代码库历史,发现当时有个ThreadPoolExecutor的bug,主要的问题就在于用户提交的任务通过调用ThreadPoolExecutor#setCorePoolSize -> interruptIdleWorkers 会把任务本身对应的工作线程给中断掉,因为工作线程可以通过tryLock方法重入了锁,这是不应该出现的预期外的结果。

Doug Lea的对应修复

ThreadPoolExecutor源码解读

把Worker类改成了继承AQS,实现简单的Mutex锁。

4.3 TIDYING状态的意义

ThreadPoolExecutor很早以前是只有四种状态而没有TIDYING的。我个人对此状态存在意义的思考是,TIDYING的加入使得ThreadPoolExecutor的状态跃迁逻辑更为平滑。

Doug Lea在某次提交中加入了这个状态。

TIDYING相当于很早以前的TERMINATED,目前ThreadPoolExecutor中TIDYING和TERMINATED之间的流转在于是否完成了terminated钩子方法的调用。

5. 参考资料

  • 《Java并发编程实战》
  • 《Java并发编程的艺术》