Java 8并行流中的自定义线程池

时间:2022-05-19 13:51:41

Is it possible to specify a custom thread pool for Java 8 parallel stream? I can not find it anywhere.

是否可以为Java 8并行流指定一个自定义线程池?我到处都找不到。

Imagine that I have a server application and I would like to use parallel streams. But the application is large and multi-threaded so I want to compartmentalize it. I do not want a slow running task in one module of the applicationblock tasks from another module.

假设我有一个服务器应用程序,我想使用并行流。但是应用程序很大,而且是多线程的,所以我想把它划分开来。我不希望在来自另一个模块的applicationblock任务的一个模块中运行缓慢。

If I can not use different thread pools for different modules, it means I can not safely use parallel streams in most of real world situations.

如果我不能为不同的模块使用不同的线程池,这就意味着在大多数实际情况下我不能安全地使用并行流。

Try the following example. There are some CPU intensive tasks executed in separate threads. The tasks leverage parallel streams. The first task is broken, so each step takes 1 second (simulated by thread sleep). The issue is that other threads get stuck and wait for the broken task to finish. This is contrived example, but imagine a servlet app and someone submitting a long running task to the shared fork join pool.

尝试下面的例子。在单独的线程中执行一些CPU密集型任务。这些任务利用并行流。第一个任务被破坏,因此每个步骤花费1秒(通过线程睡眠进行模拟)。问题是其他线程会被卡住,等待被破坏的任务完成。这是一个设计好的示例,但是请想象一个servlet应用程序和某人向共享fork连接池提交一个长期运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

10 个解决方案

#1


283  

There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.

实际上,如何在特定的fork-join池中执行并行操作是有技巧的。如果您将其作为一个任务在一个fork-join池中执行,它将停留在那里并且不使用公共的。

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"

这个技巧是基于ForkJoinTask。fork指定:“安排异步地执行当前任务在池中运行的任务,如果适用的话,或者使用ForkJoinPool.commonPool(),如果不是inForkJoinPool()”

#2


140  

The parallel streams use the default ForkJoinPool.commonPool which by default has one less threads as you have processors, as returned by Runtime.getRuntime().availableProcessors() (This means that parallel streams use all your processors because they also use the main thread):

并行流使用默认的ForkJoinPool.commonPool,默认情况下,拥有处理器的时候线程更少,这是由Runtime.getRuntime(). availableprocessor()返回的(这意味着并行流使用所有的处理器,因为它们也使用主线程):

For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.

对于需要独立或自定义池的应用程序,可以使用给定的目标并行级别构造一个ForkJoinPool;默认情况下,等于可用处理器的数量。

This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)

这也意味着如果您已经嵌套了并行流或同时启动了多个并行流,那么它们将共享相同的池。优点:您最多只能使用默认的(可用处理器的数量)。缺点:您可能不会获得分配给您发起的每个并行流的“所有处理器”(如果您恰好有多个并行流)。(显然,您可以使用ManagedBlocker来绕过这个问题。)

To change the way parallel streams are executed, you can either

要更改并行流执行的方式,您可以选择其中之一

  • submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); or
  • 将并行流执行提交给您自己的ForkJoinPool: yourFJP.submit() -> streams .parallel().forEach(soSomething) .get();或
  • you can change the size of the common pool using system properties: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") for a target parallelism of 20 threads.
  • 您可以使用system属性:system . setproperty(“java.util. concurrentl . forkjoinpool .common.parallelism”、“20”)更改公共池的大小,以实现20个线程的目标并行性。

Example of the latter on my machine which has 8 processors. If I run the following program:

在我的机器上有8个处理器的例子。如果我运行以下程序:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

The output is:

的输出是:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

215 216 216 216 216 216 216 216 216 216 316 316 316 316 316 415 416 416 416 416 416 416

So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:

你可以看到并行流一次处理8个项目,也就是说它使用8个线程。但是,如果我取消注释,输出是:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.

这一次,并行流使用了20个线程,流中的所有20个元素都同时处理。

#3


31  

Alternatively to the trick of triggering the parallel computation inside your own forkJoinPool you can also pass that pool to the CompletableFuture.supplyAsync method like in:

除了在自己的forkJoinPool中触发并行计算的技巧之外,还可以将该池传递给CompletableFuture。supplyAsync方法如:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

#4


13  

