最近阅读了JDK线程池ThreadPoolExecutor
的源码,对线程池执行任务的流程有了大体了解,实际上这个流程也十分通俗易懂,就不再赘述了,别人写的比我好多了。
不过,我倒是对线程池是如何回收工作线程比较感兴趣,所以简单分析了一下,加深对线程池的理解吧。
那么,就以JDK1.8为例分析吧。
1. runWorker(Worker w)
工作线程启动后,就进入runWorker(Worker w)
方法。
里面是一个while循环,循环判断任务是否为空,若不为空,执行任务;若取不到任务,或发生异常,退出循环,执行processWorkerExit(w, completedAbruptly);
在这个方法里把工作线程移除掉。
取任务的来源有两个,一个是firstTask,这个是工作线程第一次跑的时候执行的任务,最多只能执行一次,后面得从getTask()
方法里取任务。看来,getTask()
是关键,在不考虑异常的场景下,返回null,就表示退出循环,结束线程。下一步,就得看看,什么情况下getTask()
会返回null。
篇幅有限,分段截取,省略中间执行任务的步骤
2. getTask() 返回null
一共有两种情况会返回null,见红框处 。
第一种情况,线程池的状态已经是STOP
,TIDYING
, TERMINATED
,或者是SHUTDOWN
且工作队列为空;
第二种情况,工作线程数已经大于最大线程数或当前工作线程已超时,且,还有其他工作线程或任务队列为空。这点比较难理解,总之先记住,后面会用。
下面以条件1和条件2分别指代这两种情况的判断条件。
3. 分场景分析线程池回收工作线程
3.1 未调用shutdown() ,RUNNING状态下全部任务执行完成的场景
这种场景,会将工作线程的数量减少到核心线程数大小(如果本来就没有超过,则不需要回收)。
比如一个线程池,核心线程数为4,最大线程数为8。一开始是4个工作线程,当任务把任务队列塞满,就得将工作线程增加到8. 当后面任务执行到差不多了,线程取不到任务了,就会回收到4个工作线程的状态(取决于allowCoreThreadTimeOut
的值,这里讨论默认值false的情况,即核心线程不会超时。如果为true,工作线程可以全部销毁)。
可以先排除上面提到的条件1,线程池的状态已经是STOP
,TIDYING
, TERMINATED
,或者是SHUTDOWN
且工作队列为空。因为线程池一直是RUNNING
,这条判断永远是false。在这个场景中,可以当条件1不存在。
下面分析取不出任务时线程是怎么运行的。
-
step1. 从任务队列取任务有两种方式,超时等待还是可以一直阻塞下去。决定因素是timed变量。该变量在前面赋值,如果当前线程数大于核心线程数,变量timed为true, 否则为false(上面说了,这里只讨论
allowCoreThreadTimeOut
为false的情况)。很明显,现在讨论的是timed为true的情况。keepAliveTime
一般不设置,默认值为0,所以基本上可以认为是不阻塞,马上返回取任务的结果。
在线程超时等待唤醒之后,发现取不出任务,timeOut变为true,进入下一次循环。
-
step2. 来到条件1的判断,线程池一直
RUNNING
, 不进入代码块。 - step3. 来到条件2的判断,这时任务队列为空,条件成立,CAS减少线程数,若成功,返回null,否则,重复step1。
这里要注意,有可能多条线程同时通过条件2的判断,那会不会减少后线程的数量反而比预想的核心线程数少呢?
比如当前线程数已经只有5条了,此时有两条线程同时唤醒,通过条件2的判断,同时减少数量,那剩下的线程数反而只有3条,和预期不一致。
实际上是不会的。为了防止这种情况,compareAndDecrementWorkerCount(c)
用的是CAS方法,如果CAS失败就continue,进入下一轮循环,重新判断。
像上述例子,其中一条线程会CAS失败,然后重新进入循环,发现工作线程数已经只有4了,timed为false, 这条线程就不会被销毁,可以一直阻塞了(workQueue.take()
)。
这一点我思考了很久才得出答案,一直在想没有加锁的情况下是怎么保证一定能不多不少回收到核心线程数的呢。原来是CAS的奥妙。
从这里也可以看出,虽然有核心线程数,但线程并没有区分是核心还是非核心,并不是先创建的就是核心,超过核心线程数后创建的就是非核心,最终保留哪些线程,完全随机。
3.2 调用shutdown() ,全部任务执行完成的场景
这种场景,无论是核心线程还是非核心线程,所有工作线程都会被销毁。
在调用shutdown()
之后,会向所有的空闲工作线程发送中断信号。
最终传入false,调用下面这个方法。
可以看出,在发出中断信号前,会判断是否已经中断,以及要获得工作线程的独占锁。
发出中断信号的时候,工作线程要么在getTask()
里准备获取任务,要么在执行任务,那就得等它执行完当前任务才会发出,因为工作线程在执行任务的时候,也会工作线程加锁。工作线程执行完任务,又跑到getTask()
里面去了。
所以我们只要看getTask()
里面怎么应对中断异常的就可以了。
工作线程在getTask()
里,有两种可能。
3.2.1 任务已全部完成,线程在阻塞等待。
很简单,中断信号将其唤醒,从而进入下一轮循环。到达条件1处,符合条件,减少工作线程数量,并返回null,由外层结束这条线程。
这里的decrementWorkerCount()
是自旋式的,一定会减1。
3.2.2 任务还没有完全执行完
调用shutdown()
之后,未执行完的任务要执行完毕,池子才能结束。所以此时有可能线程还在工作。
这里又要分两个阶段讨论
阶段1 任务较多,工作线程都能获得任务
这里还不涉及到线程退出,可以跳过不看,只是分析一下收到中断信号后线程的表现。
假设有线程A,正通过getTask()
里获取任务。此时A被中断,在获取任务时,无论是poll()
还是take()
,都会抛出中断异常。异常被捕获,重新进入下一轮循环,只要队列不为空,就可以继续取任务。
线程A被中断,再次取任务,调用workQueue.poll()
or workQueue.take()
,不会抛出异常吗?还可以正常取出任务吗?
这就要看workQueue
的实现了。workQueue
是BlockingQueue
类型,以常见的LinkedBlockingQueue
和ArrayBlockingQueue
为例,加锁时都是调用lockInterruptibly()
,是响应中断的。该方法又调用了AQS的acquireInterruptibly(int arg)
。
acquireInterruptibly(int arg)
,无论是在入口处判断中断异常,还是在parkAndCheckInterrupt()
方法阻塞,被中断唤醒并判断中断异常时,均使用了Thread.interrupted()
。这个方法会返回线程的中断状态,并把中断状态重置!也就是说,线程不再是中断状态了,这样在再次取任务时,就不会报错了。
因此,这对于正在准备取任务的线程,只是相当于浪费了一次循环,这可能是线程中断带来的副作用吧,当然,对整体的运行不影响。
分析到这里,我不禁感叹,这里BlockingQueue
刚好是会重置中断状态,这到底是怎么想出来的绝妙设计啊?Doug Lea大神Orz。
阶段2 任务刚好要执行完了
这时任务已经快取完了,比如有4条工作线程,只剩下2个任务,那就可能出现2条线程获得任务,2条线程阻塞。
因为在获取任务前的判断,没有加锁,那么会不会出现,所有线程都通过了前面的校验,来到workQueue获取任务的地方,刚好任务队列已经空了,线程全部阻塞了呢?因为shutdown()
已经执行完毕,无法再向线程发出中断信号,从而线程一直在阻塞,无法被回收。
这种是不会发生的。
假设有A,B,C,D四条工作线程,同时通过了条件1和条件2的判断,来到取任务的地方。那么,工作队列至少还有一个任务,至少会有一条线程能取到任务。
假设A,B获得了任务,C,D阻塞。
A, B接下来的步骤是:
-
step1. 任务执行完成后,再次
getTask()
,此时符合条件1,返回null,线程准备被回收。 -
step2.
processWorkerExit(Worker w, boolean completedAbruptly)
将线程回收。
回收就只是把线程干掉这么简单吗?来看看processWorkerExit(Worker w, boolean completedAbruptly)
的方法。
可以看到,在里面除了workers.remove(w)
移除线,还调用了tryTerminate()
。
第一个判断条件没有一个子条件符合,跳过。第二个条件,工作线程还存在,那么随机中断一条空闲线程。
那么问题就来了,中断一条空闲线程,也没说是一定中断正在阻塞的线程啊。如果A, B同时退出,有没有可能出现A中断B, B中断A,AB互相中断,从而没有线程去中断唤醒阻塞的线程呢?
答案仍然是,想多了……
假设A能走到这里,说明A已经从工作线程的集合workers里面移除了(processWorkerExit(Worker w, boolean completedAbruptly)
在tryTerminate()
之前,已经将其移除)。那么A中断B,B来到这里中断,就不会在workers里面找到A了。
也就是说,退出的线程不能互相中断,我从集合中退出后,中断了你,你不能中断我,因为我已经退出集合,你只能中断别人。那么,即使有N个线程同时退出,至少在最后,也会有一条线程,会中断剩余的阻塞线程。
就像多米诺骨牌一样,中断信号就会被传播下去。
阻塞的C,D中的任意一条被中断唤醒后,又会重复step1的动作,周而复始,直到所有阻塞线程都被中断,唤醒。
这也是为什么在tryTerminate()
里面,传入false,只需要中断任意一条空闲线程的原因。
想到这里,再次对Doug Lea心生钦敬(粤语)之情。这设计得也太妙了叭。
4. 总结
ThreadPoolExecutor
回收工作线程,一条线程getTask()
返回null,就会被回收。
分两种场景。
1) 未调用shutdown() ,RUNNING状态下全部任务执行完成的场景
线程数量大于corePoolSize
,线程超时阻塞,超时唤醒后CAS减少工作线程数,如果CAS成功,返回null,线程回收。否则进入下一次循环。当工作者线程数量小于等于corePoolSize
,就可以一直阻塞了。
2) 调用shutdown() ,全部任务执行完成的场景
shutdown()
会向所有线程发出中断信号,这时有两种可能。
2.1)所有线程都在阻塞
中断唤醒,进入循环,都符合第一个if判断条件,都返回null,所有线程回收。
2.2)任务还没有完全执行完
至少会有一条线程被回收。在processWorkerExit(Worker w, boolean completedAbruptly)
方法里会调用tryTerminate()
,向任意空闲线程发出中断信号。所有被阻塞的线程,最终都会被一个个唤醒,回收。
这一次的分析,昨晚开始写,写到一半卡壳,今天早上接着写,前后花了大概2+2=4个小时写博客以及1小时思考。
说实话自己还是有点乱,无法一下子理解透彻,也不知道自己理解得对不对。
有没有用,我也不知道,只能说,加深了对线程池的理解吧(安慰自己),同时也感慨设计之精妙。
如有不正确的地方,请大家指正。