由于spliterator中存在不可配置的批处理大小策略,导致读写器#lines()的并行性很差

时间:2022-03-29 13:50:21

I cannot achieve good parallelization of stream processing when the stream source is a Reader. Running the code below on a quad-core CPU I observe 3 cores being used at first, then a sudden drop to just two, then one core. Overall CPU utilization hovers around 50%.

当流源为读取器时,无法实现流处理的良好并行化。在一个四核CPU上运行下面的代码,我观察到一开始使用了3个核,然后突然降到只有两个核,然后是一个核。总体CPU利用率徘徊在50%左右。

Note the following characteristics of the example:

注意示例的以下特征:

  • there are just 6,000 lines;
  • 只有6000条线;
  • each line takes about 20 ms to process;
  • 每一行大约需要20毫秒的时间;
  • the whole procedure takes about a minute.
  • 整个过程大约需要一分钟。

That means that all the pressure is on the CPU and I/O is minimal. The example is a sitting duck for automatic parallelization.

这意味着所有的压力都在CPU上,而I/O是最小的。这个例子是自动并行的一个例子。

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

... class imports elided ...    

public class Main
{
  static final AtomicLong totalTime = new AtomicLong();

  public static void main(String[] args) throws IOException {
    final long start = System.nanoTime();
    final Path inputPath = createInput();
    System.out.println("Start processing");

    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
      Files.lines(inputPath).parallel().map(Main::processLine)
        .forEach(w::println);
    }

    final double cpuTime = totalTime.get(),
                 realTime = System.nanoTime()-start;
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("          Cores: " + cores);
    System.out.format("       CPU time: %.2f s\n", cpuTime/SECONDS.toNanos(1));
    System.out.format("      Real time: %.2f s\n", realTime/SECONDS.toNanos(1));
    System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores);
  }

  private static String processLine(String line) {
    final long localStart = System.nanoTime();
    double ret = 0;
    for (int i = 0; i < line.length(); i++)
      for (int j = 0; j < line.length(); j++)
        ret += Math.pow(line.charAt(i), line.charAt(j)/32.0);
    final long took = System.nanoTime()-localStart;
    totalTime.getAndAdd(took);
    return NANOSECONDS.toMillis(took) + " " + ret;
  }

  private static Path createInput() throws IOException {
    final Path inputPath = Paths.get("input.txt");
    try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
      for (int i = 0; i < 6_000; i++) {
        final String text = String.valueOf(System.nanoTime());
        for (int j = 0; j < 25; j++) w.print(text);
        w.println();
      }
    }
    return inputPath;
  }
}

My typical output:

我的典型的输出:

          Cores: 4
       CPU time: 110.23 s
      Real time: 53.60 s
CPU utilization: 51.41%

For comparison, if I use a slightly modified variant where I first collect into a list and then process:

作为比较,如果我使用一个稍微修改过的变体,首先将其收集到列表中,然后进行处理:

Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
  .forEach(w::println);

I get this typical output:

我得到这个典型的输出:

          Cores: 4
       CPU time: 138.43 s
      Real time: 35.00 s
CPU utilization: 98.87%

What could explain that effect, and how can I work around it to get full utilization?

什么能解释这种影响,我该如何在它周围工作以得到充分利用?

Note that I have originally observed this on a reader of servlet input stream so it's not specific to a FileReader.

注意,我最初是在servlet输入流的读取器上观察到这一点的,所以它不是特定于FileReader的。

4 个解决方案

#1


6  

Here is the answer, spelled out in the source code of Spliterators.IteratorSpliterator, the one used by BufferedReader#lines():

