在开发中我们有时会遇到这种需求,要求主线程要等待多个子线程的任务执行完成后才能继续运行。比如我要整合多个渠道商品的价格,每个渠道取多条数据,取出数据后将多个渠道的商品按照价格升序排列。我们可以一个一个渠道拿数据,也可以开启多个线程同时取数据。总之,最后返回的数据是各个渠道数据的汇总。
对于上面的这种需求,如果顺序取数据是不可取的,如果每个渠道的请求耗时是500ms,那么总耗时将会是n个500ms;如果采用多线程获取,理论上的耗时可能就是500ms。但使用多线程,我们就需要在主线程做好控制,避免子线程还没有执行完,主线程就已经返回数据了。下面通过代码演示几种让主线程等待的方式:
(1)CountDownLatch
使用CountDownLatch可以非常方便的实现线程等待,它是一种类似倒计时的功能,当计数为0时程序继续向下运行,初始化的时候需要指定计数数量,简单使用如下:
import java.time.LocalDateTime;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchTest {
public static void main(String[] args) throws Exception {
int nums = 10;
CountDownLatch latch = new CountDownLatch(nums);
for(int i = 0; i < nums; i++) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " run end : " + LocalDateTime.now());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
System.out.println(Thread.currentThread().getName() + " run await : " + LocalDateTime.now());
}
}).start();
}
// 这里会阻塞程序执行,直到计数为0
latch.await();
System.out.println(Thread.currentThread().getName() + " run end : " + LocalDateTime.now());
}
}
(2)ExecutorService
ExecutorService是通过线程池方式实现程序等待的,它的原理是进入方法时初始化一个线程池,添加任务执行,然后执行关闭线程池,线程池会在所有线程任务执行完成后进行关闭,通过判断线程池是否关闭来判断程序的执行。
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceTest {
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(5);
for(int i = 0; i < 10; i ++) {
service.execute(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " run end : " + LocalDateTime.now());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
service.shutdown();
// 通过循环判断线程池是否已经结束
// while (!()) {
// (5);
// }
// 通过阻塞等待线程池结束
service.awaitTermination(20, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + " run end : " + LocalDateTime.now());
}
}
(3)CompletableFuture
使用java8中的异步编程中的allOf()方法,可以等待指定的全部future执行完成后在继续执行主线程的方法:
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class FutureTest {
public static void main(String[] args) throws Exception {
int nums = 10;
CompletableFuture[] futures = new CompletableFuture[10];
for(int i = 0; i < nums; i++) {
futures[i] = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " run end : " + LocalDateTime.now());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures);
future.get();
System.out.println(Thread.currentThread().getName() + " run end : " + LocalDateTime.now());
}
}
(4)Thread的join()方法
join()方法会让主线程等待线程执行完成后再继续执行:
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class JoinTest {
public static void main(String[] args) {
int nums = 10;
Thread[] threads = new Thread[nums];
for(int i = 0; i < nums; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " run end : " + LocalDateTime.now());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
threads[i] = thread;
}
Arrays.stream(threads).forEach(thread -> {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(Thread.currentThread().getName() + " run end : " + LocalDateTime.now());
}
}
(5)CyclicBarrier
CyclicBarrier与CountDownLatch类似,但CyclicBarrier可重置。CyclicBarrier是所有线程执行完后一起等待,条件满足后所有线程再一起继续向下执行;而CountDownLatch是将计数减1,减数后程序可以继续执行,不用一起等待结束后再执行。
import java.time.LocalDateTime;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierTest {
public static void main(String[] args) {
int nums = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(nums + 1);
for(int i = 0; i < nums; i++) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " run end : " + LocalDateTime.now());
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " run end : " + LocalDateTime.now());
}
}
个人感觉使用CountDownLatch和CompletableFuture这两种方式比较简单,并且可以使用线程池来完成任务,线程重复使用减少系统开销。CyclicBarrier需要阻塞线程,ExecutorService每次都需要单独建立线程池执行任务,join()也是每个任务都需要单独建立线程执行,系统开销上会比较大。