在Java中的线程即是工作单元也是执行机制,从JDK5以后,工作单元与执行机制解耦。工作单元(任务)包括了Runnable和Callable,而执行机制由JDK5中增加的java.util.concurrent包中Executor框架提供。Executor框架主要包含了3个方面:
任务:与Java线程的任务相同,Executor框架接收实现了Runnable与Callable接口的任务。
执行线程池:在Executor框架下有两个核心接口,Executor接口与继承了Executor接口的ExecutorService接口,而ThreadPoolExecutor线程池正是这两个接口的实现类。此外Executor框架下还有AbstractExecutorService,ScheduledExecutorService,ScheduledThreadPoolExecutor的实现,这些实现类实现了Executor框架下不同的并发任务执行机制。Executor框架的类结构及类功能批注如下图所示:
异步计算任务的支持:ExecutorService接口提供了Future的支持,此外Executor框架针对异步计算,提供了Future的实现类FutrueTask类。FutureTask是一个可取消的异步计算类(FutrueTask.cancel()),除了提供了Futrue类的基本实现外,还提供了查看计算任务是否执行状态,以及等待返回计算结果的方法FutrueTask.get()。
执行流程
在Executor框架下,使用者需要创建Runnable或者Callable类型的任务对象,然后通过ExecutorService.execute()方法将任务提交给ExecutorService的实现类来执行任务,其中ExecutorService.submit()函数将会返回实现了Future接口的异步计算结果对象。Executor的执行框架图如下:
我们可以看到Executor的执行流程就是一个生产者消费者模型,主线程负责生产任务,然后将任务提交给消费者,ExecutorService负责对任务的执行,进行消费,而生产者与消费者之间任务的存储正是ThreadPoolExecutor线程池的任务队列实现的。从而Executor框架实现了任务与执行的解耦。
详谈Executors类
对于Executor框架的使用,java并发包下封装了Executors工厂模板类,提供了一些常用的模板线程池的创建方法。在JDK8中Executors基于ThreadPoolExecutor封装了3种模板线程池:FixedThreadPool、SingleThreadPool和CachedThreadPool。三种线程池都是直接使用ThreadPoolExecutor创建的,只是设置了不同的线程数以及任务队列,从而使得线程池具有不同的运行特性,如下是Executor创建固定线程池的方法实现(JDK8):
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
我们可以看到FixedThreadPool的实现,正是直接使用ThreadPoolExecutor来创建的固定线程池,三种线程池对应的特性与默认参数如下表所示:
线程池 | 特性 | 任务队列(BlockingQueue) | 核心线程数 | 最大线程数 | 线程保活时间 |
---|---|---|---|---|---|
FixedThreadPool | 创建固定线程数线程池 | LinkedBlockingQueue() | nThread | 无效 | 0L |
SingleThreadPool | 单线程任务执行线程池 | LinkedBlockingQueue() | 1 | 1 | 0 |
CachedThreadPool | 动态分配线程数 | SynchronousQueue() | 0 | Integer.MAX_VALUE | 60s |
FixedThreadPool:该线程池使用了固定线程数来创建线程池,其最大线程数等于核心线程数的原因在于使用了*队列LinkedBlockingQueue,永远不会被填满(除非虚拟机抛出内存异常OOM),因此最大线程数只要设置不小于核心线程数即可,这也意味着FixedThreadPool不会拒绝任务的提交。KeepAliveTime表示线程池中空闲线程等待任务的最长时间,而FixedThreadPool将KeepAliveTime设置为0,意味着只要运行线程无法从队列中取到任务,便会立即结束本线程,从而线程池中不会存在空闲线程。FixedThreadPool确保了线程池的最大线程创建数量,一般应用于需要限制线程数的应用场景,但需要注意,*队列的使用,可能会造成大量的任务存储在任务队列中,JVM虚拟机的退出,会使得这些任务的丢失,因此在使用时需要考虑到线程池异常关闭的已提交的任务处理机制。
SingleThreadPool:该线程池只创建了一个线程进行任务的执行,工作线程被创建后会不断的从任务队列中取任务,由于其KeepAliveTime也被设置为0,因此当任务队列为空时,该线程同样会退出,也就是说与FixedThreadPool相同,工作线程一旦创建就不会是空闲线程。同时SingleThreadPool使用了具有FIFO特性的*队列LinkedBlockingQueue作为线程池任务队列,说明SingleThreadPool线程池可以保证任务的执行顺序与任务的添加顺序是相同的,由于只有一个线程会取任务,所有该线程池可以保证任务执行的顺序性。
CachedThreadPool:该线程池使用了一个不存储元素的阻塞队列SynchronousQueue作为任务队列,SynchronousQueue队列是一个没有容量的队列,那么结合线程池最大线程数与核心线程数的关系,我们可以这样理解,在线程池中没有空闲线程从SynchronousQueue中取任务时,向任务队列中提交任务的方法就会失败,此时ThreadPoolExecutor会采用处理任务队列已满的情况来创建新的工作线程去处理新的任务。这就意味着,如果向线程池提交的都是长任务,CachedThreadPool有可能会一直创建新的线程来执行新的任务,所以CachedThreadPool适合于小任务的并发应用场景。此外CachedThreadPool允许空闲线程存活60s,相对于其他两种线程池,CachedThreadPool的线程复用率更高。
这三种模板线程池的创建都反回了一个ExecutorService类型对象,ExecutorService为使用者提供了统一的线程池管理接口。这三种线程池体现出了ThreadPoolExecutor是整个Executor框架中的重要性,ThreadPoolExecutor提供了一个全面的线程池创建管理机制与接口,使用者只需根据实际需要的线程池模型需求,便可方便的创建出适合的线程池,这三个模板线程池可以作为我们理解使用ThreadPoolExecutor的例子。下面代码使用ThreadPoolExecutor创建了一个只允许最多1000个任务进行等待,最大不超过60个线程,并且不允许空闲线程存在的自定义线程池:
public static ExecutorService newSelfDefineThreadPool() { return new ThreadPoolExecutor(60, 60, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000)); //设置队列容量 }
此外,Executors工厂类也提供了创建定时任务执行的两种模板线程池:多线程ScheduleThreadPool和单线程SingleScheduleThreadPool,这两种线程池都依赖于ScheduleThreadPoolExecutor来创建计划任务线程池。
定时任务执行线程池
ScheduleThreadPoolExecutor是ThreadPoolExecutor的一个子类,用于执行需要延时执行,或者周期执行的任务,提供了如下4个接口用于添加定时任务。当然,ScheduleThreadPoolExecutor保留了executor()与submit()方法,其提交的任务会被立即执行,不会等待。
提交一个固定时间间隔执行的任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
提交一个固定延时间隔执行的任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
提交一个延时Runnable任务(执行一次)
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
提交一个延时Callable任务(执行一次)
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
相对于直接使用线程池,ScheduleThreadPoolExecutor在针对执行的任务有执行时机要求的场景下使用很方便,无需额外地添加时间控制代码,以往那些需要通过Thread.sleep()将线程睡眠来控制任务执行周期的代码都可以被ScheduleThreadPoolExecutor简化了。
在ScheduleThreadPoolExecutor的实现中,封装了DelayedWorkQueue作为线程池的任务队列,该队列是一个优先队列,优先队列具有take()方法获取到的元素永远是最高优先级的特性(优先级最高的元素永远排在队列的第一个元素),其控制了ScheduleThreadPoolExecutor从任务队列取到的任务永远是当前最应该执行的任务。优先队列是一个不完全排序的队列,其只保证优先级最高的元素在第一位,并不保证整个队列全部有序,这对于ScheduleThreadPoolExecutor来说完全够用了,ScheduleThreadPoolExecutor只要确保任意时刻取到的任务都是当前队列里优先级最高的即可。有同学可能会问,直接使用一个有序的队列,不也能保证任务的优先级执行顺序?当然可以,但是效率太低,因为每次队列的元素添加操作,都会触发一次所有元素的全排序,频繁的全排序会非常影响性能,而优先队列实质是一个二叉树数据结构,其算法特性使得优先队列只需很少的元素比较与交换(树节点上浮下潜操作),便可将优先级最高的元素交换到队列首部,从而确保了ScheduleThreadPoolExecutor获取到的元素是最高优先级任务的同时,也极大的提高了队列的吞吐性能,因此在不稳定的队列环境下优先队列性能远比全排序队列性能好,优先队列的算法不属于本文讨论重点,有兴趣的同学,推荐看《算法(第4版)》2.4节,来深刻理解优先队列的特性。在DelayedWorkQueue中ScheduleThreadPoolExecutor将定时任务封装成了ScheduledFutureTask类型的任务类,ScheduledFutureTask包含了任务执行的时间time变量,优先队列会根据time进行优先级排序,如果时间相同,那么优先队列会根据ScheduledFutureTask的任务添加顺序字段sequenceNumber进行排序。ScheduleThreadPoolExecutor的执行流程图如下图所示:
我们注意到对于周期执行的ScheduledFutureTask会在任务执行完成后,重新计算下次任务执行的时间,然后加入优先队列等待执行。
总结
通过本文的分析,不得不再次佩服并发大师Doug Lea。在Executor框架下的所有特色线程池以及延时任务线程池的实现,都只是对于ThreadPoolExecutor的直接使用,或者是继承封装。不同线程数以及不同模型的任务队列都会生成一个具有特定特性的线程池,此外ThreadPoolExecutor还提供了线程工厂的设置,这给程序员提供了控制线程创建的机会。