ThreadPoolExecutor源码分析

时间:2022-12-28 18:01:02

ThreadPoolExecutor是Java自带线程池FixedThreadPool(固定大小)、 SingleThreadExecutor(单线程)、CacheThreadPool (无限大)的具体实现。我们也可以继承此类来实现自己的线程池。

其内部主要实现是通过队列保存需要执行的任务,并通过coreSize和maxSize控制线程池内线程的个数。

ThreadPoolExecutor的关键属性

ctl是存储runState(运行状态)和workerCount(工作线程数量)的一个AtomicInteger。

其中runState存储在ctl(共32位)的高3位,workerCount存储在低29位。

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//初始为0个workerCount
private static final int COUNT_BITS = Integer.SIZE - 3;//29位
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//前3位为0,后29位为1 private static final int RUNNING = -1 << COUNT_BITS;//运行
private static final int SHUTDOWN = 0 << COUNT_BITS;//不接受新任务,继续接受队列任务,线程池内任务继续执行
private static final int STOP = 1 << COUNT_BITS;//不接受新任务,不接受队列任务,且线程池内任务停止
private static final int TIDYING = 2 << COUNT_BITS;//全部任务已经停止,运行终止代码
private static final int TERMINATED = 3 << COUNT_BITS;//终止代码运行完毕 private static int runStateOf(int c) { return c & ~CAPACITY; }//根据ctl获取runState
private static int workerCountOf(int c) { return c & CAPACITY; }//根据ctl获取workerCount
private static int ctlOf(int rs, int wc) { return rs | wc; }//根据runState、workerCount获取ctl

ctl

ThreadPoolExecutor运行流程

光是看属性比较难理解,我们可以模拟平常使用线程池的方法,看看其内部是怎么运行的。

 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
} public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
} public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
} public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造

FixedThreadPool传入的coreSize ==  maxSize,线程存活时间为无限,工作队列为有缓存(放入时未达到队列缓存值则返回)的*阻塞队列。

CachedThreadPool传入的coreSize = 0, maxSize为无限大,线程存活时间为60秒,工作队列为无缓存(放入时必须要等待取出后才能返回、即缓存值为1)的阻塞队列。

一般我们调用excute(Runnable r)的方法来使用线程池。

 public void execute(Runnable command) {
if (command == null)
throw new NullPointerException(); int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//当前工作线程小于coreSize
if (addWorker(command, true))//新增一个工作线程,并把其第一个任务设为传入的command
return;//成功直接返回
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//工作线程达到coreSize,将任务放入队列中,等待工作线程获取
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//再次确认是否运行中,若无则移除该线程
reject(command);//拒绝
else if (workerCountOf(recheck) == 0)//workerCount在上面语句执行途中变为0
addWorker(null, false);//增加worker
}
else if (!addWorker(command, false))//添加失败,可能线程池关闭或饱和
reject(command);//拒绝
} private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 是否运行中
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false; for (;;) {
int wc = workerCountOf(c);//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))//工作线程数量已达最大值
return false;//返回增加失败
if (compareAndIncrementWorkerCount(c))//尝试CAS增加workerCount
break retry;//成功返回
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//可能终止
continue retry;
//增加失败,原因可能为workerCount被其他线程改变,重试内部循环
}
} boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//将该任务作为首任务
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());//在有锁的情况下重新检查runState if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) //t可用
throw new IllegalThreadStateException();
workers.add(w);//放入集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;//largestPoolSize供外部调用,仅在有锁情况下返回
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//开启工作线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

excute

除了我在代码中另加的注释,jdk官方本身也有对excute执行顺序的解释,我尝试翻译如下:

1.如果小于corePoolSize的线程正在运行,尝试启动新的线程并把传入的command作为其第一个任务。addWorker方法检查runState和workerCount,通过返回false防止错误添加线程。
2.如果任务可以被成功入队,我们依然需要去双重检测在进入方法后,是否已经增加了一个线程(因为存在可能在第一次检测后一个线程结束了)或者线程池结束了。所以我们再次检测state,并当发现若线程池停止了,我们会撤回入队,或者当发现没有线程在执行了,会开启一个新线程。
3.如果我们不能把一个任务成功入队,我们就尝试增加一个新的线程,如果增加线程也失败了,我们就得知线程被终止或者饱和了,拒绝该任务。

任务执行的核心代码显然是在内部类Worker的run方法中,我们去看看。

 public void run() {
runWorker(this);//调用外部的runWorker并把当前的worker传进去
} final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 解除锁定以允许中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//第一个任务为null,或者从队列取任务时返回null
w.lock();
//如果线程池正在停止,确保线程中断,否则确保线程不中断。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//当前线程应当中断
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//处理回收的worker
}
}

run

其中getTask中允许线程的复用,我们去看一下。

 private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out? for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 检测线程池状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;//返回null由worker自己结束
} int wc = workerCountOf(c); // workerCount大于corePoolSize时候、为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //大于maximum或者上次获取已经超时
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;//返回null由worker自己结束
continue;//cas失败,继续尝试获取task
} try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://大于core,在keepAliveTime时间内尝试从队列获取任务
workQueue.take();//小于或等于,阻塞获取任务
if (r != null)
return r;
timedOut = true;//r==null,超时,进入下次循环
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

getTask

主要就是判断了超时时间,CachedThreadPool空闲线程的回收也就是通过这个来判断的。

而FixedThreadPool的线程则一直在队列获取时阻塞。

另外还有个processWorkerExit比较简单,主要是从worker集合中删除,这里就不再说明了。