答案在Spliterators的源代码中有详细说明。BufferedReader#lines()使用的IteratorSpliterator:

    @Override
    public Spliterator<T> trySplit() {
        /*
         * Split into arrays of arithmetically increasing batch
         * sizes.  This will only improve parallel performance if
         * per-element Consumer actions are more costly than
         * transferring them into an array.  The use of an
         * arithmetic progression in split sizes provides overhead
         * vs parallelism bounds that do not particularly favor or
         * penalize cases of lightweight vs heavyweight element
         * operations, across combinations of #elements vs #cores,
         * whether or not either are known.  We generate
         * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
         * potential speedup.
         */
        Iterator<? extends T> i;
        long s;
        if ((i = it) == null) {
            i = it = collection.iterator();
            s = est = (long) collection.size();
        }
        else
            s = est;
        if (s > 1 && i.hasNext()) {
            int n = batch + BATCH_UNIT;
            if (n > s)
                n = (int) s;
            if (n > MAX_BATCH)
                n = MAX_BATCH;
            Object[] a = new Object[n];
            int j = 0;
            do { a[j] = i.next(); } while (++j < n && i.hasNext());
            batch = j;
            if (est != Long.MAX_VALUE)
                est -= j;
            return new ArraySpliterator<>(a, 0, j, characteristics);
        }
        return null;
    }

Also noteworthy are the constants:

同样值得注意的是常数:

static final int BATCH_UNIT = 1 << 10;  // batch array size increment
static final int MAX_BATCH = 1 << 25;  // max batch array size;

So in my example, where I use 6,000 elements, I get just three batches because the batch size step is 1024. That precisely explains my observation that initially three cores are used, dropping to two and then one as the smaller batches complete. In the meantime I tried a modified example with 60,000 elements and then I get almost 100% CPU utilization.

在我的例子中,我使用了6000个元素,我只得到了3批因为批次大小步骤是1024。这正好解释了我的观察,最初使用的是三个内核,当较小的批完成时减少到两个,然后是一个。同时,我尝试了一个包含60000个元素的修改示例,然后我获得了几乎100%的CPU利用率。

To solve my problem I have developed the code below which allows me to turn any existing stream into one whose Spliterator#trySplit will partition it into batches of specified size. The simplest way to use it for the use case from my question is like this:

为了解决我的问题,我开发了下面的代码,它允许我将任何现有的流转换为Spliterator#trySplit将其分割成指定大小的批。从我的问题中最简单的用例使用它的方法是这样的:

toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)

On a lower level, the class below is a spliterator wrapper which changes the wrapped spliterator's trySplit behavior and leaves other aspects unchanged.

在较低的层次上,下面的类是一个spliterator包装器,它更改了包装的spliterator的trySplit行为,并保持其他方面不变。


import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
  private final Spliterator<T> spliterator;
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
    final int c = toWrap.characteristics();
    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
    this.spliterator = toWrap;
    this.est = est;
    this.batchSize = batchSize;
  }
  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
    this(toWrap, toWrap.estimateSize(), batchSize);
  }

  public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
    return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!spliterator.tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}

#2


4  

This problem is to some extent fixed in Java-9 early access builds. The Files.lines was rewritten and now upon splitting it actually jumps into the middle of memory-mapped file. Here's the results on my machine (which has 4 HyperThreading cores = 8 hardware threads):

这个问题在Java-9早期访问构建中在某种程度上得到了解决。这些文件。行被重写,现在拆分它实际上跳到内存映射文件的中间。这是我的机器上的结果(它有4个超线程内核= 8个硬件线程):

Java 8u60:

Java 8 u60:

Start processing
          Cores: 8
       CPU time: 73,50 s
      Real time: 36,54 s
CPU utilization: 25,15%

Java 9b82:

Java 9 b82:

Start processing
          Cores: 8
       CPU time: 79,64 s
      Real time: 10,48 s
CPU utilization: 94,95%

As you can see, both real time and CPU utilization is greatly improved.

如您所见,实时和CPU利用率都得到了很大的提高。

This optimization has some limitations though. Currently it works only for several encodings (namely UTF-8, ISO_8859_1 and US_ASCII) as for arbitrary encoding you don't know exactly how line-break is encoded. It's limited to the files of no more than 2Gb size (due to limitations of MappedByteBuffer in Java) and of course does not work for some non-regular files (like character devices, named pipes which cannot be memory-mapped). In such cases the old implementation is used as the fallback.

这种优化有一些局限性。目前,它只适用于几个编码(即UTF-8、ISO_8859_1和US_ASCII),而对于任意编码,您无法确切地知道断行是如何编码的。它仅限于大小不超过2Gb的文件(由于Java中MappedByteBuffer的限制),当然也不适用于一些非常规文件(如字符设备、不能被内存映射的命名管道)。在这种情况下,旧的实现被用作回退。

#3


2  

The parallel execution of streams is based on a fork-join model. For ordered streams, the parallel execution only works, if the stream can be split into parts, strictly following one another. In general, that's not possible with streams generated by BufferedReader. However, in theory parallel execution should be possible for unordered streams:

流的并行执行基于fork-join模型。对于有序的流,并行执行只在流可以被分割成几个部分的情况下有效,并且严格地遵循彼此。一般来说,对于由BufferedReader生成的流来说,这是不可能的。然而,理论上,对于无序流,并行执行应该是可能的:

BufferedReader reader = ...;
reader.lines().unordered().map(...);

I am not sure if the stream returned by BufferedReader supports this kind of parallel execution. A very simple alternative is to create an intermediate list:

我不确定BufferedReader返回的流是否支持这种并行执行。一个非常简单的选择是创建一个中间列表:

BufferedReader reader = ...;
reader.lines().collect(toList()).parallelStream().map(...);

In this case, the parallel execution starts after all lines have been read. This might be a problem, if reading the lines takes a long time. In this case, I recommend using an ExecutorService for parallel execution instead of parallel streams:

在本例中,并行执行在读取所有行之后开始。这可能是个问题,如果读取这些行需要很长时间。在这种情况下,我建议使用ExecutorService进行并行执行,而不是并行流:

ExecutorService executor = ...;
BufferedReader reader = ...;
reader.lines()
   .map(line -> executor.submit(() -> ... line ...))
   .collect(toList())
   .stream()
   .map(future -> future.get())
   .map(...);

#4


1  

To find the real cause of this, you need to dig into the Files.lines() source, which calls the BufferedReader.lines(), which is the following:

要找到真正的原因,您需要深入研究Files.lines()源代码,它调用BufferedReader.lines(),如下所示:

