Java线程池ThreadPoolExecutor类源码分析

时间:2024-07-08 10:07:50

前面我们在java线程池ThreadPoolExecutor类使用详解中对ThreadPoolExector线程池类的使用进行了详细阐述,这篇文章我们对其具体的源码进行一下分析和总结;

首先我们看下ThreadPoolExecutor用来表示线程池状态的核心变量

//用来标记线程池状态(高3位),线程个数(低29位)
//默认是RUNNING状态,线程个数为0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //线程个数掩码位数
private static final int COUNT_BITS = Integer.SIZE - 3; //线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //(高3位):11100000000000000000000000000000
//接受新任务并且处理阻塞队列里的任务
private static final int RUNNING = -1 << COUNT_BITS; //(高3位):00000000000000000000000000000000
//拒绝新任务但是处理阻塞队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS; //(高3位):00100000000000000000000000000000
//拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
private static final int STOP = 1 << COUNT_BITS; //(高3位):01000000000000000000000000000000 //所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
private static final int TIDYING = 2 << COUNT_BITS; //(高3位):01100000000000000000000000000000
//终止状态。terminated方法调用完成以后的状态
private static final int TERMINATED = 3 << COUNT_BITS;// // 获取高三位 运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; } //获取低29位 线程个数
private static int workerCountOf(int c) { return c & CAPACITY; } //计算ctl新值,线程状态 与 线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }

ThreadPoolExecutor核心函数

//提交任务函数
void execute(Runnable command)  //执行拒绝策略的函数
void reject(Runnable command) //新增WOKER线程函数
boolean addWorker(Runnable firstTask, boolean core) //WOKER线程执行函数
void runWorker(Worker w) //获取执行任务函数
Runnable getTask() //线程池停止函数
void shutdown() //线程池立即停止函数
void shutdownNow()

接下来我们围绕这几个核心函数对ThreadPoolExector线程池类的执行流程和源码进行分享

一、execute函数

execute 做为ThreadPoolExector的提交任务的函数,注解上已经说明了其实现的主要三步操作:

1、如果运行的线程小于corePoolSize,则尝试使用用户定义的Runnalbe对象创建一个新的线程,调用addWorker函数会原子性的检查runState和workCount,通过返回false来防止在不应该添加线程时添加了线程。
2. 如果一个任务能够成功入队列,在添加一个线城时仍需要进行双重检查(因为在前一次检查后该线程死亡了),或者当进入到此方法时,线程池已经shutdown了,所以需要再次检查状态,若有必要,当停止时还需要回滚入队列操作,或者当线程池没有线程时需要创建一个新线程。
3. 如果无法入队列,那么需要增加一个新线程,如果此操作失败,那么就意味着线程池已经shutdown或者已经饱和了,所以拒绝任务
    public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
如果运行的线程小于corePoolSize,则尝试使用用户定义的Runnalbe对象创建一个新的线程,
调用addWorker函数会原子性的检查runState和workCount,
通过返回false来防止在不应该添加线程时添加了线程。 * 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
如果一个任务能够成功入队列,在添加一个线城时仍需要进行双重检查(因为在前一次检查后该线程死亡了),
或者当进入到此方法时,线程池已经shutdown了,所以需要再次检查状态,
若有必要,当停止时还需要回滚入队列操作,
或者当线程池没有线程时需要创建一个新线程。 * 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
如果无法入队列,那么需要增加一个新线程,
如果此操作失败,那么就意味着线程池已经shutdown或者已经饱和了,所以拒绝任务
*/ //获取线程池的运行状态
int c = ctl.get();
//检查核心线程数量
if (workerCountOf(c) < corePoolSize) {
//如果小于corePoolSize 则执行addWorker函数
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断当前线程是否处于Running状态其任务是否可以继续加入workQueue队列(有界还是*任务队列)
if (isRunning(c) && workQueue.offer(command)) {
//如果满足条件,则再次进行状态检查
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//如果线程池已经不是Running状态,在队列中移除任务,切执行绝交策略
reject(command);
else if (workerCountOf(recheck) == 0)
//如果worker数量是0,则添加worker
addWorker(null, false);
}
else if (!addWorker(command, false))//如果队列已满,则根据maximumPoolSize扩展线程池
//添加worker失败,执行拒绝策略
reject(command);
}
通过上面的代码我们可以看到execute函数本身并不执行提交的Runnable任务 主要作用是根据当然线程池的状态,选择执行addWorker函数还是执行reject拒绝策略

