Java并发编程--4.Executor框架

时间:2022-02-18 23:11:25

简介

Executor框架是启动,管理线程的API, 它的内部实现是线程池机制,它有很多好处,比如使任务提交和任务执行解耦合,防止this逃逸;
它的主要API包括: Executor,  Executors, ExecutorService , Callable,   Future,   CompletionService,    ThreadPoolExecutor

ExecutorService 生命周期

一个Executor的生命周期有三种状态: 运行状态,关闭状态,终止状态; ExecutorService中添加了生命周期管理的方法

Executor创建时, 处于运行状态; 当调用shutdown()后,处于关闭状态,停止接受新的线程,并执行已接受的线程任务; 所有任务执行完成后,处于终止状态

Executors 创建线程池

newFixedThreadPool : 创建定长的线程池,  最多有固定数量的线程, 如果还有创建新的线程,需要放到队列中等待, 直到有线程从池中移出
newCachedThreadPool :可缓存的线程池, 如果现有线程没有可用,则创建一个新线程并添加到池中。
60 秒钟未被使用的线程会被移除
newSingleThreadExecutor : 只创建一个工作线程, 当工作线程异常结束, 会重新创建一个线程
newScheduledThreadPool: 创建定长的线程池, 支持定时的任务执行

线程池执行Runnable的例子

public class MyExecutors {
public static void main(String[] args) {
ExecutorService executorService
= Executors.newCachedThreadPool();

for (int i = 0; i < 5; i++){
executorService.execute(
new MyRunnable());
}

executorService.shutdown();
}

}

class MyRunnable implements Runnable {

@Override
public void run() {
System.out.println(Thread.currentThread().getName()
+ "被调用");
}
}

Callable和Future 携带结果的任务

Callable: 带有返回值的任务
Future: 保存一个任务执行后的结果

下面给出一个例子:

public class MyCallable {
public static void main(String[] args){
ExecutorService executorService
= Executors.newCachedThreadPool();
List
<Future<String>> resultList = new ArrayList<Future<String>>();

//创建10个任务并执行
for (int i = 0; i < 10; i++){
Future
<String> future = executorService.submit(new TaskWithResult(i));

//将任务执行结果存储到List中
resultList.add(future);
}

//遍历任务的结果
for (Future<String> fs : resultList){
try{
while(!fs.isDone());//Future返回如果没有完成,则一直循环等待,直到Future返回完成
System.out.println(fs.get()); //打印各个线程(任务)执行的结果

}
catch(Exception e){
e.printStackTrace();
}
finally{
//启动一次顺序关闭,执行以前提交的任务,但不接受新任务
executorService.shutdown();
}
}
}
}


class TaskWithResult implements Callable<String>{
private int id;

public TaskWithResult(int id){
this.id = id;
}

/**
* 任务的具体过程,一旦任务传给ExecutorService的submit方法,
* 则该方法自动在一个线程上执行
*/
public String call() throws Exception {
System.out.println(
"call()方法被自动调用!!! " + Thread.currentThread().getName());
//该返回结果将被Future的get方法得到
return "call()方法被自动调用,任务返回的结果是:" + id + " " + Thread.currentThread().getName();
}
}

CompletionService完成服务,打包结果

任意任务完成后就把其加到结果中, 调用CompletionService的take()方法,返回 按任务的完成顺序  封装的结果, 像是一个打包的Future

下面给出一个例子

public class MyCompletionService {
public static void main(String[] args){
ExecutorService executorService
= Executors.newCachedThreadPool();

//构造函数传入一个Executor
CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);

//创建10个任务并执行
for (int i = 0; i < 10; i++){
if(!executorService.isShutdown()) {
//由CompletionService执行任务
completionService.submit(new Result0());
}

}

//把多个任务的结果加起来
String result = "2_";
try {
for (int i = 0; i < 10; i++) {
// 如果任务未完成,则该任务的take()会阻塞
String s = completionService.take().get();
result
+= s;
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
catch (ExecutionException e) {
e.printStackTrace();
}

//输出最终的计算结果
System.out.println(result);

try{

}
catch(Exception e){
e.printStackTrace();
}
finally{
//启动一次顺序关闭,执行以前提交的任务,但不接受新任务
executorService.shutdown();
}
}
}

class Result0 implements Callable<String>{

/**
* 任务的具体过程,一旦任务传给ExecutorService的submit方法,
* 则该方法自动在一个线程上执行
*/
public String call() throws Exception {
//该返回结果将被Future的get方法得到
return "1";
}
}

ThreadPoolExecutor自定义线程池

ThreadPoolExecutor 有多个构造方法创建线程池,下面是一个构造方法

public ThreadPoolExecutor(int corePoolSize,     
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue
<Runnable> workQueue)
参数说明:
corePoolSize:线程池中所保存的核心线程数,包括空闲线程。
maximumPoolSize:池中允许的最大线程数。
keepAliveTime:线程池中的空闲线程所能持续的最长时间。
unit:持续时间的单位。
workQueue:任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。

添加任务的过程

1、池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;

2、池中的线程数量大于等于corePoolSize,但缓冲队列未满,则将新添加的任务放到workQueue中,
线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行;
如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池

3、池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;

4、如果线程池中的线程数量等于了maximumPoolSize,通过设置饱和策略处理,也即是设置6个参数构造函数的第6个参数RejectedExecutionHandler

总体流程就是: 先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize

排队策略

*队列: 采用预定义容量的LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队,newFixedThreadPool采用的便是这种策略
有界队列: 一般使用ArrayBlockingQueue制定队列的长度
同步移交 :SynchronousQueue 跳过队列,将任务从生产者直接交给消费者,如果不存在可用于立即运行任务的线程,会构造一个新的线程来处理新添加的任务,通常是*的,newCa chedThreadPool采用的便是这种策略。

使用案例

public class MyThreadPoolExecutor {
public static void main(String[] args){
//创建等待队列
BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);

//创建线程池,池中保存的线程数为3,允许的最大线程数为5
ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);

//创建七个任务
Runnable t1 = new MyThread();
Runnable t2
= new MyThread();
Runnable t3
= new MyThread();
Runnable t4
= new MyThread();
Runnable t5
= new MyThread();
Runnable t6
= new MyThread();
Runnable t7
= new MyThread();

//每个任务会在一个线程上执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.execute(t6);
pool.execute(t7);

//关闭线程池
pool.shutdown();
}
}

class MyThread implements Runnable{
@Override
public void run(){
System.out.println(Thread.currentThread().getName()
+ "正在执行。。。");
try{
Thread.sleep(
100);
}
catch(InterruptedException e){
e.printStackTrace();
}
}
}

控制台输出:

pool-1-thread-2正在执行。。。
pool
-1-thread-1正在执行。。。
pool
-1-thread-3正在执行。。。
pool
-1-thread-2正在执行。。。
pool
-1-thread-1正在执行。。。
pool
-1-thread-3正在执行。。。
pool
-1-thread-2正在执行。。。

可以看出7个任务在3个线程上执行