Using a ForkJoinPool and submit for a parallel stream does not reliably use all threads. If you look at this ( Parallel stream from a HashSet doesn't run in parallel ) and this ( Why does the parallel stream not use all the threads of the ForkJoinPool? ), you'll see the reasoning.

使用一个ForkJoinPool和提交一个并行流并不可靠地使用所有线程。如果你看这个(HashSet中的并行流不并行运行)和这个(为什么并行流不使用ForkJoinPool的所有线程?)你会看到推理。

Short version: if ForkJoinPool/submit does not work for you, use

短版本:如果ForkJoinPool/submit不适合您,请使用

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

#5


6  

Until now, I used the solutions described in the answers of this question. Now, I came up with a little library called Parallel Stream Support for that:

到目前为止,我使用了问题答案中描述的解决方案。现在,我想到了一个叫做并行流支持的库

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

But as @PabloMatiasGomez pointed out in the comments, there are drawbacks regarding the splitting mechanism of parallel streams which depends heavily on the size of the common pool. See Parallel stream from a HashSet doesn't run in parallel .

但是正如@PabloMatiasGomez在评论中指出的那样,并行流的分割机制存在缺陷,这在很大程度上取决于公共池的大小。看到来自HashSet的并行流不是并行运行的。

I am using this solution only to have separate pools for different types of work but I can not set the size of the common pool to 1 even if I don't use it.

我使用这个解决方案只是为了为不同类型的工作提供单独的池,但是我不能将公共池的大小设置为1,即使我不使用它。

#6


5  

To measure the actual number of used threads, you can check Thread.activeCount():

要测量实际使用的线程数量,可以检查Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

This can produce on a 4-core CPU an output like:

这可以在一个四核CPU上产生如下输出:

5 // common pool
23 // custom pool

Without .parallel() it gives:

没有.parallel()它给:

3 // common pool
4 // custom pool

#7


1  

Go to get AbacusUtil. Thread number can by specified for parallel stream. Here is the sample code:

去得到AbacusUtil。线程数可以指定为并行流。这是样本代码:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Disclosure: I'm the developer of AbacusUtil.

披露:我AbacusUtil的开发者。

#8


0  

If you don't mind using a third-party library, with cyclops-react you can mix sequential and parallel Streams within the same pipeline and provide custom ForkJoinPools. For example

如果您不介意使用第三方库,使用cyclop -react,您可以在同一管道中混合顺序流和并行流,并提供定制的fork - joinpools。例如

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

Or if we wished to continue processing within a sequential Stream

或者如果我们希望在一个连续的流中继续处理

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[Disclosure I am the lead developer of cyclops-react]

[披露我是cyclop -react的主要开发者]

#9


0  

I tried the custom ForkJoinPool as follows to adjust the pool size:

我尝试了定制的ForkJoinPool,如下调整池的大小:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

Here is the output saying the pool is using more threads than the default 4.

这里是输出,表示池使用的线程比默认的4要多。

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

But actually there is a weirdo, when I tried to achieve the same result using ThreadPoolExecutor as follows:

但实际上有一个怪人,当我试图用ThreadPoolExecutor达到同样的结果时,如下所示:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

but I failed.

但是我失败了。

It will only start the parallelStream in a new thread and then everything else is just the same, which again proves that the parallelStream will use the ForkJoinPool to start its child threads.

它将只在一个新的线程中启动并行流,然后其他的都是相同的,这再次证明了并行流将使用ForkJoinPool启动它的子线程。

#10


0  

Note: There appears to be a fix implemented in JDK 10 that ensures the Custom Thread Pool uses the expected number of threads.

注意:在JDK 10中似乎有一个补丁,它确保自定义线程池使用预期的线程数。

Parallel stream execution within a custom ForkJoinPool should obey the parallelism https://bugs.openjdk.java.net/browse/JDK-8190974

在定制的ForkJoinPool中,并行流执行应该遵循并行性https://bugs.openjdk.java.net/browse/JDK-8190974

#1


283  

There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.

实际上,如何在特定的fork-join池中执行并行操作是有技巧的。如果您将其作为一个任务在一个fork-join池中执行,它将停留在那里并且不使用公共的。

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"

这个技巧是基于ForkJoinTask。fork指定:“安排异步地执行当前任务在池中运行的任务,如果适用的话,或者使用ForkJoinPool.commonPool(),如果不是inForkJoinPool()”

#2


140  

The parallel streams use the default ForkJoinPool.commonPool which by default has one less threads as you have processors, as returned by Runtime.getRuntime().availableProcessors() (This means that parallel streams use all your processors because they also use the main thread):

并行流使用默认的ForkJoinPool.commonPool,默认情况下,拥有处理器的时候线程更少,这是由Runtime.getRuntime(). availableprocessor()返回的(这意味着并行流使用所有的处理器,因为它们也使用主线程):

For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.

对于需要独立或自定义池的应用程序,可以使用给定的目标并行级别构造一个ForkJoinPool;默认情况下,等于可用处理器的数量。

This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)

这也意味着如果您已经嵌套了并行流或同时启动了多个并行流,那么它们将共享相同的池。优点:您最多只能使用默认的(可用处理器的数量)。缺点:您可能不会获得分配给您发起的每个并行流的“所有处理器”(如果您恰好有多个并行流)。(显然,您可以使用ManagedBlocker来绕过这个问题。)

To change the way parallel streams are executed, you can either

要更改并行流执行的方式,您可以选择其中之一

  • submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); or
  • 将并行流执行提交给您自己的ForkJoinPool: yourFJP.submit() -> streams .parallel().forEach(soSomething) .get();或
  • you can change the size of the common pool using system properties: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") for a target parallelism of 20 threads.
  • 您可以使用system属性:system . setproperty(“java.util. concurrentl . forkjoinpool .common.parallelism”、“20”)更改公共池的大小,以实现20个线程的目标并行性。

