《java.util.concurrent 包源码阅读》12 线程池系列之ThreadPoolExecutor 第二部分

时间:2022-11-09 16:19:30

接着说worker线程是如何工作的。ThreadPoolExecutor有一个成员类叫Worker,所起到的作用就是线程池worker线程的作用。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

这里AbstractQueuedSynchronizer的作用是使Worker具有锁的功能,在执行任务时,会把Worker锁住,这个时候就无法中断Worker。Worker空闲时候是线程池可以通过获取锁,改变Worker的某些状态,在此期间因为锁被占用,Worker就是不会执行任务的。

Worker工作的逻辑在ThreadPoolExecutor#runWorker方法中

        public void run() {
runWorker(this);
}

因此转到runWorker方法:

    final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
boolean completedAbruptly = true;
try {
// 执行分配的任务或者从BlockingQueue中等待获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
clearInterruptsForTaskRun();
try {
// 执行任务之前的工作
beforeExecute(w.thread, task);
Throwable thrown = null;
// 执行任务,如果发生异常,该Worker就不会再继续执行任务
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务执行完的工作
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// Worker不再执行任务的处理,completedAbruptly为false
// 表示正常结束,否则表示执行任务出错。
processWorkerExit(w, completedAbruptly);
}
}

来看看processWorkerExit,重点看看执行任务发生异常时该如何处理

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 发生异常,首先要更新Worker数量
if (completedAbruptly)
decrementWorkerCount(); // 移除这个Worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
} // 尝试停止线程池,正常运行的线程池调用该方法不会有任何动作
tryTerminate(); int c = ctl.get();
// 如果线程池没有被关闭的话,
if (runStateLessThan(c, STOP)) {
// Worker不是异常退出,检查worker线程数是不是小于最小值
// 这个最小值分为几种情况:
// 1. allowCoreThreadTimeOut(JDK6新加)表示是否允许线程池在超
// 过一定时间没有收到任务后退出,这种情况下,最小值为0,因为如果如
// 果一直没有任何任务,worker线程数是0
// 2. 最小值为corePoolSize,因为corePoolSize可能为0,因此这种情况
// 下,如果有任务的话必然会有Worker,因此最小值为1
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
// 如果Worker线程数小于最小值,新建一个Worker线程
addWorker(null, false);
}
}

这篇文章主要讲述了Worker线程的工作原理,接下里会讲线程池是如何进行状态切换的。