实战Java高并发程序设计之CountDownLatch

时间:2021-05-18 23:49:48

CountDownLatch:一个倒数计时器.

这个工具类通常用来控制线程等待,它可以让某一个线程等待直到计时结束,再开始执行.

CountDownLatch允许一个或多个线程等待直到在其他线程中执行的一组操作

实战Java高并发程序设计之CountDownLatch

如上图所示:主线程会在零界线上等待,检查任务会分别执行,当所有的任务全部都到达零界点以后,主线程才能继续往下执行


基本用法:

CountDownLatch是一个多功能的同步工具,可用于多种用途.

1.如果count初始化为1,它可以作为一个开/关锁存或者是门:所有调用await方法的线程都将一直等到到一个线程调用countDown,将count变成0
2.如果count初始化为n,它可以使一个线程等待,直到N个线程完成某些行为或者某个行为被完成N次

第二种情况分析:

使用Runnable描述每个部分,Runnable执行该部分并在锁存器上倒计时,并将所有Runnables排队到执行程序。
  当所有子部件完成时,协调线程将能够通过等待

伪代码:

class Driver { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}

class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
}
void doWork(int i) {
//what do you want to do...
}
}


源码分析:
CountDownLatch也是使用AQS的状态来控制count

/**
* CountDownLatch的内部类.用 AQS状态来表示计数
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

构造器:

CountDownLatch 初始化的时候必须要给一个count

    /**
* 构造器
*
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

await方法:

 await方法在当前的count到达0之前会一直阻塞,当count为0以后,所有的等待的线程都会被释放,并且随后任何的调用await方法都将立即返回
 这是一次性的现象---count不能被重置.如果你想重置count的话,请考虑CyclicBarrier类

    /**
*
* 如果当前计数为零,则此方法立即返回
*
*如果当前计数大于零,则当前线程将被禁用以进行线程调度,并处于休眠状态,直至发生两件事情之一
*1.count变成0
* 2.当前线程被中断
*
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

/**
* 同上
* 指定时间过后直接返回
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

countDown方法:

为了保持内存一致性,在计数器到达0之前,对应的countDown方法都将在await方法之前返回(happen-before原则)

    /**
* 减少锁存器的计数,如果计数达到零,释放所有等待的线程
* 如果当前计数大于零,则它将递减。 如果新计数为零,则所有等待的线程都将被重新启用以进行线程调度
* 如果当前计数等于零,那么没有任何反应
*/
public void countDown() {
sync.releaseShared(1);
}

getCount方法

    /**
* 返回当前计数.
* 该方法通常用于调试和测试
*/
public long getCount() {
return sync.getCount();
}


书中demo:

public class CountDownLatchDemo implements Runnable {
static final CountDownLatch end = new CountDownLatch(10);
static final CountDownLatchDemo demo=new CountDownLatchDemo();
@Override
public void run() {
try {
//模拟任务检查
Thread.sleep(new Random().nextInt(10)*1000);
System.out.println("check complete");
end.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(10);
for(int i=0;i<10;i++){
exec.submit(demo);
}
//等待检查
end.await(); //等待所有的线程都完成以后才会取消等待,主线程继续
//发射火箭
System.out.println("Fire!");
exec.shutdown();
}
}