Example of the latter on my machine which has 8 processors. If I run the following program:

在我的机器上有8个处理器的例子。如果我运行以下程序:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

The output is:

的输出是:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

215 216 216 216 216 216 216 216 216 216 316 316 316 316 316 415 416 416 416 416 416 416

So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:

你可以看到并行流一次处理8个项目,也就是说它使用8个线程。但是,如果我取消注释,输出是:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.

这一次,并行流使用了20个线程,流中的所有20个元素都同时处理。

#3


31  

Alternatively to the trick of triggering the parallel computation inside your own forkJoinPool you can also pass that pool to the CompletableFuture.supplyAsync method like in:

除了在自己的forkJoinPool中触发并行计算的技巧之外,还可以将该池传递给CompletableFuture。supplyAsync方法如:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

#4


13  

Using a ForkJoinPool and submit for a parallel stream does not reliably use all threads. If you look at this ( Parallel stream from a HashSet doesn't run in parallel ) and this ( Why does the parallel stream not use all the threads of the ForkJoinPool? ), you'll see the reasoning.

使用一个ForkJoinPool和提交一个并行流并不可靠地使用所有线程。如果你看这个(HashSet中的并行流不并行运行)和这个(为什么并行流不使用ForkJoinPool的所有线程?)你会看到推理。

Short version: if ForkJoinPool/submit does not work for you, use

短版本:如果ForkJoinPool/submit不适合您,请使用

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

#5


6  

Until now, I used the solutions described in the answers of this question. Now, I came up with a little library called Parallel Stream Support for that:

到目前为止,我使用了问题答案中描述的解决方案。现在,我想到了一个叫做并行流支持的库

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

But as @PabloMatiasGomez pointed out in the comments, there are drawbacks regarding the splitting mechanism of parallel streams which depends heavily on the size of the common pool. See Parallel stream from a HashSet doesn't run in parallel .

但是正如@PabloMatiasGomez在评论中指出的那样,并行流的分割机制存在缺陷,这在很大程度上取决于公共池的大小。看到来自HashSet的并行流不是并行运行的。

I am using this solution only to have separate pools for different types of work but I can not set the size of the common pool to 1 even if I don't use it.

我使用这个解决方案只是为了为不同类型的工作提供单独的池,但是我不能将公共池的大小设置为1,即使我不使用它。

#6


5  

To measure the actual number of used threads, you can check Thread.activeCount():

要测量实际使用的线程数量,可以检查Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

This can produce on a 4-core CPU an output like:

这可以在一个四核CPU上产生如下输出:

5 // common pool
23 // custom pool

Without .parallel() it gives:

没有.parallel()它给:

3 // common pool
4 // custom pool

#7


1  

Go to get AbacusUtil. Thread number can by specified for parallel stream. Here is the sample code:

去得到AbacusUtil。线程数可以指定为并行流。这是样本代码:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Disclosure: I'm the developer of AbacusUtil.

披露:我AbacusUtil的开发者。

#8


0  

If you don't mind using a third-party library, with cyclops-react you can mix sequential and parallel Streams within the same pipeline and provide custom ForkJoinPools. For example

如果您不介意使用第三方库,使用cyclop -react,您可以在同一管道中混合顺序流和并行流,并提供定制的fork - joinpools。例如

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

Or if we wished to continue processing within a sequential Stream

或者如果我们希望在一个连续的流中继续处理

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[Disclosure I am the lead developer of cyclops-react]

[披露我是cyclop -react的主要开发者]

#9


0  

I tried the custom ForkJoinPool as follows to adjust the pool size:

我尝试了定制的ForkJoinPool,如下调整池的大小:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

Here is the output saying the pool is using more threads than the default 4.

这里是输出,表示池使用的线程比默认的4要多。

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

But actually there is a weirdo, when I tried to achieve the same result using ThreadPoolExecutor as follows:

但实际上有一个怪人,当我试图用ThreadPoolExecutor达到同样的结果时,如下所示:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

but I failed.

但是我失败了。

It will only start the parallelStream in a new thread and then everything else is just the same, which again proves that the parallelStream will use the ForkJoinPool to start its child threads.

它将只在一个新的线程中启动并行流,然后其他的都是相同的,这再次证明了并行流将使用ForkJoinPool启动它的子线程。

#10


0  

Note: There appears to be a fix implemented in JDK 10 that ensures the Custom Thread Pool uses the expected number of threads.

注意:在JDK 10中似乎有一个补丁,它确保自定义线程池使用预期的线程数。

Parallel stream execution within a custom ForkJoinPool should obey the parallelism https://bugs.openjdk.java.net/browse/JDK-8190974

在定制的ForkJoinPool中,并行流执行应该遵循并行性https://bugs.openjdk.java.net/browse/JDK-8190974