多线程系列(十五) -常用并发工具类详解

时间:2024-03-07 13:45:38

一、摘要

在前几篇文章中,我们讲到了线程、线程池、BlockingQueue 等核心组件,其实 JDK 给开发者还提供了比synchronized更加高级的线程同步组件,比如 CountDownLatch、CyclicBarrier、Semaphore、Exchanger 等并发工具类。

下面我们一起来了解一下这些常用的并发工具类!

二、常用并发工具类

2.1、CountDownLatch

CountDownLatch是 JDK5 之后加入的一种并发流程控制工具类,它允许一个或多个线程一直等待,直到其他线程运行完成后再执行。

它的工作原理主要是通过一个计数器来实现,初始化的时候需要指定线程的数量;每当一个线程完成了自己的任务,计数器的值就相应得减 1;当计数器到达 0 时,表示所有的线程都已经执行完毕,处于等待的线程就可以恢复继续执行任务。

根据CountDownLatch的工作原理,它的应用场景一般可以划分为两种:

  • 场景一:某个线程需要在其他 n 个线程执行完毕后,再继续执行
  • 场景二:多个工作线程等待某个线程的命令,同时执行同一个任务

下面我们先来看下两个简单的示例。

示例1:某个线程等待 n 个工作线程

比如某项任务,先采用多线程去执行,最后需要在主线程中进行汇总处理,这个时候CountDownLatch就可以发挥作用了,具体应用如下!

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        // 采用 10 个工作线程去执行任务
        final int threadCount = 10;
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 执行具体任务
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",执行完毕!");
                    // 计数器减 1
                    countDownLatch.countDown();
                }
            }).start();
        }

        // 阻塞等待 10 个工作线程执行完毕
        countDownLatch.await();
        System.out.println("所有任务线程已执行完毕,准备进行结果汇总");
    }
}

运行结果如下:

thread name:Thread-0,执行完毕!
thread name:Thread-2,执行完毕!
thread name:Thread-1,执行完毕!
thread name:Thread-3,执行完毕!
thread name:Thread-4,执行完毕!
thread name:Thread-5,执行完毕!
thread name:Thread-6,执行完毕!
thread name:Thread-7,执行完毕!
thread name:Thread-8,执行完毕!
thread name:Thread-9,执行完毕!
所有任务线程执行完毕,准备进行结果汇总
示例2:n 个工作线程等待某个线程

比如田径赛跑,10 个同学准备开跑,但是需要等工作人员发出枪声才允许开跑,使用CountDownLatch可以实现这一功能,具体应用如下!

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        // 使用一个计数器
        CountDownLatch countDownLatch = new CountDownLatch(1);
        final int threadCount = 10;
        // 采用 10 个工作线程去执行任务
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 阻塞等待计数器为 0
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 发起某个服务请求,省略
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",开始执行!");

                }
            }).start();
        }

        Thread.sleep(1000);
        System.out.println("thread name:" +  Thread.currentThread().getName() + " 准备开始!");
        // 将计数器减 1,运行完成后为 0
        countDownLatch.countDown();
    }
}

运行结果如下:

thread name:main 准备开始!
thread name:Thread-0,开始执行!
thread name:Thread-1,开始执行!
thread name:Thread-2,开始执行!
thread name:Thread-3,开始执行!
thread name:Thread-5,开始执行!
thread name:Thread-6,开始执行!
thread name:Thread-8,开始执行!
thread name:Thread-7,开始执行!
thread name:Thread-4,开始执行!
thread name:Thread-9,开始执行!

从上面的示例可以很清晰的看到,CountDownLatch类似于一个倒计数器,当计数器为 0 的时候,调用await()方法的线程会被解除等待状态,然后继续执行。

CountDownLatch类的主要方法,有以下几个:

  • public CountDownLatch(int count):核心构造方法,初始化的时候需要指定线程数
  • countDown():每调用一次,计数器值 -1,直到 count 被减为 0,表示所有线程全部执行完毕
  • await():等待计数器变为 0,即等待所有异步线程执行完毕,否则一直阻塞
  • await(long timeout, TimeUnit unit):支持指定时间内的等待,避免永久阻塞,await()的一个重载方法

