「java.util.concurrent并发包」之 ThreadPoolExecutor

时间:2021-02-21 17:37:12

一 异步用new Thread? 大写的"low"!!

new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
}).start();

你还在像上面这么用吗,太low 了。弊端多多:

1.  每次new Thread新建对象性能差。

2. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。

3. 缺乏更多功能,如定时执行、定期执行、线程中断。

 

相比new Thread,Java提供的四种线程池的好处与此相对,在于:
1. 重用存在的线程,减少对象创建、消亡的开销,性能佳。

2. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。

3. 提供定时执行、定期执行、单线程、并发数控制等功能。

 

 

二 底层java.util.concurrent.ThreadPoolExecutor

无论创建哪种线程池 必须要调用ThreadPoolExecutor, ????列举一个构造方法 

「java.util.concurrent并发包」之 ThreadPoolExecutor


当一个任务通过execute(Runnable)方法欲添加到线程池时: 

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 
如果此时线程池中的数量等于corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。 
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。 
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。 

也就是:处理任务的优先级为: 
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 

当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。 

拒绝策略:

「java.util.concurrent并发包」之 ThreadPoolExecutor

 

三 几种典型线程池的源码分析

Java通过Executors提供四种线程池,分别为(juc.Executors包下):

newCachedThreadPool      创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程(60s不执行任务),若无可回收,则新建线程。
newFixedThreadPool         创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 
newScheduledThreadPool  创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor  创建一个单线程化的线程池,它只会用唯一的工作线程来串行执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

 1. FixedThreadPool

「java.util.concurrent并发包」之 ThreadPoolExecutor

  • FixedThreadPool的corePoolSize和maxiumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。
  • FixedThreadPool使用了*队列LinkedBlockingQueue作为线程池的工作队列,由于是*的,当线程池的线程数达到corePoolSize后,新任务将在*队列中等待,因此线程池的线程数量不会超过corePoolSize,同时maxiumPoolSize也就变成了一个无效的参数,并且运行中的线程池并不会拒绝任务。
  • FixedThreadPool运行图如下

    「java.util.concurrent并发包」之 ThreadPoolExecutor

    执行过程如下:

    1.如果当前工作中的线程数量少于corePool的数量,就创建新的线程来执行任务。

    2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。

    3.线程执行完1中的任务后会从队列中取任务。

    注意LinkedBlockingQueue是*队列,所以可以一直添加新任务到线程池。

 

2. SingleThreadExecutor  

SingleThreadExecutor是使用单个worker线程的Executor。特点是使用单个工作线程执行任务。它的构造源码如下:

「java.util.concurrent并发包」之 ThreadPoolExecutor

SingleThreadExecutor的corePoolSize和maxiumPoolSize都被设置1。
其他参数均与FixedThreadPool相同,其运行图如下:

 「java.util.concurrent并发包」之 ThreadPoolExecutor

执行过程如下:

1.如果当前工作中的线程数量少于corePool的数量,就创建一个新的线程来执行任务。

2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。

3.线程执行完1中的任务后会从队列中取任务。

注意:由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行。

 

 3. CachedThreadPool

 CachedThreadPool是一个”无限“容量的线程池,它会根据需要创建新线程。特点是可以根据需要来创建新的线程执行任务,没有特定的corePool。下面是它的构造方法:

「java.util.concurrent并发包」之 ThreadPoolExecutor

CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximum是*的。这里keepAliveTime设置为60秒,意味着空闲的线程最多可以等待任务60秒,否则将被回收。
 
CachedThreadPool使用没有容量的SynchronousQueue作为主线程池的工作队列,它是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作。这意味着,如果主线程提交任务的速度高于线程池中处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU资源。其运行图如下:
「java.util.concurrent并发包」之 ThreadPoolExecutor

 

执行过程如下:

1.首先执行SynchronousQueue.offer(Runnable task)。如果在当前的线程池中有空闲的线程正在执行SynchronousQueue.poll(),那么主线程执行的offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行。execute()方法执行成功,否则执行步骤2

2.当线程池为空(初始maximumPool为空)或没有空闲线程时,配对失败,将没有线程执行SynchronousQueue.poll操作。这种情况下,线程池会创建一个新的线程执行任务。

3.在创建完新的线程以后,将会执行poll操作。当步骤2的线程执行完成后,将等待60秒,如果此时主线程提交了一个新任务,那么这个空闲线程将执行新任务,否则被回收。因此长时间不提交任务的CachedThreadPool不会占用系统资源。

SynchronousQueue是一个不存储元素阻塞队列,每次要进行offer操作时必须等待poll操作,否则不能继续添加元素。

 

 

 4. 手动调用new ThreadPoolExecutor

「java.util.concurrent并发包」之 ThreadPoolExecutor

 

 

 

四 引申的几个问题 

1. 无限大CachedThreadPool的OOM

     ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 1; i <= 2100; i++) {
            executorService.submit(() -> {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    //ignore
                }
            });
        }
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:717)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.balfish.hotel.train.concurrent.countDownLatch.CountDownLatchMain.main(CountDownLatchMain.java:31)

可以看出来是堆外内存溢出,因为我们新建的线程都在工作(代码中用sleep模拟在工作中),newCachedThreadPool 只会重用空闲并且可用的线程,所以上述代码只能不停地创建新线程,

在 64-bit JDK 1.7 中 -Xss 默认是 1024k,也就是 1M,那就是需要 2100*1M = 2.1G 的堆外内存空间来给线程使用,机器内存分配不够创建新的线程,所以就 OOM 了。

newCachedThreadPool最大值初始化时默认为Integer.MAX_VALUE,一般来说机器都没那么大内存给它不断使用。那么我们一般去重写一个方法限制一下这个最大值,或者看下newFixedThreadPool是否满足

 

2. ArrayBlockingQueue 和 LinkedBlockingQueue 的区别

(1)一把锁 vs 分离锁

ArrayBlockingQueue中的锁是没有分离的,即生产者和消费者用的是一个锁

LinkedBlockingQueue的锁是分离的,即生产用的是putLock,消费用的是takeLock

(2)数组 vs 链表

ArrayBlockingQueue基于数组,生产和消费的时候,直接将枚举对象插入或移除,不会有额外的对象实例的空间开销

LinkedBlockingQueue基于链表,生产和消费的时候,需要把枚举转换为Node<E>进行插入或移除,有额外的Node对象开销。这在大批量并发处理数据时,对GC有一定影响

(3)队列长度

ArrayBlockingQueue是有界的,必须指定队列大小

LinkedBlockingQueue是*的,可以不指定队列的大小,默认是Integer.MAX_VALUE。(也可以指定队列大小,从而成为有界的)

(4)队列效率

ArrayBlockingQueue快。LinkedBlockingQueue用默认大小且生产速度大于消费速度时候,可能会OOM

 

3. shutdown和shutdownNow

可以调用线程池的shutdown或者shutdownNow方法来关闭线程池。他们的原理是遍历线程池的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法停止

区别:

shutdown方法将执行平缓的关闭过程:不再接收新的任务,同时等待已提交的任务执行完成, 包括那些还未开始执行的任务。

shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true,当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminated方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

 

4. 继承覆写ThreadPoolExecutor

可以通过继承线程池来自定义线程池,重写线程池的beforeExecute, afterExecute和terminated方法。在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或者统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。如果任务在完成后带有一个Error,那么就不会调用afterExecute。如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。在线程池完成关闭时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后,terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者finalize统计等操作