Java多线程 -- JUC包源码分析9 -- AbstractQueuedSynchronizer深入分析-- Semaphore与CountDownLatch

时间:2022-09-21 10:05:19

在前面分析ReentrantLock/ReentrantReadWriteLock的时候,我们已经对AQS进行过分析。在初步了解了AQS之后,本篇试图对其进行一个更为系统性的分析。因为AQS是为整个同步框架的基石,不光是锁,很多其他同步组件,比如Semaphore, CountDownLatch,也都是建立在AQS之上。
-AQS–同步框架的基石
-AQS的3个核心技术原理
AQS源码解析
AQS独占模式与共享模式
Semaphore与CountDownLatch


AQS-同步框架的基石

下图展示了整个JUC包中,继承自AQS的类。可以看到,ReentrantLock/ReentrantReadWriteLock, Semaphone, CountDownLatch, FutureTask等组件,都是建立在AQS之上。
Java多线程 -- JUC包源码分析9 -- AbstractQueuedSynchronizer深入分析-- Semaphore与CountDownLatch

AQS的3个核心技术原理

(1) 一个int型的原子变量state。所有线程对这个变量进行cas访问,来判断自己是应该阻塞,还是进入临界区。
(2)一对park/unpark原语,实现1个线程对另1个线程的精确控制:阻塞,唤醒
(3)用无所链表实现的,所有阻塞的线程,形成的一个阻塞队列

基本思路如下:
acquire的时候,判断state,state条件满足,进入临界区执行代码,不满足,把自己加入阻塞队列,然后阻塞自己;
release的时候,更新state,同时唤醒队列中的继任者,继任者唤醒之后,再次acquire拿锁,走acquire流程。

下面对AQS源码里面的关键函数进行分析;

AQS源码解析

AQS的接口层

AQS的对外提供的接口主要有以下几个:
//独占模式
acquire
acquireInterruptibly
release

//共享模型
acquireShared
acquireSharedInterruptibly
releaseShared

    public final void acquire(int arg) {
if (!tryAcquire(arg) && //模板方法
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg)) //模板方法
doAcquireInterruptibly(arg);
}

public final boolean release(int arg) {
if (tryRelease(arg)) { //模板方法
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}


public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) //模板方法
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //模板方法
doAcquireSharedInterruptibly(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //模板方法
doReleaseShared();
return true;
}
return false;
}

在上面的接口中,留了4个模板方法供子类来实现,也就是类图中的各种Sync:
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared

AQS的几个核心函数

从上面的代码中,可以看到,AQS内部的实现细节,主要一下几个函数:

独占模式下的3个关键函数:
private void acquireQueued() //屏蔽中断,进入队列
private void doAcquireInterruptibly(int arg) //响应中断
private void unparkSuccessor(Node node) //减,唤醒继任者

共享模式下的2个关键函数:
private void doAcquireSharedInterruptibly(int arg) //加
private void doReleaseShared() { //减

下面一一分析:

独占模式核心代码分析

    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)); //addWaiter,入队列

final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //去拿锁
setHead(node); //出对列
p.next = null;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //阻塞自己
interrupted = true; //从阻塞中被唤醒,回到上面。继续死循环
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE); //入队列
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //去拿锁
setHead(node); //出对列
p.next = null;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //阻塞自己
break; //被中断唤醒,直接退出
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node); //被中断唤醒,没拿到锁退出了,取消accquire
throw new InterruptedException();
}

