Java并发编程札记-(六)JUC线程池-02ThreadPoolExecutor实现原理

时间:2022-05-09 16:42:53

本文通过学习ThreadPoolExecutor源码来学习线程池的实现原理。

简介

为什么要使用线程池
许多服务器都面临着处理大量客户端远程请求的压力,如果每收到一个请求,就创建一个线程来处理,表面看是没有问题的,但实际上存在着很严重的缺陷。服务器应用程序中经常出现的情况是请求处理的任务很简单但客户端的数目却是庞大的,这种情况下如果还是每收到一个请求就创建一个线程来处理它,服务器在创建和销毁线程所花费的时间和资源可能比处理客户端请求处理的任务花费的时间和资源更多。为了缓解服务器压力,需要解决频繁创建和销毁线程的问题。线程池可以实现这个需求。

什么是线程池
线程池可以看做是许多线程的集合。在没有任务时线程处于空闲状态,当请求到来,线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务。这样就实现了线程的重用。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。

工作模型
Java并发编程札记-(六)JUC线程池-02ThreadPoolExecutor实现原理

工作模型中一共有三种队列:正在执行的任务队列,等待被执行的阻塞队列,等待被commit进阻塞队列中的任务队列。

Java中的线程池
Java中常用的线程池有三个,最出名的当然是ThreadPoolExecutor,除此之外还有ScheduledThreadPoolExecutor、ForkJoinPool。本文主要学习ThreadPoolExecutor的实现原理。

创建ThreadPoolExecutor线程池

强烈推荐使用Executors工厂方法创建线程池,如Executors.newCachedThreadPool()(*线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。本文主要学习如何手动配置,下面是ThreadPoolExecutor的一个构造方法。

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

ThreadPoolExecutor一共有四个构造方法,其他三个构造方法都是通过上述的构造方法来实现的。毫无疑问手动配置线程池的关键就是学好构造方法中的几个参数如何设置。这几个参数对应着ThreadPoolExecutor中的几个成员属性。

属性

corePoolSize与maximumPoolSize分别是核心池大小与最大池大小。在源码中的声明为
private volatile int corePoolSize;private volatile int maximumPoolSize;

当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的*值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

workQueue是线程池工作模型中的阻塞队列,用于传输和保持提交的任务。在源码中的声明为private final BlockingQueue<Runnable> workQueue;

keepAliveTime是池中线程空闲时的活动时间。如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止(参见 getKeepAliveTime(java.util.concurrent.TimeUnit))。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动,则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在关闭前有效地从以前的终止状态禁用空闲线程。默认情况下,保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0, allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。

threadFactory是一个线程集合。线程池可以使用ThreadFactory创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。

handler是线程池拒绝策略,RejectedExecutionHandler类型的对象。当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下, execute 方法都将调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略:

  • ThreadPoolExecutor.AbortPolicy ,默认策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
  • ThreadPoolExecutor.CallerRunsPolicy,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
  • ThreadPoolExecutor.DiscardPolicy,不能执行的任务将被删除。
  • ThreadPoolExecutor.DiscardOldestPolicy,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。

除了上面的几个属性外,ThreadPoolExecutor还有下面的几个参数。

//ctl是一个AtomicInteger类型的原子对象。ctl记录了线程池中的任务数量和线程池状态。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程池的锁
private final ReentrantLock mainLock = new ReentrantLock();
//所有工作的线程
private final HashSet<Worker> workers = new HashSet<Worker>();
//支持的等待condition
private final Condition termination = mainLock.newCondition();
//线程池中线程数量曾经达到过的最大值
private int largestPoolSize;
//已完成任务数量
private long completedTaskCount;
//是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut;

排队策略

排队有三种通用策略:
SynchronousQueue
它将任务直接传输给工作队列workers,而不保持任务。如果不存在空闲线程,则会新建一个线程来执行任务。比如,在Executors.newCachedThreadPool()方法中使用的就是此策略。

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

LinkedBlockingQueue
*队列,使用此队列会导致在所有corePoolSize线程都忙时新任务在队列中等待。这样,创建的线程就不会超过corePoolSize。比如,在Executors.newFixedThreadPool()和Executors.newSingleThreadExecutor()方法中使用的就是此策略。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

ArrayBlockingQueue
有界队列,没见到在哪里用到了这种策略。

线程池状态

源码已经告诉了我们线程池有几个状态。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

可以看出,一共有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。ctl对象一共32位,高3位保存线程池状态信息,后29位保存线程池容量信息。线程池的初始化状态是RUNNING,在源码中体现为private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

状态 高三位 工作队列workers中的任务 阻塞队列workQueue中的任务 未添加的任务
RUNNING 111 继续处理 继续处理 添加
SHUTDOWN 000 继续处理 继续处理 不添加
STOP 001 尝试中断 不处理 不添加
TIDYING 010 处理完了 如果由SHUTDOWN - TIDYING ,那就是处理完了;如果由STOP - TIDYING ,那就是不处理 不添加
TERMINATED 011 同TIDYING 同TIDYING 同TIDYING

各个状态的转换图如下所示
Java并发编程札记-(六)JUC线程池-02ThreadPoolExecutor实现原理

执行任务

execute(Runnable)

/*
* 在将来某个时间执行给定任务。
* 可以在新线程中或者在现有池线程中执行该任务。
* 如果无法将任务提交执行,或者因为此执行程序已关闭,或者因为已达到其容量,
* 则该任务由当前 RejectedExecutionHandler 处理。
*/

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//分三种情况处理
//case1:如果线程池中运行的线程数量<corePoolSize
if (workerCountOf(c) < corePoolSize) {
//创建新线程来处理请求,即使其他辅助线程是空闲的
if (addWorker(command, true))
return;
c = ctl.get();
}
//case2:如果线程池中运行的线程数量>=corePoolSize,且线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查线程池的状态,如果线程池不是RUNNING状态,且成功从阻塞队列中删除任务
if (! isRunning(recheck) && remove(command))
//该任务由当前 RejectedExecutionHandler 处理
reject(command);
//如果线程池中运行的线程数量为0
else if (workerCountOf(recheck) == 0)
//则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
addWorker(null, false);
}
//case3:如果以上两种case不成立,即没能将任务成功放入阻塞队列中,且addWoker新建线程失败
else if (!addWorker(command, false))
//该任务由当前 RejectedExecutionHandler 处理
reject(command);
}

