The approaches covered are:
The Fork/Join framework
The Executor framework
CountDownLatch
CyclicBarrier
Straight to the code.
1. Fork/Join
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkJoin extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 10;
    private final int from;
    private final int to;

    public ForkJoin(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = to - from < THRESHOLD;
        if (canCompute) {
            // The task is small enough: compute it directly
            for (int i = from; i <= to; i++) {
                sum += i;
            }
        } else {
            // Split into two subtasks
            int mid = from + (to - from) / 2; // avoids overflow of (from + to)
            ForkJoin leftTask = new ForkJoin(from, mid);
            ForkJoin rightTask = new ForkJoin(mid + 1, to);
            // Fork the left half and compute the right half in the current thread
            leftTask.fork();
            sum += rightTask.compute();
            // Merge the results
            sum += leftTask.join();
        }
        return sum;
    }

    public int add() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            Future<Integer> future = pool.submit(this);
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown(); // release the pool's worker threads
        }
        return 0; // reached only if the computation failed
    }
}
2. Executor
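import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorAdd {
    // Threshold: each subtask covers at most THRESHOLD numbers
    private static final int THRESHOLD = 10;
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    public int add(int from, int to) {
        CompletionService<Integer> completion = new ExecutorCompletionService<Integer>(executor);
        int tasks = 0; // number of subtasks actually submitted
        for (int index = from; index <= to; ) {
            // Carve out the next subtask
            final int subFrom = index;
            final int subTo = Math.min(subFrom + THRESHOLD - 1, to);
            index = subTo + 1;
            completion.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    int sum = 0;
                    for (int i = subFrom; i <= subTo; i++) {
                        sum += i;
                    }
                    return sum;
                }
            });
            tasks++;
        }
        int result = 0;
        try {
            // Collect the partial sums in completion order
            for (int i = 0; i < tasks; i++) {
                Future<Integer> future = completion.take();
                result += future.get();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // Returned outside finally so that unchecked exceptions are not swallowed;
        // after a failure this is only a partial sum
        return result;
    }

    // Stops the pool's non-daemon worker threads once the adder is no longer needed
    public void shutdown() {
        executor.shutdown();
    }
}
3. CountDownLatch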
import java.util.concurrent.CountDownLatch;

public class LatchAdd {
    private static final int THRESHOLD = 10;
    private int result;

    public int add(int from, int to) {
        if (to < from) {
            return 0; // empty range; also keeps the latch count non-negative
        }
        // One count per subtask: ceil((to - from + 1) / THRESHOLD)
        final CountDownLatch latch = new CountDownLatch((to - from + THRESHOLD) / THRESHOLD);
        result = 0;
        for (int index = from; index <= to; ) {
            final int subFrom = index;
            final int subTo = Math.min(subFrom + THRESHOLD - 1, to);
            index = subTo + 1;
            new Thread() {
                @Override
                public void run() {
                    int sum = 0;
                    for (int i = subFrom; i <= subTo; i++) {
                        sum += i;
                    }
                    LatchAdd.this.addToResult(sum);
                    latch.countDown();
                }
            }.start();
        }
        try {
            // Block until every worker thread has counted down
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        // Returned outside finally so that unchecked exceptions are not swallowed
        return result;
    }

    private synchronized void addToResult(int addition) {
        result += addition;
    }
}
4. CyclicBarrier
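This works much like CountDownLatch: the main thread and every computing thread call await() on the barrier. Once all of them have reached the barrier point, the main thread stops blocking and returns the result.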
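The article gives no code for this variant. Here is a minimal sketch of what it might look like, modeled on LatchAdd. The class name BarrierAdd and the helper awaitQuietly are assumptions for illustration, not the author's code. The barrier is created with one party per worker thread plus one for the calling thread.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical class, modeled on the article's LatchAdd
class BarrierAdd {
    private static final int THRESHOLD = 10;
    private int result;

    public int add(int from, int to) {
        if (to < from) {
            return 0; // empty range, nothing to sum
        }
        int workers = (to - from + THRESHOLD) / THRESHOLD;
        // One party per worker thread, plus one for the calling thread
        final CyclicBarrier barrier = new CyclicBarrier(workers + 1);
        result = 0;
        for (int index = from; index <= to; ) {
            final int subFrom = index;
            final int subTo = Math.min(subFrom + THRESHOLD - 1, to);
            index = subTo + 1;
            new Thread(() -> {
                int sum = 0;
                for (int i = subFrom; i <= subTo; i++) {
                    sum += i;
                }
                addToResult(sum);
                awaitQuietly(barrier); // arrive at the barrier point
            }).start();
        }
        awaitQuietly(barrier); // blocks until every worker has arrived
        return result;
    }

    private synchronized void addToResult(int addition) {
        result += addition;
    }

    // Hypothetical helper: waits on the barrier and converts checked exceptions
    private static void awaitQuietly(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            throw new IllegalStateException("barrier was broken", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(new BarrierAdd().add(1, 100)); // prints 5050
    }
}
```

One difference from the latch version: the worker threads also block in await() until everyone has arrived, whereas countDown() never blocks.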
Analysis
Fork/Join and Executor vs. CountDownLatch and CyclicBarrier:
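1. With the first two approaches, the computation lives in tasks, and task submission is decoupled from task execution: the ForkJoinPool or the Executor manages the lifecycle of the threads.
As written here, the last two approaches start a new thread for every subtask on every call, so each call pays for thread creation and teardown, which hurts performance.
2. To combine the partial sums, the last two approaches must synchronize the aggregation step, addToResult(int addition), which adds some overhead.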
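To make the first point concrete, here is a small sketch that runs two sums on one shared fixed-size pool and records which threads executed the subtasks. The class PoolReuseDemo, the pool size of 4, and the demo() method are assumptions for illustration. However many subtasks are submitted, the number of distinct worker threads never exceeds the pool size, because the pool reuses its threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original article
class PoolReuseDemo {
    private static final int THRESHOLD = 10;
    private static final int POOL_SIZE = 4;

    // Sums from..to on the given pool and records the names of the threads used
    static int add(ExecutorService executor, Set<String> workers, int from, int to) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int index = from; index <= to; index += THRESHOLD) {
            final int subFrom = index;
            final int subTo = Math.min(subFrom + THRESHOLD - 1, to);
            tasks.add(() -> {
                workers.add(Thread.currentThread().getName());
                int sum = 0;
                for (int i = subFrom; i <= subTo; i++) {
                    sum += i;
                }
                return sum;
            });
        }
        int result = 0;
        try {
            // invokeAll submits every task and waits for all of them to finish
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                result += future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    // Returns {sum of 1..100, sum of 1..1000, number of distinct worker threads}
    static int[] demo() {
        ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
        Set<String> workers = ConcurrentHashMap.newKeySet();
        try {
            int first = add(executor, workers, 1, 100);   // 10 subtasks
            int second = add(executor, workers, 1, 1000); // 100 subtasks, same threads
            return new int[] {first, second, workers.size()};
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " distinct threads: " + r[2]);
    }
}
```

The 110 subtasks are served by at most four threads, whereas a thread-per-subtask design such as LatchAdd would start 110 of them.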
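Fork/Join vs. Executor:
1. With Fork/Join, both splitting the work and merging the results happen inside the ForkJoinTask, so the logic is highly cohesive.
2. Fork/Join subtasks are spread across separate queues (each worker thread has its own work queue), which reduces contention between tasks.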
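The per-worker queues can be observed indirectly. The sketch below, with the hypothetical names StealDemo and SumTask and an assumed parallelism of 4, sums a larger range and then prints ForkJoinPool.getStealCount(), which the JDK documents as an estimate only. The printed value varies from run to run, so no expected number is shown.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class, not part of the original article
class StealDemo {
    private static final int THRESHOLD = 10;

    static class SumTask extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        SumTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from < THRESHOLD) {
                long sum = 0;
                for (int i = from; i <= to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            SumTask left = new SumTask(from, mid);
            SumTask right = new SumTask(mid + 1, to);
            left.fork();                      // queued on the current worker's own queue
            long rightSum = right.compute();  // computed directly in this thread
            return left.join() + rightSum;    // idle workers may have stolen 'left'
        }
    }

    static long sum(int from, int to) {
        ForkJoinPool pool = new ForkJoinPool(4); // parallelism of 4 is an assumption
        try {
            long result = pool.invoke(new SumTask(from, to));
            // An estimate of how many tasks moved between worker queues
            System.out.println("estimated steals: " + pool.getStealCount());
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 1_000_000)); // prints 500000500000
    }
}
```

A worker pushes forked subtasks onto its own queue and mostly works from there. Other workers touch that queue only when they run out of work, which is why contention stays low.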
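The Fork/Join part draws on the article 聊聊并发(八)——Fork/Join框架介绍 ("Talking About Concurrency (8): An Introduction to the Fork/Join Framework").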