并发包的线程池第一篇--ThreadPoolExecutor执行逻辑

时间:2022-01-07 04:27:01

学习这个很长时间了一直没有去做个总结,现在大致总结一下并发包的线程池。

首先,任何代码都是解决问题的,线程池解决什么问题?

如果我们不用线程池,每次需要跑一个线程的时候自己new一个,会导致几个问题:

1,不好统一管理线程和它们的相互之间的依赖关系,尤其是有的程序要做的事情很多的时候,线程的处理就显得很杂乱,更雪上加霜的是,线程本身就是不可预期的,不是说先跑的线程就一直在后跑的线程前面,一旦形成复杂的依赖关系,也就会形成复杂的状态(由所有线程的状态共同决定)。

2,效率低下,有可能你的每次跑的线程(thread.start())就做一点点小工作,很快做完很快消亡。可是跑一个线程的系统上下文切换的开销是很大的(相对来说)。当需要大量线程的时候这个问题会很突出。

那么线程池为什么可以解决以上两个问题呢?对于第一个问题,毫无疑问对线程进行了统一管理。第二个问题,注意线程池的底层实现不是new一个线程然后start,而是有一个名字叫worker的线程,它首先持有要跑的runnable的对象的引用,然后在它的run()方法里面直接调用这个对象的run方法。换句话说,你放进去的线程并没有真正注册为一个线程跑起来。那么,通过控制worker的数量和运行模式,就可以节约很多的开销。(worker线程不一定会因为你的runnable跑完而被销毁,会接着去跑别的线程,实现了线程的复用)

接下来要分为两个方向讲这个问题,第一个是ThreadPoolExecutor的执行模式。第二个是来自Executors的给你装配好的执行模式。

首先分析线程池的继承结构。

顶层是一个接口(删除了代码注释):

 public interface Executor {

     void execute(Runnable command);

 }

很显然,任何线程池(也可以理解为线程执行器)的最核心的功能就是执行线程。我个人感觉这个地方其实隐隐约约使用了命令模式(虽然是runnable,但是底层并不是start(),而是run(),我觉得可以理解为把一系列操作封装成了一个command)。

接下来还是一个接口:

 public interface ExecutorService extends Executor {

     void shutdown();

     List<Runnable> shutdownNow();

     boolean isShutdown();

     boolean isTerminated();

     boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

可以看到这个更进一步的接口更多地约束了线程池的行为,能够关闭,能够递交Future。很多人喜欢吧ThreadPoolExecutor向上转型为ExecutorService来使用。

接下来是一个抽象类:

 public abstract class AbstractExecutorService implements ExecutorService {
//代码省略
}

里面封装了一些大致差不多的代码。注意这个地方使用了模板方法模式(比如很多地方的代码实现还是依赖于execute()方法,而这个抽象类本身没有实现这个方法)

所以目前我们可以认为ThreadPoolExecutor最重要的方法是execute(),事实上这个方法贯穿了整个线程池。

接下来就是重头戏:ThreadPoolExecutor的具体实现

当然这个类的代码很多,但是我们只抓最关键(执行整个流程)的execute()方法。

     public void execute(Runnable command) {
if (command == null) //防御性编程,检查输入的参数是否为null
throw new NullPointerException(); int c = ctl.get(); //这个ctl是状态变量,表示了目前线程池的状态
if (workerCountOf(c) < corePoolSize) { //如果当前worker数量低于核心池大小
if (addWorker(command, true)) //直接新建worker并且让他run这个线程
return;
c = ctl.get(); //运行到这里说明前面的addWorker失败,也许线程池状态变化
} if (isRunning(c) && workQueue.offer(command)) { //接下来首先看线程池状态,是不是在跑。如果是,再看能不能添加到阻塞队列里面去
int recheck = ctl.get(); //再次检查线程池状态
if (! isRunning(recheck) && remove(command)) //如果发现线程池状态变化(比如终止)则尝试从阻塞队列中移除这个runnable
reject(command); //发生线程被拒绝时指向的方法(由构造参数决定) //最后一种情况,发现目前是线程池关闭了,但是阻塞队列还有线程,这个时候新建工人去完成阻塞队列里面没有完成的工作(所以其直接任务参数为null)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} else if (!addWorker(command, false)) //第三种情况,如果阻塞队列都满了(线程太多),那还是去新建工人完成任务吧
reject(command);
}

以上是我打注释的源码。在这里还是有好几个点要说。

首先第一个是源码大量是用了一种小技巧。如:

 if (isRunning(c) && workQueue.offer(command))

在if里面通过条件判断运行函数(比如上面这句话先判断isRunning(c),如果true再执行后面的,不是就不执行了,同时如果后面的运行成功,则进入if语句)。这么写可以让程序本身很精简。

然后,要谈的是这个的整体的运行逻辑,不妨举个例子。某个工厂要招工人完成订单。有两个重要的参数:1,长工的数目(假设为10),2,总工人的最大数目(假设为20)。

一开始,没有工人,所以要招长工。现在开始,每有一个订单,就去招一个工人当长工。直到长工数为10。接下来,老板考虑订单虽然多了,可是再招工人成本就太贵了,于是把新来的订单放在流水线上暂存起来,叫目前的工人做完手上的活之后赶紧去流水线上继续完成别的订单。接下来订单继续增长,老板发现现在的情况是长工已经完不成积压的订单。这怎么办呢?没办法还是要工人,要不然违约的代价更大。继续开始招短工。这些短工一般是带着任务进厂,也就是客户一来订单就立马招个短工完成这个订单从而保证任务不积压。接下来客户的订单突然少了,以至于没有订单了。老板发现很多工人无事可做,在他观察一段时间后,开始裁员,当然是先裁短工了,因为一开始的打算就是招你们进去渡过这个火爆期的,现在任务完成了,你们就该走了。于是工厂的工人数又回到了长工数。一般情况下的老板不会把长工一起裁掉,即使他们不干活,也就让他们睡觉。可是还是有很黑心的老板直接把长工都裁掉了。

以上的故事中,工厂就是线程池,工人就是worker线程,订单就是调用execute()传进来的runnable参数,长工数就是corePoolSize,总工数是maximumPoolSize,流水线就是阻塞队列,老板观察工人不干活的时间是keepAliveTime,至于老板裁不裁长工是由allowCoreThreadTimeOut参数控制的。

这就是整个线程池的运行逻辑。而工人的运行逻辑是:手上有任务就先完成手上的任务(工人的构造函数传进来的runnable),如果没有或者已经完成了就去阻塞队列找任务做,当然如果没有任务就阻塞自己。

接下来研究一下addWorker的运行逻辑。

 private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get(); //获得当前的线程池状态
int rs = runStateOf(c); //这个函数就是进行一下检查 if (rs >= SHUTDOWN && //线程池已经关闭,并且不是派进来*的工人就直接返回false,添加工人失败。
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false; for (;;) {
int wc = workerCountOf(c); //当前工人数
if (wc >= CAPACITY || //当前工人太多而添加失败(长工不可超过corePoolSize,短工不可超过maximumPoolSize)
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) //CAS操作增加c,如果失败则继续循环
break retry; //运行到这里就说明发现是确实需要招工人,并且CAS操作成功
c = ctl.get(); //CAS失败,重新获得当前状态
if (runStateOf(c) != rs) //这个时候发现不只是CAS失败,而是状态就发生了变化(比如线程池关闭)
continue retry; //那么重头再来
}
} //现在确认要添加工人了
boolean workerStarted = false; //工人还没有开始工作
boolean workerAdded = false; //工人还没有真正添加进去
Worker w = null; //目前工人还是一个null
try {
final ReentrantLock mainLock = this.mainLock; //获得这个线程池的锁
w = new Worker(firstTask); //新建一个工人,同时告诉他第一个任务是什么,注意这个地方内部很重要
final Thread t = w.thread; //这个w.therad,是把工人这个runnable包装成一个thread,也就是工人线程自己
if (t != null) { //成功获得工人线程(注意此时工人线程已经持有了firstTask这个参数,能够运行它)
mainLock.lock(); //拿锁,注意下满的操作和目前线程池状态有关,如果不加锁,有可能发生读到错误的状态。
try {
int c = ctl.get(); //得到目前状态
int rs = runStateOf(c); //检查一下 if (rs < SHUTDOWN || //保证现在线程池没有被关闭,或者不是清理worker
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) //t线程当然不能是个在跑的线程
throw new IllegalThreadStateException();
workers.add(w); //储存了所有的worker的一个hashset容器添加这个worker
int s = workers.size(); //目前一共有多少worker
if (s > largestPoolSize) //超过最大值
largestPoolSize = s; //记录最大值
workerAdded = true; //修改标志,告诉别人已经添加worker了
}
} finally {
mainLock.unlock(); //最后一定要放锁
}
if (workerAdded) { //检查是否成功添加worker
t.start(); //添加成功就让新加入的worker开始运行吧
workerStarted = true; //修改worker是否开始运行的标志位
}
}
} finally {
if (! workerStarted) //失败的处理
addWorkerFailed(w);
}
return workerStarted; //返回值告诉调用者是否成功
}

