Java并发(基础知识)—— Executor框架及线程池

时间:2022-02-13 14:51:07

      在Java并发(基础知识)—— 创建、运行以及停止一个线程中讲解了两种创建线程的方式:直接继承Thread类以及实现Runnable接口并赋给Thread,这两种创建线程的方式在线程比较少的时候是没有问题的,但是当需要创建大量线程时就会出现问题,因为这种使用方法把线程创建语句随意地散落在代码中,无法统一管理线程,我们将无法管理创建线程的数量,而过量的线程创建将直接使系统崩溃。

      从高内聚角度讲,我们应该创建一个统一的创建以及运行接口,为我们管理这些线程,这个统一的创建与运行接口就是JDK 5的Executor框架。

Executor框架                                                                                  

      在Java类库中,任务执行的主要抽象不是Thread,而是Executor,该接口定义如下:

public interface Executor {
    void execute(Runnable command);
}

  虽然Executor是一个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能够支持多种不同类型的任务执行策略,它提供了一种标准的方法将任务的提交过程与执行过程解耦开来。

      Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行工作单元)。如果要在一个程序中实现一个生产者-消费者模式,那么最简单的方式就是使用Executor。

      Executor接口定义了提交任务的方法,但却没有定义关闭的方法,ExecutorService接口扩展了Executor接口,添加了一些用于生命周期管理的方法:

public interface ExecutorService extends Executor {
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
  
    ...    
}

  ExecutorService的生命周期有3种状态:运行、关闭和已终止。ExecutorService在初始创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有执行中的任务,并且不再启动队列中尚未开始执行的任务。

      在所有任务都完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常在调用awaitTermination之后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果。

线程池                                                                                         

      Executor框架的核心是线程池。线程池是指管理一组同构工作线程的资源池,在"线程池中执行任务"比"为每个任务分配一个线程"优势更多。通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。另一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。通过适当调整线程池大小,可以创建足够多的线程以便使处理器保持忙碌,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存而失败。

      ThreadPoolExecutor定义了一个线程池,该类的声明如下:

public class ThreadPoolExecutor extends AbstractExecutorService { ... }

public abstract class AbstractExecutorService implements ExecutorService { ... }

  可以看到,ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口,所以ThreadPoolExecutor也间接实现了ExecutorService接口。

      ThreadPoolExecutor定义了很多构造函数,以下代码给出了该类最重要的构造函数:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) { ... }

  corePoolSize、maximumPoolSize、keepAliveTime以及unit这几个参数分别定义了线程池的基本大小、最大大小以及存活时间。corePoolSize定义了线程池的基本大小,也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。maximumPoolSize定义了线程池的最大大小,表示线程池可同时活动线程数量上限。keepAliveTime和unit共同定义了线程的存活时间,如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过基本大小时,这个线程将被终止。

      workQueue参数包含Runnable的阻塞队列,当线程池达到基本大小时,新提交的任务将放入这个阻塞队列中,阻塞队列的实现包含三种:*队列、有界队列以及同步移交队列。

      threadFactory参数用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,方便定位问题。

      handler参数定义了线程池饱和策略。当有界队列被填满后,并且线程池活动线程达到最大线程数,饱和策略开始生效。JDK提供了几种不同的RejectedExecutionHandler实现,分别是AbortPolicy、DiscardPolicy、DiscardOldestPolicy以及CallerRunsPolicy。AbortPolicy是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。DiscardPolicy策略会把新提交的任务直接抛弃,而DiscardOldestPolicy策略会抛弃队列首部最老的任务。CallerRunsPolicy策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量,它不会在线程池中的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。

Executors静态工厂方法                                                                      

      从上一节内容看出,ThreadPoolExecutor的新建需要传入很多参数,使用起来极不方便。为了便于使用,Executors为我们提供了几个静态工厂方法,大大简化了线程池的创建,它们分别是:

  • newFixedThreadPool:newFixedThreadPool将创建一个固定大小的线程池,每当提交一个任务就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化;
  • newCachedThreadPool:newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求,那么将回收空闲线程;而当需求增加时,可以添加新的线程,线程池的规模不存在任何限制。
  • newSingleThreadExecutor:newSingleThreadExecutor是一个单线程的Executor,它创建单个工作者线程执行任务,如果这个线程异常结束,会创建另一个线程代替。

      以newCachedThreadPool为例,我们可以看看这些静态工厂方法的内部实现:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

  可以看到,这些静态工厂方法最终还是调用的ThreadPoolExecutor的构造函数,指定了线程池基本大小为0,最大大小为Integer值上限,线程存活时间为60s,阻塞队列是一个SynchronousQueue。从这些参数可以知道,当线程提交newCachedThreadPool的线程池时,由于基本大小为0,所以肯定大于基本大小,然后任务会进入阻塞队列,而SynchronousQueue内部没有任何容量,且当前线程数未达到最大线程数,所以任务将立即执行。任务执行完有60s的超时时间,如果在这段时间内有新任务调用,那么新任务将直接在这个线程上运行。

总结                                                                                                    

      线程池的使用能够帮助我们统一管理线程,提高线程的可管理性,在写多线程代码时,我们应该优先使用线程池方式创建线程。