private void unparkSuccessor(Node node) { //唤醒后继者
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

这里说2个关键点:
(1)acquireQueued屏蔽中断,doAcquireInterruptibly响应中断。唯一区别就是,前者被中断唤醒之后,继续死循环拿锁,直到拿到为止;
后者被中断唤醒之后,直接break跳出for死循环了。
(2)不管入队列,还是出队列,都是在acquire的时候,发生的。 release的时候,只负责唤醒继任者。所以release的逻辑相当简单,关键acquire的比较复杂,整体思路如下:

step1: tryAcquire,成功,就直接执行后面代码了。失败, 没拿到锁
step2: 调用addWaiter把自己加入阻塞队列(注意,此时并没有阻塞!)
step3: 开始for循环, 尝试再去tryAcquire一次,如果拿到,出队列;
step4: 没有拿到,自己阻塞自己
step5: 被中断或者unparkSuccessor唤醒,唤醒之后;
如果不响应中断,回到步骤3,再尝试拿锁;拿不到,再阻塞;阻塞唤醒,再拿锁;。。。如此死循环,直到拿到锁,函数返回!!
如果响应中断,没拿到锁就返回,返回之前,取消acquire,即cancelAcquire;

以上就是独占模式的代码分析。在继续解析共享模式代码之前,先对“独占模式“与“共享模式“有个清晰的概念:

#独占模式与共享模式
独占模式:acquire的时候,只能有1个线程拿到资源;release的时候,1次也只能唤醒1个线程。ReentrantLock, ReentrantReadWriteLock.WriteLock,都用的是AQS的独占模式;

共享模式:acquire的时候,可以有多个线程同时拿到资源;release的时候,队列中的所有线程都被唤醒,也就是propagate,然后同时再去拿锁。
ReentrantReadWriteLock.ReadLock, Semaphore, CountDownLatch都是用的共享模式。
比如对于Semaphore来说:acquire的时候,只要state > 0,就可拿到锁;
release的时候,不一定只释放1个,可以release(n),也就意味着同时释放n个资源,阻塞队列中所有线程被唤醒,然后有n个线程可以同时拿到锁;
在比如对于CountDownLatch来说,release到countDown = 0 的时候,所有等待countDown = 0 的线程都被唤醒,都同时拿到锁。

共享模式代码分析

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //入队列
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); //去拿锁
if (r >= 0) {
setHeadAndPropagate(node, r); //拿到锁,出队,同时唤醒后继者
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //阻塞自己
break; //被中断唤醒
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node); //被中断唤醒,没拿到锁,退出
throw new InterruptedException();
}

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);

if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); //关键点
}
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //唤醒继任者
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

这里有个关键点:doReleaseShared会逐个唤醒等待队列中的所有线程,然后这所有线程又再次去争抢锁。对于CountDownLatch来说,所有线程都会争抢到countDown = 0,所以所有线程拿到锁;对于Semaphore.release(1)来说,只释放了1个资源,但唤醒了所有线程,然后只有1个线程可以拿到资源,剩下的n-1个线程,再次进入阻塞状态。

#Semaphore 与 CountDownLatch
搞清楚了AQS的共享模式原理,Semaphore和CountDownLatch就很简单了。2者只是对state变量的判断策略不一样而已,可以说刚好相反:
Semaphore: state > 0,进入;state = 0 阻塞。
CountDownLatch: state > 0, 阻塞;state = 0,进入;

下面比较一下2者的tryAcquireShared就知道了:

//CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; //state = 0, 返回true
}


//Semaphore.NonFairSync
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || //跟CountDownLatch刚好相反,state >= 0 返回true
compareAndSetState(available, remaining))
return remaining;
}
}

//Semaphore.FairSync
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || //跟CountDownLatch刚好相反,state > = 0 返回ture
compareAndSetState(available, remaining))
return remaining;
}
}

同时,tryRelaseShared也是刚好相反,一个是++,一个是–

//CountDownLatch.Sync
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1; //资源--
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

//Semaphore.Sync
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases; //资源++
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

总结:
共享模式与独占模式,最大的区别在于:共享模式下,一次release会唤醒队列中的所有线程,即使这次release只释放了一个资源,只有一个线程可以拿到资源。所有线程唤醒之后,再次去争夺资源,可能所有线程都能争夺到(比如CountDownLatch),也可能只有一个线程争夺到(比如Semaphore.release(1))。