Java并发编程总结5——ThreadPoolExecutor

时间:2022-12-02 12:04:34

一、ThreadPoolExecutor介绍

在jdk1.8中,构造函数有4个。以

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)为例:

1、corePoolSize: 核心线程池大小

2、maximumPoolSize: 最大线程池大小

3、keepAliveTime: 当线程池中的线程数大于corePoolSize时, 多余空闲线程等待新任务的最长时间, 超过这个时间后多余线程终止

4、unit: 时间单位, 比如毫秒, 纳秒等

5、workQueue: 阻塞队列

6、threadFactory: 创建线程工厂, 可以方便的创建线程, 可以自定义; 默认为Executors.DefaultThreadFactory

7、handler: 饱和策略, 默认为AbortPolicy

定义一个完整的线程池可以这么写:

private ThreadPoolExecutor defaultThreadPool = new ThreadPoolExecutor(10, 100, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());

线程池的主要处理流程如下:

Java并发编程总结5——ThreadPoolExecutor

1、提交任务到线程池,判断核心线程池(corePoolSize)是否已满? 没满,创建工作线程执行任务;满了,进入下个流程。

2、线程池判断工作队列(workQueue)是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。

3、判断整个线程池(maximumPoolSize)是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略(RejectedExecutionHandler)来处理这个任务。

二、饱和策略 RejectedExecutionHandler

在ThreadPoolExecutor类中定义了4种RejectedExecutionHandler类型:

1、AbortPolicy: 默认策略,直接抛出RejectedExecutionException异常。

2、CallerRunsPolicy:只用调用者所在线程来运行任务。

3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

4、DiscardPolicy:不处理,丢弃掉。

也可以自定义饱和策略,比如将无法处理的新任务加入日志等,只需要实现RejectedExecutionHandler接口即可。

三、阻塞队列BlockingQueue

    /**
* ArrayBlockingQueue: 由数组结构组成的有界阻塞队列, 队列遵循FIFO
*/
private BlockingQueue<String> abq = new ArrayBlockingQueue<String>(10); /**
* LinkedBlockingQueue: 由链表结构组成的阻塞队列, FIFO
* LinkedBlockingDeque: 由链表结构组成的双向阻塞队列, 构造方法和LinkedBlockingQueue类似
* public LinkedBlockingQueue(int capacity) //有界阻塞队列,队列最大值为capacity
* public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //*阻塞队列,队列最大值为Integer.MAX_VALUE
* 通过Executors.newFixedThreadPool(int nThreads)创建的线程池中采用的是*LinkedBlockingQueue
* 对于put和take操作, 内部采用了不同的锁: putLock, takeLock, 而ArrayBlockingQueue内部只有一把锁
*/
private BlockingQueue<Thread> lbq = new LinkedBlockingQueue<Thread>(10);
private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
private BlockingQueue<String> lbd = new LinkedBlockingDeque<String>(); /**
* PriorityBlockingQueue: 支持优先级排序的有界阻塞队列
* public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
*/
private BlockingQueue<String> pbq = new PriorityBlockingQueue(100, new Comparator<String>() {
public int compare(String o1, String o2) {
return o1.compareTo(o2); //升序排列
}
}); /**
* DelayQueue: 一个使用优先级队列实现的*阻塞队列, 支持延时获取元素, 适用于缓存系统的设计以及定时任务调度。
* 内部队列采用PriorityQueue, private final PriorityQueue<E> q = new PriorityQueue<E>();
* 队列元素需要实现Delayed接口, class DelayQueue<E extends Delayed> extends AbstractQueue<E>
*/ /**
* SynchronousQueue: 不存储元素的阻塞队列
*/ /**
* LinkedTransferQueue: 由链表结构组成的实现了TransferQueue接口的*阻塞队列
* transfer方法: 如果当前消费者正在等待接收元素(take()或poll(long timeout, TimeUnit unit))方法, 生产者传入的元素可以直接传递给消费者, 而不放入队列
*/

BlockingQueue<String> ltq = new LinkedTransferQueue<String>();

四、通过Executors创建线程池

       /**
* 创建单个线程, 适用于需要保证顺序执行任务的场景
* return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
* BlockingQueue采用*LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime参数无意义
*/

Executors.newSingleThreadExecutor();
/**
* 创建固定线程, 适用于需要限制当前线程数量, 负载比较重的服务器
* return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
* 采用*LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime参数无意义
*/

Executors.newFixedThreadPool(10);
/**
* 根据需要创建线程, 适用于执行很多的短期异步任务的小程序, 或者负载较轻的服务器
* return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
* 初始线程为0, 采用SynchronousQueue阻塞队列, 如果生产任务的速度低于消费的速度, 空闲60s的线程会被终止; 如果生产任务的速度持续高于消费速度, 则会不断创建新线程
*/

Executors.newCachedThreadPool();
/**
* 在给定的延迟之后运行任务, 或者定期执行任务
* super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
* 采用*DelayQueue队列, 因此maximumPoolSize、keepAliveTime参数无意义
*/

Executors.newScheduledThreadPool(10);