Java并发编程之CountDownLatch详解

时间:2023-02-09 20:52:24

简介

闭锁是一种同步工具类,可以延迟线程的进度,知道其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能够通过,当达到结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以确保某些活动知道其他活动都完成之后才继续执行。

CountDownLatch是一种灵活的闭锁实现,它允许一个或多个线程等待一组事件的产生。闭锁状态包括一个计数器,该计数器初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法会一直阻塞知道计数器为0,或者等待中的线程中断,或者等待超时。

CountDownLatch源码详解

CountDownLatch类图如下:

Java并发编程之CountDownLatch详解

从类图中可以看出,CountDownLatch内部依赖Sync实现,而Sync继承自AQS。CountDownLatch仅提供了一个构造方法:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync是CountDownLatch的静态内部类,其定义也比较简单,如下所示:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        // 设置同步状态值
        setState(count);
    }

    int getCount() {
        // 获取同步状态值
        return getState();
    }

    // 共享式获取同步状态
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // 共享式释放同步状态
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

await()方法

CountDownLatch提供了await()方法来使当前线程一直等待,直到计数器的值减为0,或者线程被中断,该方法定义如下:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

await()方法调用了AQS的共享式相应中断获取同步状态的方法,acquireSharedInterruptibly(int),如下所示:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

Sync类重写了tryAcquireShared(int)方法:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

可以看到,只有当计数器(即同步状态)值为0时,才返回1,即当前线程获取到了同步状态,在这里表示等待线程可以继续执行,若计数器值不是0,则当前线程会调用doAcquireSharedInterruptibly(int)方法,一直自旋去尝试获取同步状态:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown()方法

CountDownLatch提供了countDown()方法递减计数器的值,如果计数到达0,则释放所有等待的线程,该方法定义如下:

public void countDown() {
    sync.releaseShared(1);
}

countDown()方法调用了AQS的releaseShared(int)方法来释放共享锁同步状态:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

Sync类重写了releaseShared(int)方法:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        // 获取同步状态
        int c = getState();
        // 同步状态为0,则直接返回
        if (c == 0)
            return false;
        // 计算并更新同步状态
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

应用程序示例

我们看一个CountDownLatch的应用示例:

public class CountDownLatchTest {
	// 自定义工作线程
	private static class Worker extends Thread {
		private CountDownLatch countDownLatch;
		
		public Worker(CountDownLatch countDownLatch) {
			this.countDownLatch = countDownLatch;
		}
		
		@Override
		public void run() {
			super.run();
			
			try {
				countDownLatch.await();
				System.out.println(Thread.currentThread().getName() + "开始执行");
				// 工作线程开始处理,这里用Thread.sleep()来模拟业务处理
				Thread.sleep(1000);
				System.out.println(Thread.currentThread().getName() + "执行完毕");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
	
	public static void main(String[] args) throws InterruptedException {
		CountDownLatch countDownLatch = new CountDownLatch(1);
		
		for (int i = 0; i < 3; i++) {
			System.out.println("创建工作线程" + i);
			Worker worker = new Worker(countDownLatch);
			worker.start();
		}
		
		// 工作线程需要等待主线程准备操作完毕才可以执行,这里用Thread.sleep()来模拟准备操作
		Thread.sleep(1000);
		System.out.println("主线程准备完毕");
		
		countDownLatch.countDown();
	}
}

运行结果(不唯一):

创建工作线程0
创建工作线程1
创建工作线程2
主线程准备完毕
Thread-0开始执行
Thread-2开始执行
Thread-1开始执行
Thread-0执行完毕
Thread-2执行完毕
Thread-1执行完毕

在上述代码中,我们自定义的工作线程必须要等主线程准备完毕才可以执行,我们可以使用CountDownLatch类来帮助我们完成。从程序的执行结果中也可以看出,3个工作线程确实是在主线程准备完毕后才开始执行。

相关博客

AbstractQueuedSynchronizer同步队列详解

AbstractQueuedSynchronizer共享式同步状态获取与释放

参考资料

方腾飞:《Java并发编程的艺术》

Doug Lea:《Java并发编程实战》