二、reject函数

    final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
handler为RejectedExecutionHandler接口的具体实现,执行相应的拒绝策略
下面代码为各拒绝策略的具体实现
    //如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { } /**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//如果线程池仍在运行状态,执行Runnable的run方法
r.run();
}
}
} //该策略会直接抛出异常,阻止系统正常工作;
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { } /**
* 直接抛出RejectedExecutionException异常
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
} //静默策略,不予任何处理。
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { } /**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
} //该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去,马上要被执行的那个任务,并尝试再次提交;
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { } /**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//直接移除任务队列中第一个任务
e.getQueue().poll();
//再次提交
e.execute(r);
}
}

三、addWorker函数

addworker函数主要完成了两部分功能:

1、通过cas的方式检查线程池状态与当前线程数量,如果符合条件 增加线程个数;

2、如果上一部分cas检查成功,线程数已经加一,那么创建Worker对象并绑定通过线程工厂分配与启动线程;

我们看下具体的代码

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//获取线程池状态
int c = ctl.get();
int rs = runStateOf(c); if (rs >= SHUTDOWN && // 线程池状态大于等于SHUTDOWN,初始的ctl为RUNNING,小于SHUTDOWN
! (rs == SHUTDOWN && //线程池状态等于SHUTDOWN
firstTask == null && //传入的任务为空
! workQueue.isEmpty())) //workQueue队列不为空
return false; for (;;) {
int wc = workerCountOf(c); //获取当前的worker线程数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//如果当前worker数量大于最大容量或大于设置的最大线程数,返回false
return false;
if (compareAndIncrementWorkerCount(c))//cas的方式增加woker线程数量
//cas增加线程数量成功,跳出循序
break retry;
//cas失败了,则看线程池状态是否变化了
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
//如果发生变化则跳到外层循环重新获取线程池状态,否则内层循环重新cas。
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
} //到这里代表CAS成功了,也就是说线程个数加一了,但是现在任务还没开始执行
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个worker对象,并通过线程工厂创建线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;//加锁,保证添加worker动作的同步,因为同一时间可能会有多个线程操作
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//重新检查,保证线程状态正常没有关闭
int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
//如果线程池状态异常,抛出异常
throw new IllegalThreadStateException();
//添加worker
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
} if (workerAdded) {
//添加worker成功,启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
//如果添加失败,做失败处理
addWorkerFailed(w);
}
return workerStarted;
}

通过上面的代码可以看到,线程池新增的线程最终会封装为一个Worker对象,这个对象会通过轮询的方式不断从任务队列中获取任务,并通过其绑定的线程执行,我们看下Worker类的具体内容

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks; /**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
//构造函数,绑定任务并通过线程工厂创建线程
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
} /** Delegates main run loop to outer runWorker */
//run方法执行runWorker函数
public void run() {
runWorker(this);
} // Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state. protected boolean isHeldExclusively() {
return getState() != 0;
} //
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
} protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
} public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
//线程中断
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

在Woker类的构造函数中通过线程工厂创建了线程,当线程start时就开始执行runWorker函数。

四、runWorker函数

runWork函数主要实现的功能就是while轮询方式通过getTask函数获取执行任务,我们来看下具体的代码分析

