Java Executor并发框架(十二)Executor框架线程池BlockingQueue的三种实现区别

时间:2022-12-02 18:36:14

一、介绍

首先我们来看一下Executors工具类中创建线程池的几个方法。

ExecutorService newFixedThreadPool(int nThreads):固定大小线程池。

Java Executor并发框架(十二)Executor框架线程池BlockingQueue的三种实现区别

可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用*queue的话maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表明什么呢?就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是*的

ExecutorService newSingleThreadExecutor():单线程。可以看到,与fixedThreadPool很像,只不过fixedThreadPool中的入参直接退化为1。

Java Executor并发框架(十二)Executor框架线程池BlockingQueue的三种实现区别


ExecutorService newCachedThreadPool():*线程池,可以进行自动线程回收。

Java Executor并发框架(十二)Executor框架线程池BlockingQueue的三种实现区别

这个实现就有意思了。首先是*的线程池,所以我们可以发现maximumPoolSize为big big。

其次BlockingQueue的选择上使用SynchronousQueue。这个任务阻塞队列是直接提交方式。如何理解呢?当主线程提交任务到线程池的时候,

由于此时没有空闲线程等在SynchronousQueue队列的出口取任务,那么在底层被提交的任务执行offer(e)的操作会返回false,

所以ThreadPoolExecutor会创建一个线程来承接当前这个被提交的任务。

以上是一些简单的分析,下面请继续。

二、分析

先从BlockingQueue<Runnable> workQueue这个入参开始说起。在JDK中,其实已经说得很清楚了,一共有三种类型的queue。以下为引用:(我会稍微修改一下,并用红色突出显示)

所有 BlockingQueue 都可用于传输和保持提交的任务,除了SynchronousQueue。因为SynchronousQueue不能保持提交的任务。

可以使用此队列与池大小进行交互:

  • 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(什么意思?如果当前运行的线程小于corePoolSize,则任务根本不会添加到queue中,而是直接抄家伙(thread)开始运行
  • 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列而不添加新的线程
  • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
而任务的阻塞队列通常有以下三种:1. 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。也就是说,如果提交一个任务A到池中,此时如果没有空闲线程等在SynchronousQueue队列的出口取任务,则会立马构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求* maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许*线程具有增长的可能性。
2. *队列。使用*队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用*队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许*线程具有增长的可能性。3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。 

三、案例分析

例子一:使用直接提交策略,也即SynchronousQueue。
首先SynchronousQueue是*的,也就是说他存储任务的能力是没有限制的,但是由于该Queue本身的特性在某次试图提交任务必须有可用线程取出。在这里不是核心线程便是新创建的线程,但是我们试想一样下,下面的场景。
我们使用一下参数构造ThreadPoolExecutor:
new ThreadPoolExecutor(2, 3,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
当核心线程已经有2个正在运行.
1. 此时继续来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。”。而现在我们使用的是SynchronousQueue队列,SynchronousQueue不保存任务,需要立即新建线程执行任务(A)。并且此时已经达到了最大值maximumPoolSize。2. 又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的是SynchronousQueue,并且此时线程数目超出maximumPoolSize,在这种情况下,任务将被拒绝。

另外我发现当使用SynchronousQueue的时候,发现设置的corePoolSize没有用,也就是说就算设置corePoolSize不为0,当任务结束后,该线程还是会被销毁(通过调用ThreadPoolExecutor.getPoolSize()查看当前线程池线程数)

所以在使用SynchronousQueue通常要求maximumPoolSize是*的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁什么意思?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1没有被执行前,A2不可能获得执行机会。
例子二:使用*队列策略,即LinkedBlockingQueue这个就拿newFixedThreadPool来说,根据前文提到的规则:
如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。那么当任务继续增加,会发生什么呢?如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。OK,此时任务变加入队列之中了,那什么时候才会添加新线程呢?如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

这里就很有意思了,可能会出现无法加入队列吗?不像SynchronousQueue那样有其自身的特点,对于*队列来说,总是可以加入的(资源耗尽,当然另当别论)。换句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。所以要防止任务疯长,比如任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,而且还不断增加,如果任务内存大一些,不一会儿就爆了。

例子三:有界队列,使用ArrayBlockingQueue。

这个是最为复杂的使用,所以JDK不推荐使用也有些道理。与上面的相比,最大的特点便是可以防止资源耗尽的情况发生。

四、BlockingQueue向队列尾部添加元素的三种方法区别

添加元素的方法有三个:add,put,offer。下面我们来看一下这三个方法的区别:

add方法

Java Executor并发框架(十二)Executor框架线程池BlockingQueue的三种实现区别

简单来说就是:add方法在添加元素到队列尾部的时候,若没有超过队列长度的限制就返回true,否则就抛出IllegalStateException。

put方法

Java Executor并发框架(十二)Executor框架线程池BlockingQueue的三种实现区别

简单来说就是:若向队列尾部添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素。

offer方法

Java Executor并发框架(十二)Executor框架线程池BlockingQueue的三种实现区别

简单来说就是:offer方法在添加元素到队列尾部的时候,如果发现队列已满无法添加的话,会直接返回false。

五、BlockingQueue移除队列头部元素的三种方法区别

remove

Java Executor并发框架(十二)Executor框架线程池BlockingQueue的三种实现区别

简单来说就是:remove方法从队列头部移除元素的时候,若队列为空,抛出NoSuchElementException异常。

take

Java Executor并发框架(十二)Executor框架线程池BlockingQueue的三种实现区别

简单来说就是:take方法从队列头部移除元素的时候,若队列为空,发生阻塞,等待有元素。

poll

Java Executor并发框架(十二)Executor框架线程池BlockingQueue的三种实现区别

简单来说就是:poll方法从队列头部移除元素的时候,若队列为空,返回null。

六、总结

  1. ThreadPoolExecutor的使用还是很有技巧的。
  2. 使用*queue可能会耗尽系统资源。
  3. 使用有界queue可能不能很好的满足性能,需要调节线程数和queue大小
  4. 线程数自然也有开销,所以需要根据不同应用进行调节。
通常来说对于静态任务可以归为:
  1. 数量大,但是执行时间很短
  2. 数量小,但是执行时间较长
  3. 数量又大执行时间又长
  4. 除了以上特点外,任务间还有些内在关系


参考文献:

1.ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别