受限于硬件、内存和性能,我们不可能无限制的创建任意数量的线程,每一台机器允许的最大线程是一个有界值。因此ThreadPoolExecutor管理的线程数量是有界的。线程池就是用这些有限个数的线程,去执行提交的任务。但是对于多用户、高并发的应用来说,提交的任务数量非常巨大,会比允许的最大线程数多很多。为了解决这个问题,必须要引入排队机制,或者是在内存中,或者是在硬盘等容量很大的存储介质中。Java的ThreadPoolExecutor只支持任务在内存中排队,通过BlockingQueue暂存还没有来得及执行的任务。
线程的管理是比较复杂的,这会涉及线程数量、等待/唤醒、同步/锁、线程创建和死亡等问题。ThreadPoolExecutor与线程相关的几个成员变量是:keepAliveTime、allowCoreThreadTimeOut、poolSize、corePoolSize、maximumPoolSize,它们共同负责线程的创建和销毁。
corePoolSize:
线程池的基本大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。这里需要注意的是:在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。再考虑到keepAliveTime
和allowCoreThreadTimeOut超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize。
maximumPoolSize:
线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。这里值得一提的是largestPoolSize,该变量记录了线程池在整个生命周期中曾经出现的最大线程个数。为什么说是曾经呢?因为线程池创建之后,可以调用setMaximumPoolSize()改变运行的最大线程的数目。
poolSize:
线程池中当前线程的数量,当该值为0的时候,意味着没有任何线程,线程池会终止;同一时刻,poolSize不会超过maximumPoolSize。
现在我们通过ThreadPoolExecutor.execute()方法,看一下这3个属性的关系,以及线程池如何处理新提交的任务。以下源码基于JDK1.6.0_37版本。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
新提交一个任务时的处理流程很明显:
- 如果线程池的当前大小还没有达到基本大小(poolSize < corePoolSize),那么就新增加一个线程处理新提交的任务;
- 如果当前大小已经达到了基本大小,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);
- 如果队列容量已达上限,并且当前大小poolSize没有达到maximumPoolSize,那么就新增线程来处理任务; 4、如果队列已满,并且当前线程数目也已经达到上限,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。
接下来我们看下allowCoreThreadTimeOut和keepAliveTime属性的含义。在压力很大的情况下,线程池中的所有线程都在处理新提交的任务或者是在排队的任务,这个时候线程池处在忙碌状态。如果压力很小,那么可能很多线程池都处在空闲状态,这个时候为了节省系统资源,回收这些没有用的空闲线程,就必须提供一些超时机制,这也是线程池大小调节策略的一部分。通过corePoolSize和maximumPoolSize,控制如何新增线程;通过allowCoreThreadTimeOut和keepAliveTime,控制如何销毁线程。
allowCoreThreadTimeOut:
该属性用来控制是否允许核心线程超时退出。If false,core threads stay alive even when idle.If true, core threads use keepAliveTime to time out waiting for work。如果线程池的大小已经达到了corePoolSize,不管有没有任务需要执行,线程池都会保证这些核心线程处于存活状态。可以知道:该属性只是用来控制核心线程的。
keepAliveTime:
如果一个线程处在空闲状态的时间超过了该属性值,就会因为超时而退出。举个例子,如果线程池的核心大小corePoolSize=5,而当前大小poolSize =8,那么超出核心大小的线程,会按照keepAliveTime的值判断是否会超时退出。如果线程池的核心大小corePoolSize=5,而当前大小poolSize =5,那么线程池中所有线程都是核心线程,这个时候线程是否会退出,取决于allowCoreThreadTimeOut。
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
(poolSize > corePoolSize || allowCoreThreadTimeOut)这个条件,就是用来判断是否允许当前线程退出。workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);就是借助阻塞队列,让空闲线程等待keepAliveTime时间之后,恢复执行。这样空闲线程会由于超时而退出。
合理的配置线程池的大小,从以下几个角度分析任务的特性:
任务的性质:CPU密集型任务、IO密集型任务、混合型任务。
任务的优先级:高、中、低。
任务的执行时间:长、中、短。
任务的依赖性:是否依赖其他系统资源,如数据库连接等。
性质不同的任务可以交给不同规模的线程池执行,
CPU密集型任务应配置尽可能小的线程,如配置CPU个数+1的线程数,
IO密集型任务应配置尽可能多的线程,因为IO操作不占用CPU,不要让CPU闲下来,应加大线程数量,如配置两倍CPU个数+1,
混合型的任务,如果可以拆分,拆分成IO密集型和CPU密集型分别处理,前提是两者运行的时间是差不多的,如果处理时间相差很大,则没必要拆分了。
若任务对其他系统资源有依赖,如某个任务依赖数据库的连接返回的结果,这时候等待的时间越长,则CPU空闲的时间越长,那么线程数量应设置得越大,才能更好的利用CPU。 合理设置线程池值大小,需要结合系统实际情况大量的尝试和比较去得出合理的设置大小。
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU
可以得出一个结论: 线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。 以上公式与CPU和IO密集型任务设置线程数基本吻合。
高并发、任务执行时间短的业务怎样使用线程池?并发不高、任务执行时间长的业务怎样使用线程池?并发高、业务执行时间长的业务怎样使用线程池?
(1)高并发、任务执行时间短的业务,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换
(2)并发不高、任务执行时间长的业务要区分开看:
1)假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务
2)假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,和(1)一样吧,线程池中的线程数设置得少一些,减少线程上下文的切换
(3)并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,增加服务器是第二步,至于线程池的设置,设置参考(2)。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件对任务进行拆分和解耦。