final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//轮询,通过getTask()获取执行的任务
while (task != null || (task = getTask()) != null) {
w.lock(); //(如果线程池至少是stop状态 或 (线程池至少是stop状态且线程是否处于中断))且wt线程是否处于中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

如果没有获取到task任务的话,执行processWorkerExit函数

private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
// 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成的任务数
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
/*
* 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法 而其中tryTerminate()函数的主要作用就是判断是否有空闲线程,并设置中断;

final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
* 当前线程池的状态为以下几种情况时,直接返回:
* 1. RUNNING,因为还在运行中,不能停止;
* 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;
* 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果线程数量不为0,则中断一个空闲的工作线程,并返回
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// terminated方法默认什么都不做,留给子类实现
terminated();
} finally {
// 设置状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

getTask函数通过workQueue.take()获取任务时,如果不执行中断会一直阻塞。在下面介绍的shutdown方法中,会中断所有空闲的工作线程,如果在执行shutdown时工作线程没有空闲,然后又去调用了getTask方法,这时如果workQueue中没有任务了,调用workQueue.take()时就会一直阻塞。所以每次在工作线程结束时调用tryTerminate方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况。

五、getTask函数

getTask函数主要完成了三块功能:

1、检查线程池及队列任务状态;

2、根据maximumPoolSize、超时时间和队列任务,控制线程数量;

3、满足条件从workQueue队列中获取任务;

private Runnable getTask() {
//超时标志
boolean timedOut = false; // Did the last poll() time out? for (;;) {
//获取线程池状态
int c = ctl.get();
int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//如果线程池状态异常或任务队列为空,返回NULL
decrementWorkerCount();
return null;
}
//获取当前线程数量
int wc = workerCountOf(c); // Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /*
* wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
* 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
* 如果减1失败,则返回重试。
* 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
} try {
//取出任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

六、shutdown函数

shutdown函数是用来停止线程池的,调用shutdown后,线程池就不会在接受新的任务了,但是工作队列里面的任务还是要执行的,但是该方法立刻返回的,并不等待队列任务完成在返回。

    public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加锁
try {
//检查是否允许Shutdown
checkShutdownAccess();
//设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断当前空闲的woker线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

shutdownNow函数

调用shutdownNow函数后,会立即停止线程池,并丢弃和返回任务队列中的任务

    public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否允许Shutdown
checkShutdownAccess();
//设置线程池状态为STOP
advanceRunState(STOP);
//中断当前空闲的woker线程
interruptWorkers();
//获取当前队列中的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

下面仔细分析一下:

  1. 在getTask方法中,如果这时线程池的状态是SHUTDOWN并且workQueue为空,那么就应该返回null来结束这个工作线程,而使线程池进入SHUTDOWN状态需要调用shutdown方法;
  2. shutdown方法会调用interruptIdleWorkers来中断空闲的线程,interruptIdleWorkers持有mainLock,会遍历workers来逐个判断工作线程是否空闲。但getTask方法中没有mainLock;
  3. 在getTask中,如果判断当前线程池状态是RUNNING,并且阻塞队列为空,那么会调用workQueue.take()进行阻塞;
  4. 如果在判断当前线程池状态是RUNNING后,这时调用了shutdown方法把状态改为了SHUTDOWN,这时如果不进行中断,那么当前的工作线程在调用了workQueue.take()后会一直阻塞而不会被销毁,因为在SHUTDOWN状态下不允许再有新的任务添加到workQueue中,这样一来线程池永远都关闭不了了;
  5. 由上可知,shutdown方法与getTask方法(从队列中获取任务时)存在竞态条件;
  6. 解决这一问题就需要用到线程的中断,也就是为什么要用interruptIdleWorkers方法。在调用workQueue.take()时,如果发现当前线程在执行之前或者执行期间是中断状态,则会抛出InterruptedException,解除阻塞的状态;
  7. 但是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断;
  8. 所以Worker继承自AQS,在工作线程处理任务时会进行lock,interruptIdleWorkers在进行中断时会使用tryLock来判断该工作线程是否正在处理任务,如果tryLock返回true,说明该工作线程当前未执行任务,这时才可以被中断。

下面就来分析一下interruptIdleWorkers方法。

private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断tryLock成功,就中断该线程。

八、总结

本文对线程池的源码进行了基本的分析与总结,大体概括为以下几点:
1、线程池通过CAS的方式记录本身的运行状态和线程池线程个数
2、每个线程都会被封装为一个WOKER线程对象,每个worker线程可以处理多个任务;
3、线程池执行流程中要通过自身的状态来判断应该结束工作线程还是阻塞线程等待新的任务,也解释了为什么关闭线程池时要中断工作线程以及为什么每一个worker都需要lock。

关注微信公众号,查看更多技术文章。

Java线程池ThreadPoolExecutor类源码分析