闭锁
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须现在这个闭锁上等待。
- 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。
- 等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况下,当所有玩家都准备就绪时,闭锁将到达结束状态。*
CountDownLatch
CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到0,这表示所有需要等待的事件都已经发生。如果计数器的值非0,那么await会一直阻塞直到计数器为0,或者等待中的线程中断,或者等待超时。·
如下代码展示了CountDownLatch的使用。
import java.util.concurrent.CountDownLatch;
/**
* @author bridge
*/
public class CountDownLatchTest {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);
new Thread() {
public void run() {
try {
System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
Thread.sleep(3000);
System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
new Thread() {
public void run() {
try {
System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
Thread.sleep(3000);
System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
try {
System.out.println("等待2个子线程执行完毕...");
latch.await();
System.out.println("2个子线程已经执行完毕");
System.out.println("继续执行主线程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行结果如下:
子线程Thread-0正在执行
等待2个子线程执行完毕...
子线程Thread-1正在执行
子线程Thread-0执行完毕
子线程Thread-1执行完毕
2个子线程已经执行完毕
继续执行主线程
Semaphore
计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore中管理着一组虚拟的许可,许可的初始数量可通过构造函数来指定,在执行操作时可以首先获得许可,并在使用以后释放许可。如果没有一个许可给信号量,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始化为1的Semaphore。二值信号量可以用作互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。
应用场景:
1.Semaphore可以用于实现资源池,例如数据库连接池。可以构造一个固定长度的资源池。将Semaphore的计数值初始化为池的大小,并从池中获取一个资源之前首先调用acquire方法获取一个许可,在将资源返回给池之后调用release释放许可,那么acquire将一直阻塞直到资源池不为空。
2.使用Semaphore将任何一种容器变成有界阻塞容器。信号量的计数值会初始化为容器的最大值。add操作在向底层容器添加一个元素之前,首先要获取一个许可。如果add操作没有添加任何元素,那么会立刻释放许可。同样,remove操作释放一个许可,使更多的元素能够添加到容器中。
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;
/**
* 信号量实现有界阻塞容器
*
* @author bridge
*/
class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
//获取许可
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
//如果添加失败,释放许可
if (!wasAdded)
sem.release();
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
//如果移除成功则释放许可
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
public class BoundedHashSetTest {
public static void main(String[] args) {
final BoundedHashSet<String> set = new BoundedHashSet<String>(5);
new Thread() {
@Override
public void run() {
System.out.println("添加");
try {
set.add("A");
set.add("B");
set.add("C");
set.add("D");
set.add("E");
set.add("F");
System.out.println("添加完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread() {
@Override
public void run() {
System.out.println("移除");
set.remove("A");
// set.remove("B");
// set.remove("F");
System.out.println("移除完毕");
}
}.start();
}
}
运行结果
添加
移除
移除完毕
添加完毕
CyclicBarrier
栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:”所有人6:00在麦当劳碰头,到了以后要等待其他人,之后再讨论下一步要做的事情。”
CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierExcepiton。
如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引号来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。
CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。
下面的例子展示了CyclicBarrier的使用。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @author bridge
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
int N = 4;
CyclicBarrier cyclicBarrier = new CyclicBarrier(N);
for (int i = 0; i < N; i++) {
new Player(cyclicBarrier).start();
}
}
}
class Player extends Thread {
private CyclicBarrier cyclicBarrier;
public Player(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
Thread.sleep((long) (5000 * Math.random()));
System.out.println(Thread.currentThread().getName() + "准备好了!");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 起跑!");
}
}
执行结果
Thread-1准备好了!
Thread-3准备好了!
Thread-2准备好了!
Thread-0准备好了!
Thread-0 起跑!
Thread-1 起跑!
Thread-2 起跑!
Thread-3 起跑!