线程池工厂Executors编程的艺术

时间:2021-12-22 17:23:53

  Executors是一个线程池的工厂类,提供各种有用的线程池的创建,使用得当,将会使我们并发编程变得简单!今天就来聊聊这个工厂类的艺术吧!

  Executors只是Executor框架的主要成员组件之一,为java的异步任务调度执行提供了重要的入口!

  在说Executors之前,还需要说一下另一个Executor框架的重要成员,ThreadPoolExecutor。

  ThreadPoolExecutor 实现了ExecutorService接口,提供了线程池的调度功能!成为纯种池技术的基石!

ThreadPoolExecutor 的主要构造函数:

    /**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

  咱们此处不是要去分析其源码,只是想看一下怎样使用他!

  从构造方法可以看出 ThreadPoolExecutor 的主要参数 7 个,在其注释上也有说明功能,咱们翻译下每个参数的功能:

    corePoolSize: 线程池核心线程数(平时保留的线程数),使用时机: 在初始时刻,每次请求进来都会创建一个线程直到达到该size
maximumPoolSize: 线程池最大线程数,使用时机: 当workQueue都放不下时,启动新线程,直到最大线程数,此时到达线程池的极限
keepAliveTime/unit: 超出corePoolSize数量的线程的保留时间,unit为时间单位
workQueue: 阻塞队列,当核心线程数达到或者超出后,会先尝试将任务放入该队列由各线程自行消费;
ArrayBlockingQueue: 构造函数一定要传大小
LinkedBlockingQueue: 构造函数不传大小会默认为65536(Integer.MAX_VALUE ),当大量请求任务时,容易造成 内存耗尽。
SynchronousQueue: 同步队列,一个没有存储空间的阻塞队列 ,将任务同步交付给工作线程。
PriorityBlockingQueue: 优先队列
threadFactory:线程工厂,用于线程需要创建时,调用其newThread()生产新线程使用
handler: 饱和策略,当队列已放不下任务,且创建的线程已达到 maximum 后,则不能再处理任务,直接将任务交给饱和策略
AbortPolicy: 直接抛弃(默认)
CallerRunsPolicy: 用调用者的线程执行任务
DiscardOldestPolicy: 抛弃队列中最久的任务
DiscardPolicy: 抛弃当前任务

  所以,虽然参数有点多,但是其实仔细阅读以上的注释,基本就能很好地使用线程池了,不过咱们还可以通过一个流程图来说明这一切,对于喜欢看图说话的同学来说是件好事!

线程池工厂Executors编程的艺术

  好了,ThreadPoolExecutor 介绍完后,什么感觉呢? 说是简单,其实还是挺麻烦的,毕竟你还要去了解各种队列的公优缺点,去了解线程池范围怎样设置合理。所以,是时候让 Executors 工厂出场了!

  作为一个工厂类,Executors 简化了各种参数,只忘文生义即可明白其意思!Executors 主要提供4种类型的线程池!

1. FixedThreadPool 创建一个指定数量的线程池

  可控制线程最大并发数,超出的线程会在队列中等待。

  其实现方式如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

  使用固定线程数的worker和*队列保存多余的task,所以创建的线程数不会超过corePoolSize,所以maxPoolSize是一个无效参数,所以keepAliveTime是一个无效参数,所以不会调用RejectedExecutionHandler.rejectedExecution()方法,即无拒绝策略可用;从而在外部表现来看,就是固定线程,在执行无限的队列!

2. SingleThreadExecutor 创建一个单线程化的线程池

  它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

  其实现方式如下:

    public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

  与 FixedThreadPool 原理类似,不过这里只会使用一个线程在运行。使用一个线程可以保证取任务时的顺序一致性,从而表现出先到先得的效果!在要求有执行顺序要求的场景,刚好派上用场!

3. CachedThreadPool, 创建一个可缓存的无限线程池

  如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

  其实现方式如下:

    public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

  使用SyncronousQueue作为线程池的工作队列,这是特殊的队列,它是个没容量的阻塞队列(至于为什么要用这个特性,据说是性能比 LinkedBlockingQueue 更高,详解他),每个插入操作必须有一个线程对应的移除操作,反之一样;当提交一个cached任务时,执行SyncronousQueue.offer(x),如果有等待的空闲线程,则匹配成功,exec立即返回;如果没有匹配,则会创建一个新线程执行任务;任务执行完后使用poll()等待keep即60秒,一直尝试从队列中获取任务,超时后自动释放线程;

  使用这种线程池把握好节奏,因为看起来线程池动不动就会创建系列线程池,而且动不动就会释放线程,完全是不可控的,不可监控的。

4. ScheduledThreadPool 创建一个定长线程池

  支持定时及周期性任务执行。

  其实现如下:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

  看起来,这个在参数方面没有太多的解说,只是依赖于 ScheduledThreadPoolExecutor 的实现了!所以,我们来看一下 ScheduledThreadPoolExecutor是怎么实现的?

    public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor    {}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

  ok, 如上两个,就够了!ScheduledThreadPoolExecutor 也是继承了 ThreadPoolExecutor, 其构造方法也是依赖于父类,只是将 队列实现改为 DelayedWorkQueue,从而实现延时运行或者定期执行的效果!

  而相比于 Timer 来说,ScheduledThreadPool会有更好的执行效果,因为Timer只会有一个线程来执行任务,会有局限性。而且 ScheduledThreadPool 提供其他很多的可操作方法!

  Executors 的出现,使线程池的使用难度大大降低,使开发更方便,在合适的场合使用相应的工厂方法,定能让开发事半功倍!