Java中的parallelStream

时间:2025-02-13 08:03:18

一、概述

Java 中的 parallelStream 的底层实现为 ForkJoinPool 。
线程池是所有并行流共享的。
线程池的线程数量和CPU核数一致。
需要等待任务全部执行完毕,主线程(调用线程)才继续往下执行。

注意:因为线程池是全局共享的,所以我们尽量不要在 parallelStream 中执行阻塞任务,否则会影响到整个系统中其他的 parallelStream 任务执行的。

二、案例

下面的案例中使用了2个新线程,在其中分别执行 parallelStream 。
线程A启动后,我们暂停2秒,目的是为了让线程A中的任务占满 ForkJoinPool 中的线程,并且每个任务执行后,要暂停10秒,目的是为了方便我们观察 ForkJoinPool 中总共有多少个线程。

    public static void main(String[] args) {
        try {
            // 构建list数据
            List list = new ArrayList<>(20);
            for (int i = 0; i < 20; i++) {
                list.add(i);
            }

            // 启动第一个线程,在里面执行
            Thread t_A = new Thread(() -> {
                list.parallelStream().forEach((i) -> {
                    System.out.println(i + "=====A=====" + Thread.currentThread().getName());
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            });
            t_A.start();

            // 暂停2秒
            Thread.sleep(2000);

            //执行第二个线程
            Thread t_B = new Thread(() -> {
                list.parallelStream().forEach((i) -> {
                    System.out.println(i + "=====B=====" + Thread.currentThread().getName());
                });
            });
            t_B.start();

            t_A.join();
            t_B.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

执行结果如下:
注意查看,首先线程A中的任务执行了12条(包括调用线程自身),我使用的电脑CPU为12核心。
2秒钟后,线程B开始执行,此时线程A中的12个线程还处于阻塞状态,线程B中的任务,全部由调用线程执行。

12=====A=====Thread-0
6=====A=====ForkJoinPool.commonPool-worker-9
7=====A=====ForkJoinPool.commonPool-worker-6
2=====A=====ForkJoinPool.commonPool-worker-2
8=====A=====ForkJoinPool.commonPool-worker-11
4=====A=====ForkJoinPool.commonPool-worker-4
5=====A=====ForkJoinPool.commonPool-worker-15
1=====A=====ForkJoinPool.commonPool-worker-13
3=====A=====ForkJoinPool.commonPool-worker-8
9=====A=====ForkJoinPool.commonPool-worker-1
0=====A=====ForkJoinPool.commonPool-worker-10
17=====A=====ForkJoinPool.commonPool-worker-3
12=====B=====Thread-1
14=====B=====Thread-1
13=====B=====Thread-1
11=====B=====Thread-1
10=====B=====Thread-1
17=====B=====Thread-1
19=====B=====Thread-1
18=====B=====Thread-1
16=====B=====Thread-1
15=====B=====Thread-1
6=====B=====Thread-1
5=====B=====Thread-1
8=====B=====Thread-1
9=====B=====Thread-1
7=====B=====Thread-1
2=====B=====Thread-1
4=====B=====Thread-1
3=====B=====Thread-1
1=====B=====Thread-1
0=====B=====Thread-1
13=====A=====ForkJoinPool.commonPool-worker-11
18=====A=====ForkJoinPool.commonPool-worker-3
15=====A=====ForkJoinPool.commonPool-worker-9
10=====A=====ForkJoinPool.commonPool-worker-4
19=====A=====ForkJoinPool.commonPool-worker-10
16=====A=====ForkJoinPool.commonPool-worker-1
11=====A=====ForkJoinPool.commonPool-worker-13
14=====A=====ForkJoinPool.commonPool-worker-8