并发地执行计算,并汇总结果

时间:2022-09-21 10:18:42

方法包括:

Fork/Join框架

Executor框架

CountDownLatch

CyclicBarrier

直接上代码

1. Fork/Join

public class ForkJoin extends RecursiveTask<Integer>{
	private static final int THRESHOLD = 10;
	private int from;
	private int to;
	
	public ForkJoin(int from, int to) {
		this.from = from;
		this.to = to;
	}
	
	@Override
	protected Integer compute() {
		int sum = 0;
		boolean canCompute = to - from < THRESHOLD;
		//任务足够小就直接计算
		if(canCompute) {
			for(int i = from; i <= to; i++) {
				sum += i;
			}
		}
		//分裂成子任务
		else {
			int mid = (from + to) / 2;
			ForkJoin leftTask = new ForkJoin(from, mid);
			ForkJoin righthtTask = new ForkJoin(mid + 1, to);
			//执行子任务
			leftTask.fork();
			righthtTask.fork();
			//合并结果
			sum += leftTask.join();
			sum += righthtTask.join();
		}
		return sum;
	}
	
	public int add() {
		ForkJoinPool pool = new ForkJoinPool();
		Future<Integer> future = pool.submit(this);
		try {
			return future.get();
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		return 0;
	}
}
2. Executor

public class ExecutorAdd {
	private static final int THRESHOLD = 10;//阈值,每个子任务都小于阈值
	private ExecutorService executor = Executors.newFixedThreadPool(4);
	
	public int add(int from, int to) {
		CompletionService<Integer> completion = 
				new ExecutorCompletionService<Integer>(executor);
		for(int index = from; index <= to;) {
			//划分子任务
			final Integer subFrom = index;
			final Integer subTo = subFrom + THRESHOLD - 1 < to ? subFrom + THRESHOLD - 1: to;
			index = subTo + 1;
			completion.submit(new Callable<Integer>() {
				@Override
				public Integer call() throws Exception {
					int sum = 0;
					for(int i=subFrom; i<=subTo; i++)
						sum+=i;
					return sum;
				}
			});
		}
		
		int result = 0;
		try {
			for(int i=0; i<(to-from+THRESHOLD)/THRESHOLD; i++) {
				Future<Integer> future = completion.take();
				result += future.get();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			e.printStackTrace();
		} finally {
			return result;
		}
	}
}
3. CountDownLatch

public class LatchAdd {
	private static final int THRESHOLD = 10;
	private int result;
	
	public int add(int from, int to) {
		final CountDownLatch latch = 
				new CountDownLatch((to - from + THRESHOLD) / THRESHOLD);
		result = 0;
		for(int index = from; index <= to;) {
			final Integer subFrom = index;
			final Integer subTo = subFrom + THRESHOLD - 1 < to ? subFrom + THRESHOLD - 1: to;
			index = subTo + 1;
			new Thread() {
				@Override
				public void run() {
					int sum = 0;
					for(int i=subFrom; i<=subTo; i++)
						sum+=i;
					LatchAdd.this.addToResult(sum);
					latch.countDown();
				}
			}.start();
		}
		
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		} finally {
			return result;
		}
	}
	
	private synchronized void addToResult(int addition) {
		result += addition;
	}
}
4. CyclicBarrier

与CountDownLatch类似,主线程和进行计算的线程都调用await()同步,所有线程到达barrier point之后,主线程停止阻塞,返回计算结果

分析

Fork/Join, Executor v.s. CountDownLatch, CyclicBarrier:

1. 前两个方法,计算在Task中进行,Task submission和Task execution分离,由ForkJoinPool和Executor管理线程的生命周期

后两个方法每次执行必须涉及线程的创建和删除,性能会受到影响

2. 后两个方法为了汇总计算结果,需要同步汇总操作: addToResult(int addition),会有性能损失

Fork/Join v.s. Executor:

1.Fork/Join计算的分配和汇总都是在ForkJoinTask中,高内聚

2.Fork/Join的子任务是放在不同的队列中的,减少了任务之间的竞争

Fork/Join参考了聊聊并发(八)——Fork/Join框架介绍