所属包:
java.util.concurrent.ThreadPoolExecutor
类关系:
public class ThreadPoolExecutor extends AbstractExecutorService
1. 继承关系
ThreadPoolExecutor 继承了一个抽象类:AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService
而这个AbstractExecutorService实现了一个接口:ExecutorService
public interface ExecutorService extends Executor
这个ExecutorService接口又继承了一个类:Executor
public interface Executor
可以看出:
Executor是一个顶层接口,它的子接口ExecutorService继承了它(其实还有一个子接口: ScheduledExecutorService),抽象类AbstractExecutorService实现了这个子接口ExecutorService,最终ThreadPoolExecutor 继承了抽象类AbstractExecutorService并且同时实现了子接口ExecutorService。
2. 构造方法
最简单的一个构造方法:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
有五个参数:
corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut
maximumPoolSize - 池中允许的最大线程数
keepAliveTime - 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。
unit - keepAliveTime参数的时间单位
workQueue - 在执行任务之前用于保存任务的队列。 该队列将仅保存execute方法提交的Runnable任务
实际上它调用了同类的另外一个构造方法,最后两个参数用的默认值
另外一个构造方法:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数:
corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut
maximumPoolSize - 池中允许的最大线程数
keepAliveTime - 当线程数大于内核时,这是多余的空闲线程在终止前等待新任务的最大时间。
unit - keepAliveTime参数的时间单位
workQueue - 用于在执行任务之前使用的队列。 这个队列将仅保存execute方法提交的Runnable任务。
threadFactory - 执行程序创建新线程时使用的工厂
handler - 执行被阻止时使用的处理程序,因为达到线程限制和队列容量 (拒绝策略)
3. 如何使用
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class MyThreadPoolExecutor { /**
* 使用有界队列:
* 1、当线程数小于corePoolSize时,创建线程执行任务。
* 2、当线程数大于等于corePoolSize并且workQueue没有满时,放入workQueue中
* 3、线程数大于等于corePoolSize并且当workQueue满时,新任务新建线程运行,线程总数要小于maximumPoolSize
* 4、当线程总数等于maximumPoolSize并且workQueue满了的时候执行handler的rejectedExecution。也就是拒绝策略。
*
* ThreadPoolExecutor默认有四个拒绝策略:
* 1、ThreadPoolExecutor.AbortPolicy() 直接抛出异常RejectedExecutionException
* 2、ThreadPoolExecutor.CallerRunsPolicy() 直接调用run方法并且阻塞执行
* 3、ThreadPoolExecutor.DiscardPolicy() 直接丢弃后来的任务
* 4、ThreadPoolExecutor.DiscardOldestPolicy() 丢弃在队列中队首的任务
*
*
*
*/ public static void main(String[] args) { ThreadPoolExecutor pool = new ThreadPoolExecutor(
1,
2,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3)); Runnable[] runs = new Runnable[6];
for (int i = 0; i < runs.length; i++) {
runs[i] = new MyTask(i);
} pool.execute(runs[0]); //线程1个,队列0个
pool.execute(runs[1]); //线程1个,队列1个
pool.execute(runs[2]); //线程1个,队列2个
pool.execute(runs[3]); //线程1个,队列3个
pool.execute(runs[4]); //线程2个,队列3个
pool.execute(runs[5]); //线程2个,队列3个,拒绝第六个 pool.shutdown(); }
}
线程:
public class MyTask implements Runnable { private int id; public MyTask(int id) {
this.id = id;
} @Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task-" + id);
} }
这个就是最简单的一个使用方法了。ps:最后那个(线程1个,队列0个....)指的是,你仅仅执行runs[0];runs[0]+runs[1];runs[0]+runs[1]+runs[2];....的时候,任务被放到哪里。
但是推荐使用这种方式创建线程池:
package cn.ying.thread.pool; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class MyTest { public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) {
pool.execute(new MyTask(i));
}
pool.shutdown();
} public void note(){
Executors.newCachedThreadPool(); //*线程池,可以进行自动线程回收
// public static ExecutorService newCachedThreadPool() {
// return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
// 60L, TimeUnit.SECONDS,
// new SynchronousQueue<Runnable>());//SynchronousQueue:长度为1的队列
// }
Executors.newFixedThreadPool(10); //创建一个可重用固定线程数的线程池,以共享的*队列方式来运行这些线程。
// public static ExecutorService newFixedThreadPool(int nThreads) {
// return new ThreadPoolExecutor(nThreads, nThreads,
// 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>());
// }
Executors.newScheduledThreadPool(10); //创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行
// public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
// return new ScheduledThreadPoolExecutor(corePoolSize);
// }
// public ScheduledThreadPoolExecutor(int corePoolSize) {
// super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
// new DelayedWorkQueue());
// }
Executors.newSingleThreadExecutor(); //创建一个使用单个 worker 线程的 Executor,以*队列方式来运行该线程
// public static ExecutorService newSingleThreadExecutor() {
// return new FinalizableDelegatedExecutorService
// (new ThreadPoolExecutor(1, 1,
// 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>()));
// } }
}
==============================更新
4. 拒绝策略
1. AbortPolicy(丢弃任务并抛出RejectedExecutionException异常)默认策略
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1,
2,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
// 拒绝并抛出异常
new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 6; i++){
pool.execute(new Task("任务" + i));
}
pool.shutdown();
} private static class Task implements Runnable{
private String name; Task(String name) {
this.name = name;
} @Override
public void run() {
System.out.println(Thread.currentThread().getName() + "===" + name);
}
} }
运行:
2. DiscardPolicy(丢弃任务,但是不抛出异常)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1,
2,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
// 拒绝但不抛出异常
new ThreadPoolExecutor.DiscardPolicy()); for (int i = 0; i < 6; i++){
pool.execute(new Task("任务" + i));
}
pool.shutdown();
} private static class Task implements Runnable{
private String name; Task(String name) {
this.name = name;
} @Override
public void run() {
System.out.println(Thread.currentThread().getName() + "===" + name);
}
} }
运行:
3. DiscardOldestPolicy(丢弃队列最前面的任务,然后重新提交被拒绝的任务)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1,
2,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
// 丢弃队列中最前面的任务,然后执行被拒绝的任务
new ThreadPoolExecutor.DiscardOldestPolicy()); for (int i = 0; i < 6; i++){
pool.execute(new Task("任务" + i));
}
pool.shutdown();
} private static class Task implements Runnable{
private String name; Task(String name) {
this.name = name;
} @Override
public void run() {
System.out.println(Thread.currentThread().getName() + "===" + name);
}
} }
运行:
4. CallerRunsPolicy(由提交任务的线程处理)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1,
2,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
// 由提交任务的线程直接执行此任务
new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 6; i++){
pool.execute(new Task("任务" + i));
}
pool.shutdown();
} private static class Task implements Runnable{
private String name; Task(String name) {
this.name = name;
} @Override
public void run() {
System.out.println(Thread.currentThread().getName() + "===" + name);
}
} }
运行:
5. 自己处理
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1,
2,
0L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
// 自定义处理
(Runnable r, ThreadPoolExecutor executor) -> {
System.out.println("自定义处理");
}); for (int i = 0; i < 6; i++){
pool.execute(new Task("任务" + i));
}
pool.shutdown();
} private static class Task implements Runnable{
private String name; Task(String name) {
this.name = name;
} @Override
public void run() {
System.out.println(Thread.currentThread().getName() + "===" + name);
}
} }
运行: