Parallel streams, collectors and thread safety

Time: 2021-04-30 07:09:33

See the simple example below, which counts the number of occurrences of each word in a list:

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

At the end, wordsCount is {a=2, b=1, c=1}.


But my stream is very large and I want to parallelise the job, so I write:


Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

However, I have noticed that wordsCount is a plain HashMap, so I wonder whether I need to explicitly ask for a concurrent map to ensure thread safety:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));

Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?


3 Answers

#1


37  

Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?


It is safe to use a non-concurrent collector in a collect operation of a parallel stream.


In the specification of the Collector interface, in the section with half a dozen bullet points, is this:


For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and combining happens only after accumulation is complete.


This means that the various implementations provided by the Collectors class can be used with parallel streams, even though some of those implementations might not be concurrent collectors. This also applies to any of your own non-concurrent collectors that you might implement. They can be used safely with parallel streams, provided your collectors don't interfere with the stream source, are side-effect free, order independent, etc.

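As a rough illustration of the point about your own non-concurrent collectors, here is a minimal sketch. The class name CustomCollectorDemo and the helper concatenating() are made up for this example and are not part of the JDK or of the original answer; the collector accumulates into a plain, unsynchronized StringBuilder.

```java
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical demo class: a hand-written, non-concurrent collector
// used with both a sequential and a parallel stream.
class CustomCollectorDemo {

    // The collector is NOT declared CONCURRENT, so the stream framework
    // gives every partition its own StringBuilder.
    static Collector<String, StringBuilder, String> concatenating() {
        return Collector.of(
                StringBuilder::new,                   // supplier: a fresh container per partition
                (sb, s) -> sb.append(s),              // accumulator: works on a thread-confined builder
                (left, right) -> left.append(right),  // combiner: merges two finished partitions
                StringBuilder::toString);             // finisher
    }

    static String concat(boolean parallel) {
        IntStream range = IntStream.range(0, 1_000);
        if (parallel) {
            range = range.parallel();
        }
        return range.mapToObj(i -> Integer.toString(i)).collect(concatenating());
    }

    public static void main(String[] args) {
        // Both runs should produce the same string, in encounter order.
        System.out.println(concat(true).equals(concat(false)));
    }
}
```

The StringBuilder is never shared between threads while it is being filled, which is why no synchronization appears anywhere in the collector.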

I also recommend reading the Mutable Reduction section of the java.util.stream package documentation. In the middle of this section is an example that is stated to be parallelizable, but which collects results into an ArrayList, which is not thread-safe.


The way this works is that a parallel stream ending in a non-concurrent collector makes sure that different threads are always operating on different instances of the intermediate result collections. That's why a collector has a Supplier function: it creates as many intermediate collections as are needed, one per partition of the input, so each thread can accumulate into its own. When intermediate results are to be merged, they are handed off safely between threads, and at any given time only a single thread is merging any pair of intermediate results.
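The supplier, accumulator and merge steps described above can be sketched with the three-argument form of collect and a plain ArrayList. This is only an illustration in the spirit of the Mutable Reduction documentation; the class name ArrayListCollectDemo and the "item-" strings are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class: collecting a parallel stream into a
// non-thread-safe ArrayList.
class ArrayListCollectDemo {

    static List<String> collectParallel(int n) {
        ArrayList<String> result = IntStream.range(0, n)
                .parallel()
                .mapToObj(i -> "item-" + i)
                .collect(ArrayList::new,      // supplier: one list per partition
                         ArrayList::add,      // accumulator: adds to that partition's list
                         ArrayList::addAll);  // combiner: merges two partition lists
        return result;
    }

    static List<String> collectSequential(int n) {
        return IntStream.range(0, n)
                .mapToObj(i -> "item-" + i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same elements in the same encounter order, despite the parallel run.
        System.out.println(collectParallel(10_000).equals(collectSequential(10_000)));
    }
}
```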

#2


19  

All collectors, if they follow the rules in the specification, are safe to run in parallel or sequential. Parallel-readiness is a key part of the design here.


The distinction between concurrent and non-concurrent collectors has to do with the approach to parallelization.

An ordinary (non-concurrent) collector operates by merging sub-results. So the source is partitioned into a bunch of chunks, each chunk is collected into a result container (like a list or a map), and then the sub-results are merged into a bigger result container. This is safe and order-preserving, but for some kinds of containers -- especially maps -- can be expensive, since merging two maps by key is often expensive.


A concurrent collector instead creates one result container, whose insertion operations are guaranteed to be thread-safe, and blasts elements into it from multiple threads. With a highly concurrent result container like ConcurrentHashMap, this approach may well perform better than merging ordinary HashMaps.


So, the concurrent collectors are strictly optimizations over their ordinary counterparts. And they don't come without a cost; because elements are being blasted in from many threads, concurrent collectors generally cannot preserve encounter order. (But, often you don't care -- when creating a word count histogram, you don't care which instance of "foo" you counted first.)

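To make the comparison concrete, the following sketch runs the word count from the question with both kinds of collector and inspects their characteristics. The class WordCountDemo and its helper names are invented for this example. It shows only that both give the same counts; it makes no claim about which is faster for a given input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical demo class comparing toMap and toConcurrentMap.
class WordCountDemo {

    static final List<String> WORDS = Arrays.asList("a", "b", "a", "c");

    // Ordinary collector: one HashMap per partition, merged afterwards.
    static final Collector<String, ?, Map<String, Integer>> ORDINARY =
            Collectors.toMap(s -> s, s -> 1, Integer::sum);

    // Concurrent collector: one shared ConcurrentMap for all threads.
    static final Collector<String, ?, ConcurrentMap<String, Integer>> CONCURRENT =
            Collectors.toConcurrentMap(s -> s, s -> 1, Integer::sum);

    static Map<String, Integer> countOrdinary() {
        return WORDS.parallelStream().collect(ORDINARY);
    }

    static Map<String, Integer> countConcurrent() {
        return WORDS.parallelStream().collect(CONCURRENT);
    }

    static boolean isConcurrent(Collector<?, ?, ?> collector) {
        return collector.characteristics().contains(Collector.Characteristics.CONCURRENT);
    }

    public static void main(String[] args) {
        System.out.println(countOrdinary().equals(countConcurrent()));
        System.out.println("toMap concurrent? " + isConcurrent(ORDINARY));
        System.out.println("toConcurrentMap concurrent? " + isConcurrent(CONCURRENT));
    }
}
```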

#3


10  

It is safe to use non-concurrent collections and non-atomic counters with parallel streams.


If you take a look at the documentation of Stream::collect, you find the following paragraph:


Like reduce(Object, BinaryOperator), collect operations can be parallelized without requiring additional synchronization.


And for the method Stream::reduce:


While this may seem a more roundabout way to perform an aggregation compared to simply mutating a running total in a loop, reduction operations parallelize more gracefully, without needing additional synchronization and with greatly reduced risk of data races.

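The contrast drawn in that quotation, a running total mutated in a loop versus a reduction, might look like this in a small sketch. ReduceDemo and its method names are assumptions for illustration only.

```java
import java.util.stream.IntStream;

// Hypothetical demo class: a loop with a mutable total versus a parallel reduction.
class ReduceDemo {

    // Running total in a loop: fine sequentially, but the shared variable
    // would need synchronization if several threads updated it.
    static int sumWithLoop(int n) {
        int total = 0;
        for (int i = 1; i <= n; i++) {
            total += i;
        }
        return total;
    }

    // Reduction: no shared mutable state, so it can run in parallel as-is.
    static int sumWithReduce(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sumWithLoop(100) + " " + sumWithReduce(100));
    }
}
```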

This might be a bit surprising. However, note that parallel streams are based on a fork-join model. That means the concurrent execution works as follows:


  • split the sequence into two parts of about the same size
  • process each part individually
  • collect the results of both parts and combine them into one result

In the second step, the three steps are recursively applied to the sub-sequences.


An example should make that clear:

IntStream.range(0, 4)
    .parallel()
    .collect(Trace::new, Trace::accumulate, Trace::combine);

The only purpose of the class Trace is to log the constructor and method calls. If you execute this statement, it prints the following lines:

thread:  9  /  operation: new
thread: 10  /  operation: new
thread: 10  /  operation: accumulate
thread:  1  /  operation: new
thread:  1  /  operation: accumulate
thread:  1  /  operation: combine
thread: 11  /  operation: new
thread: 11  /  operation: accumulate
thread:  9  /  operation: accumulate
thread:  9  /  operation: combine
thread:  9  /  operation: combine

You can see that four Trace objects have been created, accumulate has been called once on each object, and combine has been used three times to combine the four objects into one. Each object can only be accessed by one thread at a time. That makes the code thread-safe, and the same applies to the method Collectors::toMap.
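The answer does not show the Trace class itself. One possible reconstruction, purely as an assumption, is sketched below; the counters, the sum field and the run() helper are additions for illustration and were not in the original. Thread ids and the order of the log lines will differ from machine to machine and from run to run.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical reconstruction of the Trace class used in the example above.
class Trace {

    // Global counters are atomic because different Trace instances
    // are touched by different threads at the same time.
    static final AtomicInteger created = new AtomicInteger();
    static final AtomicInteger accumulated = new AtomicInteger();
    static final AtomicInteger combined = new AtomicInteger();

    // Plain, unsynchronized state: each instance is used by one thread at a time.
    private int sum;

    Trace() {
        created.incrementAndGet();
        log("new");
    }

    void accumulate(int value) {
        accumulated.incrementAndGet();
        log("accumulate");
        sum += value;
    }

    void combine(Trace other) {
        combined.incrementAndGet();
        log("combine");
        sum += other.sum;
    }

    int sum() {
        return sum;
    }

    private static void log(String operation) {
        System.out.printf("thread: %2d  /  operation: %s%n",
                Thread.currentThread().getId(), operation);
    }

    // Runs the statement from the example and returns the final Trace.
    static Trace run() {
        return IntStream.range(0, 4)
                .parallel()
                .collect(Trace::new, Trace::accumulate, Trace::combine);
    }

    public static void main(String[] args) {
        System.out.println("sum = " + run().sum());
    }
}
```

The sum field is an ordinary int with no locking, yet the final value is the sum of all four elements, because each Trace is confined to one thread until it is handed over to combine.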
