闭锁CountDownLatch是一种同步类,它的作用相当于一扇门,在闭锁达到结束状态之前,这扇门一直是关闭的,没有线程可以通过,当达到结束状态的时候,这扇门就会打开,并且会永远保持打开状态,允许所有的线程通过。闭锁包含一个计数器,计数器就是这扇门,当计数器不为0的时候,调用latch.await方法的线程就会一直阻塞,直到计数器为0。即调用await方法的线程等待计数器为0。
public class CountDownLatchDemo { public static void main(String[] args) { CountDownLatch beginLatch = new CountDownLatch(1);//初始开始闭锁为1 CountDownLatch endLatch = new CountDownLatch(4);//初始结束闭锁为4 ExecutorService es = Executors.newCachedThreadPool(); es.execute(new VoteMachine(beginLatch, endLatch)); for (int i = 0; i < 4; i++) { es.execute(new Voter(beginLatch, endLatch)); } } } class VoteMachine implements Runnable { private CountDownLatch beginLatch; private CountDownLatch endLatch; public VoteMachine(CountDownLatch beginLatch, CountDownLatch endLatch) { this.beginLatch = beginLatch; this.endLatch = endLatch; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(1000); System.out.println("开始投票"); //latch计数器减一,开始闭锁为0,可以正常投票 beginLatch.countDown(); //等待,直到计数器为0,所有投票人投完票 endLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("结果出来了"); } } class Voter implements Runnable { private CountDownLatch beginLatch; private CountDownLatch endLatch; private static int no = 0; private int id; public Voter(CountDownLatch beginLatch, CountDownLatch endLatch) { this.beginLatch = beginLatch; this.endLatch = endLatch; this.id = ++no; } @Override public void run() { try { //等待投票器准备完毕,直到计数器为0 beginLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(id + "我投完票了"); //投票,计数器减1 endLatch.countDown(); } }
以上是一个投票的模拟,有两个闭锁。所有投票人都需要在投票器准备完毕之后才能开始投票,由beginLatch控制。投票器要等到所有投票人投完票才能开始统计结果,由endLatch控制。闭锁使用方法很简单,主要方法就是await和countDown。
我们来看一下闭锁的实现
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
构造函数有一个初始的计数作为参数,计数会保存到类Sync。这是闭锁的内部类,继承自抽象类AbstractQueuedSynchronizer,这是Java并发库实现并发的基础(有关这部分,以后详细了解后再介绍)
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }同样地,await方法,调用的也是Sync的方法。acquireSharedInterruptibly的实现在AbstractQueuedSynchronizer里。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
方法调用tryAcquireShared方法,此方法被类Sync覆盖
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }如果计数为0,就不做任何额外操作,线程可以继续执行。不然,会调用doAcquireSharedInterruptibly方法,开始等待,方法的实现在类AbstractQueuedSynchronizer
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); //同前面的方法,计数为0的时候r为1,是不然r为-1 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
调用该方法,线程进入循环,只有当计数器为0的时候,r为1,跳出循环
我们再来看countDown方法
public void countDown() { sync.releaseShared(1); }毫不意外地,操作还是交给sync处理,使计数器减1
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }如果当前计数器为0,那么什么都不做,不然,计数器减1,并将新的计数保存回sync