JAVA并发-Executor任务执行框架

时间:2021-08-01 18:36:26
首先介绍两个重要的接口,Executor和ExecutorService,定义如下:
public interface Executor {    void execute(Runnable command);
}

public interface ExecutorService extends Executor {
//不再接受新任务,待所有任务执行完毕后关闭ExecutorService
void shutdown();
//不再接受新任务,直接关闭ExecutorService,返回没有执行的任务列表
List<Runnable> shutdownNow();
//判断ExecutorService是否关闭
boolean isShutdown();
//判断ExecutorService是否终止
boolean isTerminated();
//等待ExecutorService到达终止状态
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
//当task执行成功的时候future.get()返回result
<T> Future<T> submit(Runnable task, T result);
//当task执行成功的时候future.get()返回null
Future<?> submit(Runnable task);
//批量提交任务并获得他们的future,Task列表与Future列表一一对应
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//批量提交任务并获得他们的future,并限定处理所有任务的时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException;
//批量提交任务并获得一个已经成功执行的任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}


为了配合使用上面的并发编程接口, 有一个Executors工厂类,负责创建各类满足ExecutorService接口的线程池,具体如下:
newFixedThreadPool:创建一个固定长度的线程池,线程池中线程的数量从1增加到最大值后保持不变。如果某个线程坏死掉,将会补充一个新的线程。
newCachedThreadPool:创建长度不固定的线程池,线程池的规模不受限制, 不常用
newSingleThreadExecutor:创建一个单线程的Executor,他其中有一个线程来处理任务,如果这个线程坏死掉,将补充一个新线程。
newScheduledThreadPool:创建固定长度的线程池,以延时或定时的方式来执行任务。

下面是Executor和ExecutorService中常用方法的示例:
import java.util.ArrayList;import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Demo{
public static void main(String [] args){
//--------Executor示例------------//
Executor s=Executors.newSingleThreadExecutor();
s.execute(new MyRunnableTask("1"));

//--------ExecutorService示例------------//
ExecutorService es=Executors.newFixedThreadPool(2);

//--------get()示例------------//
Future<String> future=es.submit(new MyCallableTask("10"));
try{
System.out.println(future.get());
}catch(Exception e){}

//--------get(timeout, timeunit)示例------------//
future=es.submit(new MyCallableTask("11"));
try{
System.out.println(future.get(500,TimeUnit.MILLISECONDS));
}catch(Exception e){
System.out.println("cancle because timeout");
}

//--------invokeAll(tasks)示例------------//
List<MyCallableTask> myCallableTasks=new ArrayList<MyCallableTask>();
for(int i=0;i<6;i++){
myCallableTasks.add(new MyCallableTask(i+""));
}
try {
List<Future<String>> results = es.invokeAll(myCallableTasks);
Iterator<Future<String>> iterator=results.iterator();
while(iterator.hasNext()){
future=iterator.next();
System.out.println(future.get());
}
} catch (Exception e) {}

//--------invokeAll(tasks,timeout,timeunit))示例------------//
try {
//限定执行时间为2100ms,每个任务需要1000ms,线程池的长度为2,因此最多只能处理4个任务。一共6个任务,有2个任务会被取消。
List<Future<String>> results = es.invokeAll(myCallableTasks,2100,TimeUnit.MILLISECONDS);
Iterator<Future<String>> iterator=results.iterator();
while(iterator.hasNext()){
future=iterator.next();
if(!future.isCancelled())
System.out.println(future.get());
else
System.out.println("cancle because timeout");
}
} catch (Exception e) {}
es.shutdown();
}
}

class MyRunnableTask implements Runnable{
private String name;
public MyRunnableTask(String name) {
this.name=name;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("runnable task--"+name);
}
}
class MyCallableTask implements Callable<String>{
private String name;
public MyCallableTask(String name) {
this.name=name;
}
@Override
public String call() throws Exception {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
StringBuilder sb=new StringBuilder("callable task--");
return sb.append(name).toString();
}
}


上面的ExecutorSerivce接口中的invokeAll(tasks)方法用于批量执行任务,并且将结果按照task列表中的顺序返回。此外, 还存在一个批量执行任务的接口CompletionTask。ExecutorCompletionService是实现CompletionService接口的一个类,该类的实现原理很简单:

用Executor类来执行任务,同时把在执行任务的Future放到BlockingQueue<Future<V>>队列中。该类实现的关键就是重写FutureTask类的done()方法,FutureTask类的done()方法是一个钩子函数(关于钩子函数,请读者自行查询),done()方法在FutureTask任务被执行的时候被调用。

ExecutorCompletionService类的核心代码如下:

public Future<V> submit(Runnable task, V result) {    if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

其中的done()方法定义如下:
 /**     * Protected method invoked when this task transitions to state
* <tt>isDone</tt> (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
* implementation of this method to determine whether this task
* has been cancelled.
*/
protected void done() { }


ExecutorCompletionService的使用示例如下:
import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Demo{
public static void main(String [] args) throws InterruptedException, ExecutionException{
CompletionService<String> cs=new ExecutorCompletionService<String>(
Executors.newFixedThreadPool(2));
for(int i=0;i<6;i++){
cs.submit(new MyCallableTask(i+""));
}
for(int i=0;i<6;i++){
Future<String> future=cs.take();
//Retrieves and removes the Future representing the next completed task,
//waiting if none are yet present.
System.out.println(future.get());
}
}
}

class MyCallableTask implements Callable<String>{
private String name;
public MyCallableTask(String name) {
this.name=name;
}
@Override
public String call() throws Exception {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
StringBuilder sb=new StringBuilder("callable task--");
return sb.append(name).toString();
}
}