一 异步用new Thread? 大写的"low"!!
new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start();
你还在像上面这么用吗,太low 了。弊端多多:
1. 每次new Thread新建对象性能差。
2. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
3. 缺乏更多功能,如定时执行、定期执行、线程中断。
相比new Thread,Java提供的四种线程池的好处与此相对,在于:
1. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
2. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
3. 提供定时执行、定期执行、单线程、并发数控制等功能。
二 底层java.util.concurrent.ThreadPoolExecutor
无论创建哪种线程池 必须要调用ThreadPoolExecutor, ????列举一个构造方法
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
拒绝策略:
三 几种典型线程池的源码分析
Java通过Executors提供四种线程池,分别为(juc.Executors包下):
newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程(60s不执行任务),若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来串行执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
1. FixedThreadPool
- FixedThreadPool的corePoolSize和maxiumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。
- FixedThreadPool使用了*队列LinkedBlockingQueue作为线程池的工作队列,由于是*的,当线程池的线程数达到corePoolSize后,新任务将在*队列中等待,因此线程池的线程数量不会超过corePoolSize,同时maxiumPoolSize也就变成了一个无效的参数,并且运行中的线程池并不会拒绝任务。
-
FixedThreadPool运行图如下
执行过程如下:
1.如果当前工作中的线程数量少于corePool的数量,就创建新的线程来执行任务。
2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。
3.线程执行完1中的任务后会从队列中取任务。
注意LinkedBlockingQueue是*队列,所以可以一直添加新任务到线程池。
2. SingleThreadExecutor
SingleThreadExecutor是使用单个worker线程的Executor。特点是使用单个工作线程执行任务。它的构造源码如下:
执行过程如下:
1.如果当前工作中的线程数量少于corePool的数量,就创建一个新的线程来执行任务。
2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。
3.线程执行完1中的任务后会从队列中取任务。
注意:由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行。
3. CachedThreadPool
CachedThreadPool是一个”无限“容量的线程池,它会根据需要创建新线程。特点是可以根据需要来创建新的线程执行任务,没有特定的corePool。下面是它的构造方法:
1.首先执行SynchronousQueue.offer(Runnable task)。如果在当前的线程池中有空闲的线程正在执行SynchronousQueue.poll(),那么主线程执行的offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行。execute()方法执行成功,否则执行步骤2
2.当线程池为空(初始maximumPool为空)或没有空闲线程时,配对失败,将没有线程执行SynchronousQueue.poll操作。这种情况下,线程池会创建一个新的线程执行任务。
3.在创建完新的线程以后,将会执行poll操作。当步骤2的线程执行完成后,将等待60秒,如果此时主线程提交了一个新任务,那么这个空闲线程将执行新任务,否则被回收。因此长时间不提交任务的CachedThreadPool不会占用系统资源。
SynchronousQueue是一个不存储元素阻塞队列,每次要进行offer操作时必须等待poll操作,否则不能继续添加元素。
4. 手动调用new ThreadPoolExecutor
四 引申的几个问题
1. 无限大CachedThreadPool的OOM
ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 1; i <= 2100; i++) { executorService.submit(() -> { try { Thread.sleep(5000); } catch (Exception e) { //ignore } }); }
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:717) at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at com.balfish.hotel.train.concurrent.countDownLatch.CountDownLatchMain.main(CountDownLatchMain.java:31)
可以看出来是堆外内存溢出,因为我们新建的线程都在工作(代码中用sleep模拟在工作中),newCachedThreadPool 只会重用空闲并且可用的线程,所以上述代码只能不停地创建新线程,
在 64-bit JDK 1.7 中 -Xss 默认是 1024k,也就是 1M,那就是需要 2100*1M = 2.1G 的堆外内存空间来给线程使用,机器内存分配不够创建新的线程,所以就 OOM 了。
newCachedThreadPool最大值初始化时默认为Integer.MAX_VALUE,一般来说机器都没那么大内存给它不断使用。那么我们一般去重写一个方法限制一下这个最大值,或者看下newFixedThreadPool是否满足
2. ArrayBlockingQueue 和 LinkedBlockingQueue 的区别
(1)一把锁 vs 分离锁
ArrayBlockingQueue中的锁是没有分离的,即生产者和消费者用的是一个锁
LinkedBlockingQueue的锁是分离的,即生产用的是putLock,消费用的是takeLock
(2)数组 vs 链表
ArrayBlockingQueue基于数组,生产和消费的时候,直接将枚举对象插入或移除,不会有额外的对象实例的空间开销
LinkedBlockingQueue基于链表,生产和消费的时候,需要把枚举转换为Node<E>进行插入或移除,有额外的Node对象开销。这在大批量并发处理数据时,对GC有一定影响
(3)队列长度
ArrayBlockingQueue是有界的,必须指定队列大小
LinkedBlockingQueue是*的,可以不指定队列的大小,默认是Integer.MAX_VALUE。(也可以指定队列大小,从而成为有界的)
(4)队列效率
ArrayBlockingQueue快。LinkedBlockingQueue用默认大小且生产速度大于消费速度时候,可能会OOM
3. shutdown和shutdownNow
可以调用线程池的shutdown或者shutdownNow方法来关闭线程池。他们的原理是遍历线程池的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法停止。
区别:
shutdown方法将执行平缓的关闭过程:不再接收新的任务,同时等待已提交的任务执行完成, 包括那些还未开始执行的任务。
shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true,当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminated方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
4. 继承覆写ThreadPoolExecutor
可以通过继承线程池来自定义线程池,重写线程池的beforeExecute, afterExecute和terminated方法。在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或者统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。如果任务在完成后带有一个Error,那么就不会调用afterExecute。如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。在线程池完成关闭时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后,terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者finalize统计等操作