1 CountDownLatch同步计数器
1.1 类的概述
倒计数门闩。在完成一组正在其它线程中执行的操作之前,它允许一个或多个线程一直等待。在计数器到达0之前,await方法会一直阻塞;之后,会释放所有等待线程,await的所有后续调用都将返回。
1.2 主要方法
CountDownLatch(int count)
构造一个用给定计数初始化的同步计数器void await()
使当前线程在计数器倒计数至0前一直等待,除非被中断。boolean await(long timeout, TimeUnit unit)
使当前线程在计数器倒计时至0前一直等待,除非被中断或超时。void countDown()
计数器值减一;如果到达0,则释放所有等待的线程- long getCount()
返回当前计数
1.3 使用场景
(1)开5个线程去下载,当5个线程都执行完才算下载成功
(2)多个线程上传文件,只有当每个文件都上传成功才算上传成功。
1.4 应用示例
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
Worker worker1 = new Worker("worker1", latch);
Worker worker2 = new Worker("worker2", latch);
Worker worker3 = new Worker("worker3", latch);
worker1.start();
worker2.start();
worker3.start();
latch.await(); // 等待计数器到0
System.out.println("Main thread end");
}
// 内部类,线程类的实现
static class Worker extends Thread {
private String workName;
private CountDownLatch latch;
public Worker(String name, CountDownLatch latch) {
this.workName = name;
this.latch = latch;
}
@Override
public void run() {
try {
System.out.println("worker:"+this.workName+" is begin");
Thread.sleep(1000L);
System.out.println("worker:"+this.workName+" is end");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
}
2 CyclicBarrier同步计数器
2.1 类的概述
循环屏障允许一组线程互相等待,直到到达某个公共屏障点,然后所有的这组线程再同步往后执行。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的barrier。
2.2 主要方法
CyclicBarrier(int parties)
创建一个新的循环屏障,它将在给定数量的线程处于等待状态时启动,但不会在启动barrier时执行预定义的操作。CyClicBarrier(int parties, Runnable barrierAction)
创建一个新的循环屏障,它将在给定数量的线程处于等待状态时启动,并在启动barrier时执行给定的屏障操作barrierAction,该操作由最后一个进入barrier的线程执行。int await()
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。int await(long timeout, TimeUnit unit)
在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。int getNumberWaiting()
返回当前在屏障处等待的线程数目int getParties()
返回要求启动此barrier的线程数目void reset()
将循环屏障重置为初始状态。
2.3 使用示例
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, new TotalTask());
Worker worker1 = new Worker("worker1", barrier);
Worker worker2 = new Worker("worker2", barrier);
Worker worker3 = new Worker("worker3", barrier);
worker1.start();
worker2.start();
worker3.start();
System.out.println("main thread end");
}
// 启动barrier时执行该任务,即当最后一个线程进入barrier时执行该任务
static class TotalTask extends Thread {
@Override
public void run() {
System.out.println("所有线程到达barrier");
}
}
// 任务线程
static class Worker extends Thread {
private String name;
private CyclicBarrier barrier;
public Worker(String name, CyclicBarrier barrier) {
this.name = name;
this.barrier = barrier;
}
@Override
public void run() {
try {
System.out.println(name+"任务开始");
Thread.sleep(1000L);
System.out.println(name+"任务完成");
barrier.await(); // 线程到达屏障
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
2.4 CountDownLatch与CyclicBarrier的区别
CountDownLatch:一个线程等待另外N个线程完成某个事情之后才能执行,重点是一个线程在等待
CyclicBarrier:N个线程互相等待,任何一个线程完成之前,所有线程都必须等待。
3 Semaphore同步计数器
3.1 类的概述
是一个计数信号量,维护一个许可集合。在许可可用前会阻塞每一个acquire(),等待获取许可;release()释放当前占用的许可,允许其它阻塞的线程获得。
3.2 方法概述
Semaphore(int permits)
创建具有给定许可数目、非公平的Semphore对象Semaphore(int permits, boolean fair)
创建具有给定许可数目、公平的Semaphore对象。所谓公平性就是先来先服务FIFOvoid acquire()
从此信号量获取一个许可,在获取到许可之前线程将被阻塞int availablePermits()
返回此信号量中的可用许可数目- void release()
释放当前许可
3.3 使用示例
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);// 许可数目为3
// 创建并启动12个线程
for (int i = 0; i < 12; i++) {
Worker worker = new Worker("worker"+i, semaphore);
worker.start();
}
System.out.println("main thread end");
}
// 任务线程,共用同一个信号量
static class Worker extends Thread {
private String name;
private Semaphore semaphore;
public Worker(String name, Semaphore semaphore) {
this.name = name;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
Thread.sleep(1000L);
semaphore.acquire();// 等待获取一个许可
System.out.println(name+"获取到一个许可,开始处理任务");
Thread.sleep(1000L);
semaphore.release();// 释放一个许可
System.out.println(name+"释放许可,任务结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}