public Stream<String> lines() {
    Iterator<String> iter = new Iterator<String>() {
        String nextLine = null;

        @Override
        public boolean hasNext() {
            if (nextLine != null) {
                return true;
            } else {
                try {
                    nextLine = readLine();
                    return (nextLine != null);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }

        @Override
        public String next() {
            if (nextLine != null || hasNext()) {
                String line = nextLine;
                nextLine = null;
                return line;
            } else {
                throw new NoSuchElementException();
            }
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            iter, Spliterator.ORDERED | Spliterator.NONNULL), false);
}

Here it returns a Stream<String> that is:

这里它返回一个流 ,即:

  • Of unknown size
  • 未知的大小
  • Ordered
  • 命令
  • Not null
  • 非空
  • Not parallel (the false argument at the end of StreamSupport.stream()
  • 不是parallel (StreamSupport.stream()末尾的假参数)

And hence I am really unsure of whether it is even subject to be being parallellised, this could be found by digging even further into the source.

因此我真的不确定它是否会被平行化,这可以通过进一步挖掘源找到。

What I do know is that parallel streams get explicitely provided in the Java APIs. Take for example List, it has a List.stream() and List.parallelStream() method.

我所知道的是并行流是在Java api中明确提供的。以列表为例,它有一个List.stream()和List. parallelstream()方法。

#1


6  

Here is the answer, spelled out in the source code of Spliterators.IteratorSpliterator, the one used by BufferedReader#lines():

答案在Spliterators的源代码中有详细说明。BufferedReader#lines()使用的IteratorSpliterator:

    @Override
    public Spliterator<T> trySplit() {
        /*
         * Split into arrays of arithmetically increasing batch
         * sizes.  This will only improve parallel performance if
         * per-element Consumer actions are more costly than
         * transferring them into an array.  The use of an
         * arithmetic progression in split sizes provides overhead
         * vs parallelism bounds that do not particularly favor or
         * penalize cases of lightweight vs heavyweight element
         * operations, across combinations of #elements vs #cores,
         * whether or not either are known.  We generate
         * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
         * potential speedup.
         */
        Iterator<? extends T> i;
        long s;
        if ((i = it) == null) {
            i = it = collection.iterator();
            s = est = (long) collection.size();
        }
        else
            s = est;
        if (s > 1 && i.hasNext()) {
            int n = batch + BATCH_UNIT;
            if (n > s)
                n = (int) s;
            if (n > MAX_BATCH)
                n = MAX_BATCH;
            Object[] a = new Object[n];
            int j = 0;
            do { a[j] = i.next(); } while (++j < n && i.hasNext());
            batch = j;
            if (est != Long.MAX_VALUE)
                est -= j;
            return new ArraySpliterator<>(a, 0, j, characteristics);
        }
        return null;
    }

Also noteworthy are the constants:

同样值得注意的是常数:

static final int BATCH_UNIT = 1 << 10;  // batch array size increment
static final int MAX_BATCH = 1 << 25;  // max batch array size;

So in my example, where I use 6,000 elements, I get just three batches because the batch size step is 1024. That precisely explains my observation that initially three cores are used, dropping to two and then one as the smaller batches complete. In the meantime I tried a modified example with 60,000 elements and then I get almost 100% CPU utilization.

在我的例子中,我使用了6000个元素,我只得到了3批因为批次大小步骤是1024。这正好解释了我的观察,最初使用的是三个内核,当较小的批完成时减少到两个,然后是一个。同时,我尝试了一个包含60000个元素的修改示例,然后我获得了几乎100%的CPU利用率。

To solve my problem I have developed the code below which allows me to turn any existing stream into one whose Spliterator#trySplit will partition it into batches of specified size. The simplest way to use it for the use case from my question is like this:

为了解决我的问题,我开发了下面的代码,它允许我将任何现有的流转换为Spliterator#trySplit将其分割成指定大小的批。从我的问题中最简单的用例使用它的方法是这样的:

toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)

On a lower level, the class below is a spliterator wrapper which changes the wrapped spliterator's trySplit behavior and leaves other aspects unchanged.

在较低的层次上,下面的类是一个spliterator包装器,它更改了包装的spliterator的trySplit行为,并保持其他方面不变。


import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
  private final Spliterator<T> spliterator;
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
    final int c = toWrap.characteristics();
    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
    this.spliterator = toWrap;
    this.est = est;
    this.batchSize = batchSize;
  }
  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
    this(toWrap, toWrap.estimateSize(), batchSize);
  }

  public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
    return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!spliterator.tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}

#2


4  

This problem is to some extent fixed in Java-9 early access builds. The Files.lines was rewritten and now upon splitting it actually jumps into the middle of memory-mapped file. Here's the results on my machine (which has 4 HyperThreading cores = 8 hardware threads):

这个问题在Java-9早期访问构建中在某种程度上得到了解决。这些文件。行被重写,现在拆分它实际上跳到内存映射文件的中间。这是我的机器上的结果(它有4个超线程内核= 8个硬件线程):

Java 8u60:

Java 8 u60:

Start processing
          Cores: 8
       CPU time: 73,50 s
      Real time: 36,54 s
CPU utilization: 25,15%

Java 9b82:

Java 9 b82:

Start processing
          Cores: 8
       CPU time: 79,64 s
      Real time: 10,48 s
CPU utilization: 94,95%

As you can see, both real time and CPU utilization is greatly improved.

如您所见,实时和CPU利用率都得到了很大的提高。

