并行流与顺序流性能对比试验

时间:2022-10-21 17:39:34

通过试验得到,并行流不一定要比顺序流快,所以在选择的时候要注意使用。

代码:
ParallelStreamsHarness .java


import java.util.concurrent.*;
import java.util.function.*;

public class ParallelStreamsHarness {

public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();

public static void main(String[] args) {
System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 100_000_000L) + " msecs");
System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );
System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );
System.out.println("ForkJoin sum done in: " + measurePerf(ForkJoinSumCalculator::forkJoinSum, 100_000_000L) + " msecs" );
System.out.println("SideEffect sum done in: " + measurePerf(ParallelStreams::sideEffectSum, 10_000_000L) + " msecs" );
System.out.println("SideEffect prallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs" );
}

public static <T, R> long measurePerf(Function<T, R> f, T input) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
R result = f.apply(input);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Result: " + result);
if (duration < fastest) fastest = duration;
}
return fastest;
}
}

ParallelStreams .java


import java.util.stream.*;

public class ParallelStreams {

public static long iterativeSum(long n) {
long result = 0;
for (long i = 1L; i <= n; i++) {
result += i;
}
return result;
}

public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
}

public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
}

public static long rangedSum(long n) {
return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
}

public static long parallelRangedSum(long n) {
return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
}

public static long sideEffectSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).forEach(accumulator::add);
return accumulator.total;
}

public static long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}

public static class Accumulator {
private long total = 0;

public void add(long value) {
total += value;
}
}
}

ForkJoinSumCalculator .java


import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

import static lambdasinaction.chap7.ParallelStreamsHarness.FORK_JOIN_POOL;

@SuppressWarnings("serial")
public class ForkJoinSumCalculator extends RecursiveTask<Long> {

public static final long THRESHOLD = 10_000;

private final long[] numbers;
private final int start;
private final int end;

public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}

private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
leftTask.fork();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
}

private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}

public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return FORK_JOIN_POOL.invoke(task);
}
}