无论是项目开发还是开源代码阅读,多线程都是不可或缺的一个重要知识点,基于这个考量,于是总结出本篇文章,讨论闭锁(CountDownLatch)、栅栏(CyclicBarrier)与异步编排(CompletableFuture)
@Author:Akai-yuan
@更新时间:2023/2/4
1.适用场景
- 协调子线程结束动作:等待所有子线程运行结束
主线程创建了5个子线程,各子任务执行确认动作,期间主线程进入等待状态,直到各子线程的任务均已经完成,主线程恢复继续执行。
- 协调子线程开始动作:统一各线程动作开始的时机
从多线程的角度看,这恰似你创建了一些多线程,但是你需要统一管理它们的任务开始时间。
2.设计思想
CountDownLatch基于一个同步器实现,并且只有CountDownLatch(int count)一个构造器,指定数量count不得在中途修改它。
核心函数
- await():等待latch降为0;
- boolean await(long timeout, TimeUnit unit):等待latch降为0,但是可以设置超时时间。
- countDown():latch数量减1;
- getCount():获取当前的latch数量。
3.场景实例
场景1. 对各子线程的等待
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(4);
Thread t1 = new Thread(countDownLatch::countDown);
Thread t2 = new Thread(countDownLatch::countDown);
Thread t3 = new Thread(countDownLatch::countDown);
Thread t4 = new Thread(() -> {
try {
// 稍等...
Thread.sleep(1500);
countDownLatch.countDown();
} catch (InterruptedException ignored) {}
});
t1.start();
t2.start();
t3.start();
t4.start();
//直到所有线程都对计数器进行减一后,这里才放行
countDownLatch.await();
System.out.println("所有子线程就位,可以继续执行其他任务");
}
场景2. 对多线程的统一管理
我们仍然用4个线程调用了start(),但是它们在运行时都在等待countDownLatch的信号,在信号未收到前,它们不会往下执行。
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Thread t1 = new Thread(() -> waitForCountDown(countDownLatch));
Thread t2 = new Thread(() -> waitForCountDown(countDownLatch));
Thread t3 = new Thread(() -> waitForCountDown(countDownLatch));
Thread t4 = new Thread(() -> waitForCountDown(countDownLatch));
t1.start();
t2.start();
t3.start();
t4.start();
Thread.sleep(1000);
countDownLatch.countDown();
System.out.println("所有线程准备完成");
}
private static void waitForCountDown(CountDownLatch countDownLatch) {
try {
// 等待信号
countDownLatch.await();
System.out.println("本线程等待完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
输出:
所有线程准备完成
本线程等待完毕
本线程等待完毕
本线程等待完毕
本线程等待完毕
Process finished with exit code 0
场景3. SOFAJRaft的实践
// 定义一个CountDownLatch计数器
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
public void start() {
switch (workerStateUpdater.get(this)) {
case WORKER_STATE_INIT:
if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
//此处调用工作线程执行CountDownLatch的countDown()方法
//即startTimeInitialized.countDown();
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// 等待startTime被工作线程初始化完成
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
2.CyclicBarrier
1.适用场景
栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier与CountDownLatch的区别
CyclicBarrier | CountDownLatch |
---|---|
CyclicBarrier是可重用的,其中的线程会等待所有的线程完成任务。届时,屏障将被拆除,并可以选择性地做一些特定的动作。 | CountDownLatch是一次性的,不同的线程在同一个计数器上工作,直到计数器为0 |
CyclicBarrier面向的是线程数 | CountDownLatch面向的是任务数 |
在使用CyclicBarrier时,你必须在构造中指定参与协作的线程数,这些线程必须调用await()方法 | 使用CountDownLatch时,则必须要指定任务数,至于这些任务由哪些线程完成无关紧要 |
CyclicBarrier可以在所有的线程释放后重新使用 | CountDownLatch在计数器为0时不能再使用 |
在CyclicBarrier中,如果某个线程遇到了中断、超时等问题时,则处于await的线程都会出现问题 | 在CountDownLatch中,如果某个线程出现问题,其他线程不受影响 |
2.设计思想
1.构造器
// 指定参与方的数量;
public CyclicBarrier(int parties);
// 指定参与方的数量,并指定在本代次结束时运行的代码
public CyclicBarrier(int parties, Runnable barrierAction):
2.核心方法
//如果当前线程不是第一个到达屏障的话,它将会进入等待,直到其他线程都到达
//除非发生被中断、屏障被拆除、屏障被重设等情况
public int await();
//和await()类似,但是加上了时间限制;
public int await(long timeout, TimeUnit unit);
//当前屏障是否被拆除;
public boolean isBroken();
//重设当前屏障。会先拆除屏障再设置新的屏障
public void reset();
//正在等待的线程数量
public int getNumberWaiting();
3.场景实例
下面以一个简单的日常对话来讲解CyclicBarrier的使用实例
private static String appointmentPlace = "书房";
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("yuan所在的地点:" + appointmentPlace));
// 线程Akai
Thread Akai = newThread("Akai", () -> {
System.out.println("yuan,饭好了快来吃饭...");
try {
// 此时Akai在屏障前等待
cyclicBarrier.await();
System.out.println("yuan,你来了...");
// 开始吃饭...
Thread.sleep(2600);
System.out.println("好的,你去洗你的碗吧!");
// 第二次调用await
cyclicBarrier.await();
Thread.sleep(100);
System.out.println("好吧,你这个懒猪!");
} catch (Exception e) {
e.printStackTrace();
}
});
// 线程yuan
Thread yuan = newThread("yuan", () -> {
try {
// yuan在敲代码
Thread.sleep(500);
System.out.println("我在敲代码,我马上就来!");
// yuan到达饭桌前
cyclicBarrier.await();
Thread.sleep(500);
System.out.println("Akai,不好意思,刚刚沉迷于敲代码了!");
// 开始吃饭...
Thread.sleep(1500);
// yuan想先吃完赶快洗碗然后溜出去敲代码
System.out.println("我吃完了,我要去洗碗了");
// yuan把地点改成了厨房
appointmentPlace = "厨房";
// 洗碗中...
Thread.sleep(1500);
System.out.println("︎yuan终于洗完自己的碗了");
// 第二次调用await
cyclicBarrier.await();
System.out.println("Akai你吃完了,你的碗自己去洗吧,我已经在敲代码了");
} catch (Exception ignored) {}
});
Akai.start();
yuan.start();
}
输出结果:
yuan,饭好了快来吃饭...
我在敲代码,我马上就来!
yuan所在的地点:书房
yuan,你来了...
Akai,不好意思,刚刚沉迷于敲代码了!
我吃完了,我要去洗碗了
好的,你去洗你的碗吧!
yuan终于洗完自己的碗了
yuan所在的地点:厨房
Akai你吃完了,你的碗自己去洗吧,我已经在敲代码了
好吧,你这个懒猪!
3.CompletableFuture
1.设计思想
1.Future的局限性
- 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
- 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
- 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
- 没有异常处理:Future接口中没有关于异常处理的方法;
2.Completable有哪些优势
CompletableFuture是Future接口的扩展和增强。
CompletableFuture完整地继承了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。
从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
2.核心设计
我们首先来讨论CompletableFuture的核心:CompletionStage
顾名思义,根据CompletionStage名字中的"Stage",你可以把它理解为任务编排中的步骤。步骤,即任务编排的基本单元,它可以是一次纯粹的计算或者是一个特定的动作。在一次编排中,会包含多个步骤,这些步骤之间会存在依赖、链式和组合等不同的关系,也存在并行和串行的关系。这种关系,类似于Pipeline或者流式计算。
既然是编排,就需要维护任务的创建、建立计算关系。为此,CompletableFuture提供了多达50多个方法,但没有必要全部完全理解,但我们可以通过分类的方式简化对方法的理解,理解了类型和变种,基本上我们也就掌握了CompletableFuture的核心能力。
这些方法可以总结为以下四类,其他大部分方法都是基于这四种类型的变种:
3.核心用法
1.runAsync
- runAsync()是CompletableFuture最常用的方法之一,它可以接收一个待运行的任务并返回一个CompletableFuture
- 当我们想异步运行某个任务时,在以往需要手动实现Thread或者借助Executor实现。而通过runAsync()`就简单多了。比如,我们可以直接传入Runnable类型的任务:
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("something");
}
});
2.supply与supplyAsync
- 所谓supply表示提供结果,换句话说当我们使用supply()时,就表明我们会返回一个结果,并且这个结果可以被后续的任务所使用。
// 创建nameFuture,返回姓名
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "Akai-yuan";
});
// 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "love you," + name;
});
//阻塞获得表白的结果
System.out.println(sayLoveFuture.get()); // love you,Akai-yuan
一旦理解了supply()的含义,它也就如此简单。如果你希望用新的线程运行任务,可以使用supplyAsync().
3.thenApply与thenApplyAsync
- 我们已经知道supply()是用于提供结果的,并且顺带提了thenApply()。很明显,thenApply()是supply()的搭档,用于接收supply()的执行结果,并执行特定的代码逻辑,最后返回CompletableFuture结果。
// 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "爱你," + name;
});
public <U> CompletableFuture <U> thenApplyAsync(
Function <? super T, ? extends U> fn) {
return uniApplyStage(null, fn);
}
4.thenAccept与thenAcceptAsync
作为supply()的档案,thenApply()并不是唯一的存在,thenAccept()也是。但与thenApply()不同,thenAccept()只接收数据,但不会返回,它的返回类型是Void.
CompletableFuture<Void> sayLoveFuture = nameFuture.thenAccept(name -> {
System.out.println("爱你," + name);
});
public CompletableFuture < Void > thenAccept(Consumer < ? super T > action) {
return uniAcceptStage(null, action);
}
5.thenRun
thenRun()就比较简单了,不接收任务的结果,只运行特定的任务,并且也不返回结果。
public CompletableFuture < Void > thenRun(Runnable action) {
return uniRunStage(null, action);
}
所以,如果你在回调中不想返回任何的结果,只运行特定的逻辑,那么你可以考虑使用thenAccept和thenRun一般来说,这两个方法会在调用链的最后面使用。
6.thenCompose与 thenCombine
以上几种方法都是各玩各的,但thenCompose()与thenCombine()就不同了,它们可以实现对依赖和非依赖两种类型的任务的编排。
编排两个存在依赖关系的任务
在前面的例子中,在接收前面任务的结果时,我们使用的是thenApply(). 也就是说,sayLoveFuture在执行时必须依赖nameFuture的完成,否则执行个锤子。
// 创建Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "Akai-yuan";
});
// 使用thenApply()接收nameFuture的结果,并执行回调动作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "爱你," + name;
});
但其实,除了thenApply()之外,我们还可以使用thenCompose()来编排两个存在依赖关系的任务。比如,上面的示例代码可以写成:
// 创建Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "Akai-yuan";
});
CompletableFuture<String> sayLoveFuture2 = nameFuture.thenCompose(name -> {
return CompletableFuture.supplyAsync(() -> "爱你," + name);
});
可以看到,thenCompose()和thenApply()的核心不同之处在于它们的返回值类型:
- thenApply():返回计算结果的原始类型,比如返回String;
- thenCompose():返回CompletableFuture类型,比如返回CompletableFuture.
组合两个相互独立的任务
考虑一个场景,当我们在执行某个任务时,需要其他任务就绪才可以,应该怎么做?这样的场景并不少见,我们可以使用前面学过的并发工具类实现,也可以使用thenCombine()实现。
举个例子,当我们计算某个胜率时,我们需要获取她参与的总场次(rounds),以及获胜的场次(winRounds),然后再通过winRounds / rounds来计算。对于这个计算,我们可以这么做:
CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> 500);
CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);
CompletableFuture < Object > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
return 0.0;
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
});
System.out.println(winRateFuture.get());
thenCombine()将另外两个任务的结果同时作为参数,参与到自己的计算逻辑中。在另外两个参数未就绪时,它将会处于等待状态。
7.allOf与anyOf
allOf()与anyOf()也是一对孪生兄弟,当我们需要对多个Future的运行进行组织时,就可以考虑使用它们:
- allOf():给定一组任务,等待所有任务执行结束;
- anyOf():给定一组任务,等待其中任一任务执行结束。
allOf()与anyOf()的方法签名如下:
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
需要注意的是,anyOf()将返回完任务的执行结果,但是allOf()不会返回任何结果,它的返回值是Void.
allOf()与anyOf()的示例代码如下所示。我们创建了roundsFuture和winRoundsFuture,并通过sleep模拟它们的执行时间。在执行时,winRoundsFuture将会先返回结果,所以当我们调用 CompletableFuture.anyOf时也会发现输出的是365.
CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
return 500;
} catch (InterruptedException e) {
return null;
}
});
CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
return 365;
} catch (InterruptedException e) {
return null;
}
});
CompletableFuture < Object > completedFuture = CompletableFuture.anyOf(winRoundsFuture, roundsFuture);
System.out.println(completedFuture.get()); // 返回365
CompletableFuture < Void > completedFutures = CompletableFuture.allOf(winRoundsFuture, roundsFuture);
在CompletableFuture之前,如果要实现所有任务结束后执行特定的动作,我们可以考虑CountDownLatch等工具类。现在,则多了一选项,我们也可以考虑使用CompletableFuture.allOf.
8.异常处理
在CompletableFuture链式调用中,如果某个任务发生了异常,那么后续的任务将都不会再执行。对于异常,我们有两种处理方式:exceptionally()和handle().
1.使用exceptionally()回调处理异常
在链式调用的尾部使用exceptionally(),捕获异常并返回错误情况下的默认值。需要注意的是,exceptionally()仅在发生异常时才会调用。
CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
throw new RuntimeException("总场次错误");
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
}).exceptionally(ex -> {
System.out.println("出错:" + ex.getMessage());
return "";
});
System.out.println(winRateFuture.get());
2. 使用handle()处理异常
除了exceptionally(),CompletableFuture也提供了handle()来处理异常。不过,与exceptionally()不同的是,当我们在调用链中使用了handle(),那么无论是否发生异常,都会调用它。所以,在handle()方法的内部,我们需要通过 if (ex != null) 来判断是否发生了异常。
CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
throw new RuntimeException("总场次错误");
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
}).handle((res, ex) -> {
if (ex != null) {
System.out.println("出错:" + ex.getMessage());
return "";
}
return res;
});
System.out.println(winRateFuture.get());
当然,如果我们允许某个任务发生异常而不中断整个调用链路,那么可以在其内部通过try-catch消化掉。