This optimization has some limitations though. Currently it works only for several encodings (namely UTF-8, ISO_8859_1 and US_ASCII) as for arbitrary encoding you don't know exactly how line-break is encoded. It's limited to the files of no more than 2Gb size (due to limitations of MappedByteBuffer in Java) and of course does not work for some non-regular files (like character devices, named pipes which cannot be memory-mapped). In such cases the old implementation is used as the fallback.

这种优化有一些局限性。目前,它只适用于几个编码(即UTF-8、ISO_8859_1和US_ASCII),而对于任意编码,您无法确切地知道断行是如何编码的。它仅限于大小不超过2Gb的文件(由于Java中MappedByteBuffer的限制),当然也不适用于一些非常规文件(如字符设备、不能被内存映射的命名管道)。在这种情况下,旧的实现被用作回退。

#3


2  

The parallel execution of streams is based on a fork-join model. For ordered streams, the parallel execution only works, if the stream can be split into parts, strictly following one another. In general, that's not possible with streams generated by BufferedReader. However, in theory parallel execution should be possible for unordered streams:

流的并行执行基于fork-join模型。对于有序的流,并行执行只在流可以被分割成几个部分的情况下有效,并且严格地遵循彼此。一般来说,对于由BufferedReader生成的流来说,这是不可能的。然而,理论上,对于无序流,并行执行应该是可能的:

BufferedReader reader = ...;
reader.lines().unordered().map(...);

I am not sure if the stream returned by BufferedReader supports this kind of parallel execution. A very simple alternative is to create an intermediate list:

我不确定BufferedReader返回的流是否支持这种并行执行。一个非常简单的选择是创建一个中间列表:

BufferedReader reader = ...;
reader.lines().collect(toList()).parallelStream().map(...);

In this case, the parallel execution starts after all lines have been read. This might be a problem, if reading the lines takes a long time. In this case, I recommend using an ExecutorService for parallel execution instead of parallel streams:

在本例中,并行执行在读取所有行之后开始。这可能是个问题,如果读取这些行需要很长时间。在这种情况下,我建议使用ExecutorService进行并行执行,而不是并行流:

ExecutorService executor = ...;
BufferedReader reader = ...;
reader.lines()
   .map(line -> executor.submit(() -> ... line ...))
   .collect(toList())
   .stream()
   .map(future -> future.get())
   .map(...);

#4


1  

To find the real cause of this, you need to dig into the Files.lines() source, which calls the BufferedReader.lines(), which is the following:

要找到真正的原因,您需要深入研究Files.lines()源代码,它调用BufferedReader.lines(),如下所示:

public Stream<String> lines() {
    Iterator<String> iter = new Iterator<String>() {
        String nextLine = null;

        @Override
        public boolean hasNext() {
            if (nextLine != null) {
                return true;
            } else {
                try {
                    nextLine = readLine();
                    return (nextLine != null);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }

        @Override
        public String next() {
            if (nextLine != null || hasNext()) {
                String line = nextLine;
                nextLine = null;
                return line;
            } else {
                throw new NoSuchElementException();
            }
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            iter, Spliterator.ORDERED | Spliterator.NONNULL), false);
}

Here it returns a Stream<String> that is:

这里它返回一个流 ,即:

  • Of unknown size
  • 未知的大小
  • Ordered
  • 命令
  • Not null
  • 非空
  • Not parallel (the false argument at the end of StreamSupport.stream()
  • 不是parallel (StreamSupport.stream()末尾的假参数)

And hence I am really unsure of whether it is even subject to be being parallellised, this could be found by digging even further into the source.

因此我真的不确定它是否会被平行化,这可以通过进一步挖掘源找到。

What I do know is that parallel streams get explicitely provided in the Java APIs. Take for example List, it has a List.stream() and List.parallelStream() method.

我所知道的是并行流是在Java api中明确提供的。以列表为例,它有一个List.stream()和List. parallelstream()方法。