这就是添加一个工人的代码和我写的注释。基本已经写的很清楚了

接下来解释一下worker的逻辑,也就是以上代码32行33行发生的事。以下代码有删减

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

     final Thread thread; //这个worker要跑的线程,注意底层是这个worker调用这个线程的run方法

     Runnable firstTask;  //接受传进来的参数

     Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask; //外面来了task
this.thread = getThreadFactory().newThread(this); //通过threadFactory把这个runnable包装成一个thread
} public void run() {
runWorker(this); //注意这个时候这个worker的thread字段已经有实际意义,所以调用这个方法的时候能正确run这个thread
} }

好了,原来是这样,先告诉worker要干什么,然后用适配器模式直接把runnable适配成一个thread(threadFactory做这个)

最后记得上面写过,在addWorker的时候会调用这个start()。

那么worker的run方法又是怎么实现的呢?

这个run方法其实调用了runWorker(this),实际上这个runWorker是个不典型的模板方法模式的应用。因为这个方法在类ThreadPoolExecutor里面。一般的模板方法模式是子类实现不同的执行细节,然后调用父类的方法(父类通过虚函数使用子类的实现)。这个地方子类倒是没有实现什么方法,可是把自己当参数传给了父类的方法,而且调用了父类的final方法,当时实际上这个参数也包含了重要信息(到底跑什么runnable)。所以个人觉得可以说是模板方法模式的一个变形。

runWorker方法:

 final void runWorker(Worker w) {  //注意传进来的worker包含了重要信息
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); //这个地方允许被打断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { //getTask不停去阻塞队列拿线程任务
w.lock(); //拿锁
// 如果线程池已经停止了,允许这个线程被打断
// 否者保证这个线程不被打断
// 需要一个第二次检查来应对shutdownnow被调用
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //这个交给子类来实现,这里什么都没有,如果自己写的线程需要,则重写这个方法
Throwable thrown = null;
try {
task.run(); //最最最底层的调用,就是把worker的task拿出来运行,下一次则从阻塞队列中拿
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //同beforeExecute
}
} finally {
task = null;
w.completedTasks++; //这个worker完成的任务加一
w.unlock(); //释放锁
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

以上就是ThreadPoolExecutor执行线程的全部核心过程。