从以上的分析可以得出,当计数器为 1 的时候,即由一个线程来通知其他线程,效果等同于对象的wait()notifyAll();当计时器大于 1 的时候,可以实现多个工作线程完成任务后通知一个或者多个等待线程继续工作,CountDownLatch可以看成是一种进阶版的等待/通知机制,在实际中应用比较多见。

2.2、CyclicBarrier

CyclicBarrier从字面上很容易理解,表示可循环使用的屏障,它真正的作用是让一组线程到达一个屏障时被阻塞,直到满足要求的线程数都到达屏障时,屏障才会解除,此时所有被屏障阻塞的线程就可以继续执行。

下面我们还是先看一个简单的示例,以便于更好的理解这个工具类。

public class CyclicBarrierTest {

    public static void main(String[] args) {
        // 设定参与线程的个数为 5
        int threadCount = 5;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有的线程都已经准备就绪...");
            }
        });
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",已达到屏障!");
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("thread name:" +  Thread.currentThread().getName() + ",阻塞解除,继续执行!");
                }
            }).start();
        }
    }
}

输出结果:

thread name:Thread-0,已达到屏障!
thread name:Thread-1,已达到屏障!
thread name:Thread-2,已达到屏障!
thread name:Thread-3,已达到屏障!
thread name:Thread-4,已达到屏障!
所有的线程都已经准备就绪...
thread name:Thread-4,阻塞解除,继续执行!
thread name:Thread-0,阻塞解除,继续执行!
thread name:Thread-3,阻塞解除,继续执行!
thread name:Thread-1,阻塞解除,继续执行!
thread name:Thread-2,阻塞解除,继续执行!

从上面的示例可以很清晰的看到,CyclicBarrier中设定的线程数相当于一个屏障,当所有的线程数达到时,此时屏障就会解除,线程继续执行剩下的逻辑。

CyclicBarrier类的主要方法,有以下几个:

  • public CyclicBarrier(int parties):构造方法,parties参数表示参与线程的个数
  • public CyclicBarrier(int parties, Runnable barrierAction):核心构造方法,barrierAction参数表示线程到达屏障时的回调方法
  • public void await():核心方法,每个线程调用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞,直到屏障解除,继续执行剩下的逻辑

从以上的示例中,可以看到CyclicBarrierCountDownLatch有很多的相似之处,都能够实现线程之间的等待,但是它们的侧重点不同:

  • CountDownLatch一般用于一个或多个线程,等待其他的线程执行完任务后再执行
  • CyclicBarrier一般用于一组线程等待至某个状态,当状态解除之后,这一组线程再继续执行
  • CyclicBarrier中的计数器可以反复使用,而CountDownLatch用完之后只能重新初始化

2.3、Semaphore

Semaphore通常我们把它称之为信号计数器,它可以保证同一时刻最多有 N 个线程能访问某个资源,比如同一时刻最多允许 10 个用户访问某个服务,同一时刻最多创建 100 个数据库连接等等。

Semaphore可以用于控制并发的线程数,实际应用场景非常的广,比如流量控制、服务限流等等。

下面我们看一个简单的示例。

public class SemaphoreTest {

    public static void main(String[] args) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // 同一时刻仅允许最多3个线程获取许可
        final Semaphore semaphore = new Semaphore(3);
        // 初始化 5 个线程生成
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 如果超过了许可数量,其他线程将在此等待
                        semaphore.acquire();
                        System.out.println(format.format(new Date()) +  " thread name:" +  Thread.currentThread().getName() + " 获取许可,开始执行任务");
                        // 假设执行某项任务的耗时
                        Thread.sleep(2000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // 使用完后释放许可
                        semaphore.release();
                    }
                }
            }).start();
        }
    }
}

输出结果:

2023-11-22 17:32:01 thread name:Thread-0 获取许可,开始执行任务
2023-11-22 17:32:01 thread name:Thread-1 获取许可,开始执行任务
2023-11-22 17:32:01 thread name:Thread-2 获取许可,开始执行任务
2023-11-22 17:32:03 thread name:Thread-4 获取许可,开始执行任务
2023-11-22 17:32:03 thread name:Thread-3 获取许可,开始执行任务

从上面的示例可以很清晰的看到,同一时刻前 3 个线程获得了许可优先执行, 2 秒过后许可被释放,剩下的 2 个线程获取释放的许可继续执行。

Semaphore类的主要方法,有以下几个:

  • public Semaphore(int permits):构造方法,permits参数表示同一时间能访问某个资源的线程数量
  • acquire():获取一个许可,在获取到许可之前或者被其他线程调用中断之前,线程将一直处于阻塞状态
  • tryAcquire(long timeout, TimeUnit unit):表示在指定时间内尝试获取一个许可,如果获取成功,返回true;反之false
  • release():释放一个许可,同时唤醒一个获取许可不成功的阻塞线程。

通过permits参数的设定,可以实现限制多个线程同时访问服务的效果,当permits参数为 1 的时候,表示同一时刻只有一个线程能访问服务,相当于一个互斥锁,效果等同于synchronized

使用Semaphore的时候,通常需要先调用acquire()或者tryAcquire()获取许可,然后通过try ... finally模块在finally中释放许可。

例如如下方式,尝试在 3 秒内获取许可,如果没有获取就退出,防止程序一直阻塞。

// 尝试 3 秒内获取许可
if(semaphore.tryAcquire(3, TimeUnit.SECONDS)){
    try {
       // ...业务逻辑
    }  finally {
        // 释放许可
        semaphore.release();
    }
}

2.4、Exchanger

Exchanger从字面上很容易理解表示交换,它主要用途在两个线程之间进行数据交换,注意也只能在两个线程之间进行数据交换。

Exchanger提供了一个exchange()同步交换方法,当两个线程调用exchange()方法时,无论调用时间先后,会互相等待线程到达exchange()方法同步点,此时两个线程进行交换数据,将本线程产出数据传递给对方。

简单的示例如下。

public class ExchangerTest {

    public static void main(String[] args) {
        // 交换同步器
        Exchanger<String> exchanger = new Exchanger<>();

        // 线程1
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String value = "A";
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 原数据:" + value);
                    String newValue = exchanger.exchange(value);
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 交换后的数据:" + newValue);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 线程2
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String value = "B";
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 原数据:" + value);
                    String newValue = exchanger.exchange(value);
                    System.out.println("thread name:" +  Thread.currentThread().getName() + " 交换后的数据:" + newValue);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

输出结果:

thread name:Thread-0 原数据:A
thread name:Thread-1 原数据:B
thread name:Thread-0 交换后的数据:B
thread name:Thread-1 交换后的数据:A

从上面的示例可以很清晰的看到,当线程Thread-0Thread-1都到达了exchange()方法的同步点时,进行了数据交换。

Exchanger类的主要方法,有以下几个:

  • exchange(V x):等待另一个线程到达此交换点,然后将给定的对象传送给该线程,并接收该线程的对象,除非当前线程被中断,否则一直阻塞等待
  • exchange(V x, long timeout, TimeUnit unit):表示在指定的时间内等待另一个线程到达此交换点,如果超时会自动退出并抛超时异常

如果多个线程调用exchange()方法,数据交换可能会出现混乱,因此实际上Exchanger应用并不多见。

三、小结

本文主要围绕 Java 多线程中常见的并发工具类进行了简单的用例介绍,这些工具类都可以实现线程同步的效果,底层原理实现主要是基于 AQS 队列式同步器来实现,关于 AQS 我们会在后期的文章中再次介绍。

本文篇幅稍有所长,内容难免有所遗漏,欢迎大家留言指出!

四、参考

1.https://www.cnblogs.com/xrq730/p/4869671.html

2.https://zhuanlan.zhihu.com/p/97055716