1.CountDownLatch -- 锁存器
有时在线程开发中遇到一些问题,如主线程启动了多个子线程,主线程需要在子线程都结束后再做一些处理,也就是说,主线程必须知道所有子线程都结束的时候。刚开始的时候自己写一个子线程列表,启动一个子线程,加1,结束一个子线程,减1,主线程不断循环等待,当子线程列表归零时就说明所有子线程都结束了。简单的任务还可以勉强使用,但大量是用线程池的时候,发现不靠谱了,研究发现,原来jdk中已经有了该工具类--CountDownLatch。
jdk文档:
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
用给定的计数初始化CountDownLatch
。由于调用了countDown()
方法,所以在当前计数到达零之前,await
方法会一直受阻塞。之后,会释放所有等待的线程,await
的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用CyclicBarrier
。
CountDownLatch
是一个通用同步工具,它有很多用途。将计数 1 初始化的CountDownLatch
用作一个简单的开/关锁存器,或入口:在通过调用countDown()
的线程打开入口前,所有调用await
的线程都一直在入口处等待。用N 初始化的 CountDownLatch
可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。
CountDownLatch
的一个有用特性是,它不要求调用 countDown
方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个await
。
构造方法摘要 | |
---|---|
CountDownLatch(int count) 构造一个用给定计数初始化的CountDownLatch 。 |
方法摘要 | |
---|---|
|
await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。 |
|
await(long timeout,TimeUnit unit) 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。 |
countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。 |
|
getCount() 返回当前计数。 |
|
|
toString() 返回标识此锁存器及其状态的字符串。 |
实例:
public class Test {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3); //三个个工人的协作
Worker worker1 = new Worker("张三", 4000, latch);
Worker worker2 = new Worker("李四", 2000, latch);
Worker worker3 = new Worker("王五", 5000, latch);
worker1.start();
worker2.start();
worker3.start();
// 主线程阻塞,等待所有子线程完成(调用latch.countDown())
latch.await();
System.out.println("主线程:工作完成");
}
}
/**
* 工人类-子线程
*/
class Worker extends Thread{
private String name; // 工人姓名
private long time; // 工作时间(单位:毫秒)
private CountDownLatch latch; // 计数锁存器
public Worker(String name, long time, CountDownLatch latch) {
this.name = name;
this.time = time;
this.latch = latch;
}
private void doWork() throws InterruptedException {
Thread.sleep(time);
}
public void run() {
try {
doWork(); // 工作中。。。
System.out.println("工人: " + name + " 完成工作");
} catch (InterruptedException e) {
System.out.println("工人: " + name + " 工作出现意外");
} finally {
latch.countDown(); //工人完成工作,计数器减一
}
}
}
运行结果:
2.Future
但有时发现CountDownLatch只知道子线程的完成情况是不够的,如果在子线程完成后获取其计算的结果,那CountDownLatch就有些捉襟见衬了,所以jdk提供的Future类,不仅可以在子线程完成后收集其结果,还可以设定子线程的超时时间,避免主任务一直等待。
方法摘要 | |
---|---|
|
cancel(boolean mayInterruptIfRunning) 试图取消对此任务的执行。 |
V |
get() 如有必要,等待计算完成,然后获取其结果。 |
V |
get(long timeout,TimeUnit unit) 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。 |
isCancelled() 如果在任务正常完成前将其取消,则返回true。 |
|
isDone() 如果任务已完成,则返回 true。 |
实例:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Test {
private static final ExecutorService executor = Executors.newCachedThreadPool();
private static Random random = new Random();
private static long timeout = 4L;
/**
* 启动多个子任务
*/
public static void startMoreTask() {
List<Callable<Integer>> subTasks = new ArrayList<Callable<Integer>>(); // 子任务集合
List<Integer> subTaskResult = new ArrayList<Integer>(); // 子任务的返回集合
// 1.初始化10个子任务
for (int i = 1; i <= 10; i ++) {
SubTask subTask = new SubTask("子线程-" + i, random.nextInt(10)); // 子线程随机在10秒内完成
subTasks.add(subTask);
}
// 2.执行所有的子任务
try {
List<Future<Integer>> futures = executor.invokeAll(subTasks);
for (Future<Integer> future : futures) {
try {
Integer result = future.get(timeout, TimeUnit.SECONDS); // 设置每个子任务的执行时间不得超过4秒
subTaskResult.add(result);
} catch (ExecutionException | TimeoutException e) {
future.cancel(true); // 当出现执行异常和超时异常时,终止该子任务
}
}
} catch (InterruptedException e) {
System.out.println("任务执行异常:" + e.getMessage());
}
}
public static void main(String[] args) {
// subTask1测试超时的情况
SubTask subTask1 = new SubTask("子线程 - 1", 10);
Future<Integer> future1 = executor.submit(subTask1);
Integer result1 = null;
try {
result1 = future1.get(5, TimeUnit.SECONDS); // 设置子任务的执行时间不得超过5秒
} catch (InterruptedException e) {
System.out.println("线程中断出错");
future1.cancel(true);// 中断执行此任务的线程
} catch (ExecutionException e) {
System.out.println("线程服务出错");
future1.cancel(true);// 中断执行此任务的线程
} catch (TimeoutException e) {// 超时异常
System.out.println("线程执行超时");
future1.cancel(true);// 中断执行此任务的线程
}
System.out.println("subTask1运行结果:" + (result1 == null ? "null" : result1));
// subTask2测试拿到子线程返回结果的情况
SubTask subTask2 = new SubTask("子线程 - 2", 5);
Future<Integer> future2 = executor.submit(subTask2);
Integer result2 = null;
try {
result2 = future2.get(10, TimeUnit.SECONDS); // 设置子任务的执行时间不得超过10秒
} catch (InterruptedException e) {
System.out.println("线程中断出错");
future2.cancel(true);// 中断执行此任务的线程
} catch (ExecutionException e) {
System.out.println("线程服务出错");
future2.cancel(true);// 中断执行此任务的线程
} catch (TimeoutException e) {// 超时异常
System.out.println("线程执行超时");
future2.cancel(true);// 中断执行此任务的线程
}
System.out.println("subTask2运行结果:" + (result2 == null ? "null" : result2));
}
}
class SubTask implements Callable<Integer> {
private String name; // 子线程名
private int second; // 子线程完成需要的时间(秒)
public SubTask (String name, int second) {
this.name = name;
this.second = second;
}
@Override
public Integer call() throws Exception {
System.out.println("#子线程-" + name + " 开始");
Thread.sleep(second * 1000L);
System.out.println("#子线程-" + name + " 结束,耗时秒数: " + second);
return second;
}
}
3.CyclicBarrier -- 循环屏障
之后又发现一个非常好用的多线程辅助类--CyclicBarrier,和CountDownLatch类似,不过适用场景不同。
jdk文档:
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
构造方法摘要 | |
---|---|
CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。 |
|
CyclicBarrier(int parties,Runnable barrierAction) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。 |
方法摘要 | |
---|---|
|
await() 在所有参与者都已经在此 barrier 上调用await 方法之前,将一直等待。 |
|
await(long timeout,TimeUnit unit) 在所有参与者都已经在此屏障上调用await 方法之前将一直等待,或者超出了指定的等待时间。 |
|
getNumberWaiting() 返回当前在屏障处等待的参与者数目。 |
|
getParties() 返回要求启动此 barrier 的参与者数目。 |
|
isBroken() 查询此屏障是否处于损坏状态。 |
|
reset() 将屏障重置为其初始状态。 |
实例:
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class CyclicBarrierTest {
private static ExecutorService executor = Executors.newFixedThreadPool(10, new ThreadFactory() { // 创建固定大小的守护线程池
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
public static void main(String[] args) throws Exception {
// 初始化CyclicBarrier
//CyclicBarrier barrier = new CyclicBarrier(10); // 1.可以只传入一个参数:给定数量的参与者(线程)
CyclicBarrier barrier = new CyclicBarrier(10, new Runnable() { // 2.也可以传入第二个参数,当所有的子线程到达某个公共屏障点,进入barrier线程
@Override
public void run() {
System.out.println("#所有任务初始化完成");
}
});
// 使用线程池
for (int i = 1; i <= barrier.getParties(); i ++) { // getParties() 返回要求启动此 barrier 的参与者数目
String name = "子任务-" + i;
executor.submit(new Task(name, barrier));
}
// 启动所有子线程
executor.shutdown();
Thread.sleep(Integer.MAX_VALUE);
}
}
/**
* 子任务:初始化、执行
*/
class Task implements Runnable {
private String name;
private CyclicBarrier barrier;
public Task (String name, CyclicBarrier barrier) {
this.name = name;
this.barrier = barrier;
}
@Override
public void run() {
try {
// 子任务初始化
int sleepTime = new Random().nextInt(10);
Thread.sleep(sleepTime * 1000L);
System.out.println("子任务:" + this.name + " 初始化完成,使用" + sleepTime + "秒,此时已经初始化完成的子任务数量:" + barrier.getNumberWaiting()); // getNumberWaiting() 返回当前在屏障处等待的参与者数目
// 在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间
barrier.await(); // 也可以换为await(long timeout, TimeUnit unit) 方法设定超时时间
// 子任务执行
System.out.println("子任务:" + this.name + " 执行完毕");
} catch (Exception e) {}
}
}
运行结果:
这里需要解释一下屏障点,屏障点就是某种状态,上面程序的屏障点就是所有子线程都调用了await()方法。
JDK文档中await()方法的屏障点还有其它情况:
-
在所有参与者都已经在此 barrier 上调用await 方法之前,将一直等待。
如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态:
- 最后一个线程到达;或者
- 其他某个线程中断当前线程;或者
- 其他某个线程中断另一个等待线程;或者
- 其他某个线程在等待 barrier 时超时;或者
- 其他某个线程在此 barrier 上调用
reset()
。
如果当前线程:
- 在进入此方法时已经设置了该线程的中断状态;或者
- 在等待时被中断
InterruptedException
,并且清除当前线程的已中断状态。如果在线程处于等待状态时 barrier 被
reset()
,或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出BrokenBarrierException
异常。如果任何线程在等待时被 中断,则其他所有等待线程都将抛出
BrokenBarrierException
异常,并将 barrier 置于损坏状态。如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作,则在允许其他线程继续运行之前,当前线程将运行该操作。如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。
-
- 返回:
-
到达的当前线程的索引,其中,索引
getParties()
- 1 指示将到达的第一个线程,零指示最后一个到达的线程 - 抛出:
-
InterruptedException
- 如果当前线程在等待时被中断 -
BrokenBarrierException
- 如果另一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者在调用await
时 barrier 被损坏,抑或由于异常而导致屏障操作(如果存在)失败。
Semaphore类提供了2个构造器:
public Semaphore(int permits) { //参数permits表示许可数目,即同时可以允许多少线程进行访问
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) { //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:
public void acquire() throws InterruptedException { } //获取一个许可acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
public void acquire(int permits) throws InterruptedException { } //获取permits个许可
public void release() { } //释放一个许可
public void release(int permits) { } //释放permits个许可
release()用来释放许可。注意,在释放许可之前,必须先获获得许可。
这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:
public boolean tryAcquire() { }; //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
实例:
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
public class SemaphoreTest {
private static ExecutorService executor = Executors.newFixedThreadPool(10, new ThreadFactory() { // 创建固定大小的守护线程池
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
public static void main(String[] args) throws Exception {
// 假定有10个工人,5个机器,1个功能需要1个机器才能工作
int workerNum = 10;
int machineNum = 5;
// 工人占用机器期间其它工人无法使用该机器,直到该机器被释放
Semaphore semaphore = new Semaphore(machineNum);
// 开始工作
for (int i = 1; i <= workerNum; i ++) {
String name = "工人-" + i;
executor.submit(new Worker(name, semaphore));
}
// 启动所有子线程
executor.shutdown();
Thread.sleep(Integer.MAX_VALUE);
}
}
/**
* 工人类,一个工人需要一个机器才能工作
*/
class Worker implements Runnable {
private String name;
private Semaphore semaphore;
public Worker(String name, Semaphore semaphore) {
this.name = name;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire(); // 阻塞,等待一个许可
System.out.println("工人:" + this.name + " 占用机器生产ing");
int sleepTime = new Random().nextInt(10);
Thread.sleep(sleepTime * 1000L);
System.out.println("工人:" + this.name + " 占用机器生产 " + sleepTime + " 秒后释放机器");
semaphore.release();
} catch (Exception e) { }
}
}
运行结果:
总结,CountDownLatch、CyclicBarrier和Semaphore的区别和使用场景
1.CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
(1)CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
(2)而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
(3)CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
2.Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。
参考:
jdk文档
Java并发编程:Callable、Future和FutureTask
Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
Java并发之CountDownLatch、CyclicBarrier和Semaphore