Java并发编程实践 目录
并发编程 04—— 闭锁CountDownLatch 与 栅栏CyclicBarrier
并发编程 06—— CompletionService : Executor 和 BlockingQueue
并发编程 10—— 任务取消 之 关闭 ExecutorService
并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
并发编程 13—— 线程池的使用 之 配置ThreadPoolExecutor 和 饱和策略
概述
第1部分 问题引入
《Java并发编程实践》一书6.3.5节CompletionService:Executor和BlockingQueue,有这样一段话:
"如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionService。"
这是什么意思呢?通过一个例子,分别使用繁琐的做法和CompletionService来完成,清晰的对比能让我们更好的理解上面的一段话和CompletionService这个API提供的初衷。
第2部分 实例
考虑这样的场景,有5个Callable任务分别返回5个整数,然后我们在main方法中按照各个任务完成的先后顺序,在控制台打印返回结果。
1 package com.concurrency.TaskExecution_6; 2 3 import java.util.concurrent.Callable; 4 import java.util.concurrent.TimeUnit; 5 6 /** 7 * 8 * @ClassName: ReturnAfterSleepCallable 9 * TODO 10 * @author Xingle 11 * @date 2014-9-16 上午9:20:34 12 */ 13 public class ReturnAfterSleepCallable implements Callable<Integer>{ 14 15 private int sleepSeconds; 16 private int returnValue; 17 18 public ReturnAfterSleepCallable(int sleepSeconds,int returnValue){ 19 this.sleepSeconds = sleepSeconds; 20 this.returnValue = returnValue; 21 } 22 23 24 @Override 25 public Integer call() throws Exception { 26 System.out.println("begin to execute "); 27 28 TimeUnit.SECONDS.sleep(sleepSeconds); 29 return returnValue; 30 } 31 32 }
1.繁琐的做法
通过一个List来保存每个任务返回的Future,然后轮询这些Future,直到每个Future都已完成。我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,所以在调用get方式时,需要将超时时间设置为0。
1 package com.concurrency.TaskExecution_6; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.ExecutionException; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.Future; 9 10 /** 11 * 传统的繁琐做法 12 * @ClassName: TraditionalTest 13 * TODO 14 * @author Xingle 15 * @date 2014-9-16 上午10:06:21 16 */ 17 public class TraditionalTest { 18 19 public static void main(String[] args){ 20 int taskSize = 5; 21 ExecutorService executor = Executors.newFixedThreadPool(taskSize); 22 List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); 23 24 for(int i= 1; i<=taskSize; i++){ 25 int sleep = taskSize -1; 26 int value = i; 27 //向线程池提交任务 28 Future<Integer> future = executor.submit(new ReturnAfterSleepCallable(sleep, value)); 29 //保留每个任务的Future 30 futureList.add(future); 31 } 32 // 轮询,获取完成任务的返回结果 33 while(taskSize > 0){ 34 for (Future<Integer> future : futureList){ 35 Integer result = null; 36 try { 37 result = future.get(); 38 } catch (InterruptedException e) { 39 e.printStackTrace(); 40 } catch (ExecutionException e) { 41 e.printStackTrace(); 42 } 43 //任务已经完成 44 if(result!=null){ 45 System.out.println("result = "+result); 46 //从future列表中删除已经完成的任务 47 futureList.remove(future); 48 taskSize --; 49 break; 50 } 51 } 52 } 53 // 所有任务已经完成,关闭线程池 54 System.out.println("all over "); 55 executor.shutdown(); 56 } 57 58 }
执行结果:
2.使用CompletionService
1 package com.concurrency.TaskExecution_6; 2 3 import java.util.concurrent.CompletionService; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.ExecutorCompletionService; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 9 /** 10 * 使用CompletionService 11 * @ClassName: CompletionServiceTest 12 * TODO 13 * @author Xingle 14 * @date 2014-9-16 上午11:32:45 15 */ 16 public class CompletionServiceTest { 17 18 public static void main(String[] args){ 19 int taskSize = 5; 20 ExecutorService executor = Executors.newFixedThreadPool(taskSize); 21 // 构建完成服务 22 CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor); 23 24 for (int i=1;i<= taskSize; i++){ 25 // 睡眠时间 26 int sleep = taskSize - i; 27 // 返回结果 28 int value = i; 29 //向线程池提交任务 30 completionService.submit(new ReturnAfterSleepCallable(sleep, value)); 31 try { 32 System.out.println("result:"+completionService.take().get()); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } catch (ExecutionException e) { 36 e.printStackTrace(); 37 } 38 } 39 40 System.out.println("all over. "); 41 executor.shutdown(); 42 43 } 44 45 }
执行结果:
3.CompletionService和ExecutorCompletionService的实现
JDK源码中CompletionService的javadoc说明如下:
/** * A service that decouples the production of new asynchronous tasks * from the consumption of the results of completed tasks. Producers * <tt>submit</tt> tasks for execution. Consumers <tt>take</tt> * completed tasks and process their results in the order they * complete. */
ExecutorCompletionService是CompletionService的实现,融合了线程池Executor和阻塞队列BlockingQueue的功能。
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
QueueingFuture是FutureTask的一个子类,通过改写FutureTask类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
FutureTask.done(),这个方法默认什么都不做,就是一个回调,当提交的线程池中的任务完成时,会被自动调用。这也就说时候,当任务完成的时候,会自动执行QueueingFuture.done()方法,将返回结果加入到阻塞队列中,加入的顺序就是任务完成的先后顺序。