看完后,我们知道execute()分三种情况处理任务

case1:如果线程池中运行的线程数量<corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
case2:如果线程池中运行的线程数量>=corePoolSize,且线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中,就再次检查线程池的状态,1.如果线程池不是RUNNING状态,且成功从阻塞队列中删除任务,则该任务由当前 RejectedExecutionHandler 处理。2.否则如果线程池中运行的线程数量为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
case3:如果以上两种case不成立,即没能将任务成功放入阻塞队列中,且addWoker新建线程失败,则该任务由当前 RejectedExecutionHandler 处理。

submit

/**
* 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
* 该 Future 的 get 方法在 成功 完成时将会返回 null。
*/

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

可以看到此方法是通过调用execute(Runnable)实现的。

关闭线程池

ThreadPoolExecutor提供了shutdown()和shutdownNow()两个方法来关闭线程池。shutdown() 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。shutdownNow()尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。

shutdown()

/*
* 按过去执行已提交任务的顺序发起一个有序的关闭,不接受新任务。
* 如果已经关闭,则调用没有其他作用。
*/

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//step1.获取独占锁
mainLock.lock();
try {
//step2.如果有安全管理器,使用安全管理器检查当前线程是否有权限关闭线程池
checkShutdownAccess();
//step3.将线程池状态设为SHUTDOWN
advanceRunState(SHUTDOWN);
//step4.中断所有空闲线程
interruptIdleWorkers();
//step5.钩子函数,没有执行任何操作
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
//step6.释放锁
mainLock.unlock();
}
//step7.将线程池状态设置为TERMINATED
tryTerminate();
}

将shutdown()方法总结如下
step1.获取独占锁
step2.如果有安全管理器,使用安全管理器检查当前线程是否有权限关闭线程池,如果没有权限则抛出SecurityException。
step3.将线程池状态设为SHUTDOWN。一旦将线程池状态设为SHUTDOWN,就不能像线程池中添加新任务了。
step4.中断所有空闲线程
step5.钩子函数,没有执行任何操作
step6.释放锁
step7.将线程池状态设置为TERMINATED

shutdownNow()

/*
* 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。
* /
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
//step1.获取独占锁
mainLock.lock();
try {
//step2.如果有安全管理器,使用安全管理器检查当前线程是否有权限关闭线程池
checkShutdownAccess();
//step3.尝试停止所有的活动执行任务
advanceRunState(STOP);
//step4.暂停等待任务的处理
interruptWorkers();
//step5.获取等待执行的任务列表
tasks = drainQueue();
} finally {
//step6.释放锁
mainLock.unlock();
}
//step7.将线程池状态设置为TERMINATED
tryTerminate();
//step8.返回等待执行的任务列表
return tasks;
}

将shutdownNow()方法总结如下
step1.获取独占锁
step2.如果有安全管理器,使用安全管理器检查当前线程是否有权限关闭线程池,如果没有权限则抛出SecurityException。
step3.尝试停止所有的活动执行任务
step4.暂停等待任务的处理
step5.获取等待执行的任务列表
step6.释放锁
step7.将线程池状态设置为TERMINATED
step8.返回等待执行的任务列表

shutdown()和shutdownNow()的区别

  • 调用shutdown()后,线程池状态立刻变为SHUTDOWN,而调用shutdownNow(),线程池状态立刻变为STOP。
  • shutdown()通过中断空闲线程、不接受新任务的方式按过去执行已提交任务的顺序发起一个有序的关闭,shutdownNow()无差别地停止所有的活动执行任务,暂停等待任务的处理。也就是说,shutdown()等待任务执行完才中断线程,而shutdownNow()不等任务执行完就中断了线程。

本文就先讲到这里,想了解Java并发编程更多内容请参考: