Java_并发线程_CompletionService

时间:2023-11-11 17:48:32

1.CompletionService源代码分析

ExecutorCompletionService 的内部实现维护了一个阻塞队列(LinkedBlockingQueue),并采用代理(包装)设计模式:提交的任务被包装后交给 Executor 执行,任务完成时对应的 Future 会被放入该队列,调用方再通过操作这个队列来获取已完成任务的结果。

    /**
     * Builds an ExecutorCompletionService that runs tasks on the given
     * executor and collects finished tasks in a fresh
     * {@link LinkedBlockingQueue}.
     *
     * @param executor the executor that will run submitted tasks
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null) {
            throw new NullPointerException();
        }
        this.executor = executor;
        // Keep a typed reference only when the executor is an
        // AbstractExecutorService; otherwise aes stays null.
        if (executor instanceof AbstractExecutorService) {
            this.aes = (AbstractExecutorService) executor;
        } else {
            this.aes = null;
        }
        // Create the completion queue that finished tasks are added to.
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
	/**
	 * Submits a Callable task for execution.
	 *
	 * @param task the task to run
	 * @return the future created for the task by {@code newTaskFor}
	 * @throws NullPointerException if task is {@code null}
	 */
	public Future<V> submit(Callable<V> task) {
		if (task == null) {
			throw new NullPointerException();
		}
		RunnableFuture<V> future = newTaskFor(task);
		// Wrap the future so that it is added to the completion queue when done,
		// then hand the wrapper to the executor to run.
		QueueingFuture queueing = new QueueingFuture(future);
		executor.execute(queueing);
		return future;
	}
    /**
     * FutureTask extension that adds the wrapped task to the completion
     * queue once this wrapper is done.
     */
    private class QueueingFuture extends FutureTask<Void> {
        /** The wrapped task; this is the object placed on the completion queue. */
        private final Future<V> task;

        /**
         * @param wrapped the task to run; it is passed to FutureTask as the
         *                Runnable, with {@code null} as the wrapper's own result
         */
        QueueingFuture(RunnableFuture<V> wrapped) {
            super(wrapped, null);
            task = wrapped;
        }

        /** FutureTask completion hook: publish the wrapped task to the completion queue. */
        @Override
        protected void done() {
            completionQueue.add(task);
        }
    }
	/**
	 * Retrieves and removes the next Future from the completion queue,
	 * waiting if none is present yet.
	 *
	 * @return the Future taken from the completion queue
	 * @throws InterruptedException if interrupted while waiting
	 */
	public Future<V> take() throws InterruptedException {
		final Future<V> completed = completionQueue.take();
		return completed;
	}

2.实例

	/**
	 * Demo: submits ten tasks to a three-thread pool through a
	 * CompletionService and prints each result as it is taken from the service.
	 * Each task sleeps for a random time below five seconds and then returns
	 * its sequence number.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		final int taskCount = 10;

		ExecutorService threadPool = Executors.newFixedThreadPool(3);
		CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
		try {
			// All tasks are handed to the pool, but it only has 3 worker threads.
			for (int i = 1; i <= taskCount; i++) {
				final int seq = i;
				completionService.submit(new Callable<Integer>() {
					@Override
					public Integer call() throws Exception {
						Thread.sleep(new Random().nextInt(5000));
						return seq;
					}
				});
			}

			for (int i = 0; i < taskCount; i++) {
				try {
					// take() blocks until a task has finished; the Future it returns
					// is already done, so get() does not have to wait.
					System.out.println(completionService.take().get());
				} catch (InterruptedException e) {
					e.printStackTrace();
					// Restore the interrupt flag and stop waiting for further results.
					Thread.currentThread().interrupt();
					break;
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
			}
		} finally {
			// Without this the pool's worker threads keep the JVM from exiting.
			threadPool.shutdown();
		}
	}