深入浅出AQS之条件队列

时间:2021-01-28 19:38:12

相比于独占锁跟共享锁,AbstractQueuedSynchronizer中的条件队列可能被关注的并不是很多,但它在阻塞队列的实现里起着至关重要的作用,同时如果想全面了解AQS,条件队列也是必须要学习的。

原文地址:http://www.jianshu.com/p/3f8b08ca21cd

这篇文章会涉及到AQS中独占锁跟共享锁的一些知识,如果你已经对这两块内容很了解了,那就直接往下看。否则在读本文之前还是建议读者先去看看我之前写的两篇文章温习一下。

深入浅出AQS之独占锁模式

深入浅出AQS之共享锁模式

一、使用场景介绍

区别于前面两篇文章,可能之前很多人都没有太在意AQS中的这块内容,所以这篇文章我们先来看下条件队列的使用场景:

//首先创建一个可重入锁,它本质是独占锁
private final ReentrantLock takeLock = new ReentrantLock();
//创建该锁上的条件队列
private final Condition notEmpty = takeLock.newCondition();
//使用过程
public E take() throws InterruptedException {
//首先进行加锁
takeLock.lockInterruptibly();
try {
//如果队列是空的,则进行等待
notEmpty.await();
//取元素的操作... //如果有剩余,则唤醒等待元素的线程
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
//取完元素以后唤醒等待放入元素的线程
}

上面的代码片段截取自LinkedBlockingQueue,是Java常用的阻塞队列之一。

从上面的代码可以看出,条件队列是建立在锁基础上的,而且必须是独占锁(原因后面会通过源码分析)。

二、执行过程概述

等待条件的过程:

  1. 在操作条件队列之前首先需要成功获取独占锁,不然直接在获取独占锁的时候已经被挂起了。
  2. 成功获取独占锁以后,如果当前条件还不满足,则在当前锁的条件队列上挂起,与此同时释放掉当前获取的锁资源。这里可以考虑一下如果不释放锁资源会发生什么?
  3. 如果被唤醒,则检查是否可以获取独占锁,否则继续挂起。

条件满足后的唤醒过程(以唤醒一个节点为例,也可以唤醒多个):

  1. 把当前等待队列中的第一个有效节点(如果被取消就无效了)加入同步队列等待被前置节点唤醒,如果此时前置节点被取消,则直接唤醒该节点让它重新在同步队列里适当的尝试获取锁或者挂起。

注:说到这里必须要解释一个知识点,整个AQS分为两个队列,一个同步队列,一个条件队列。只有同步队列中的节点才能获取锁。前面两篇独占锁共享锁文章中提到的加入队列就是同步队列。条件队列中所谓的唤醒是把节点从条件队列移到同步队列,让节点有机会去获取锁。

二、源码深入分析

下面的代码稍微复杂一点,因为它考虑了中断的处理情况。我由于想跟文章开头的代码片段保持一致,所以选取了该方法进行说明。如果只想看核心逻辑的话,那推荐读者看看awaitUninterruptibly()方法的源码。

        //条件队列入口,参考上面的代码片段
public final void await() throws InterruptedException {
//如果当前线程被中断则直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//把当前节点加入条件队列
Node node = addConditionWaiter();
//释放掉已经获取的独占锁资源
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果不在同步队列中则不断挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//中断处理,另一种跳出循环的方式
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//走到这里说明节点已经条件满足被加入到了同步队列中或者中断了
//这个方法很熟悉吧?就跟独占锁调用同样的获取锁方法,从这里可以看出条件队列只能用于独占锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//走到这里说明已经成功获取到了独占锁,接下来就做些收尾工作
//删除条件队列中被取消的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//根据不同模式处理中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

流程比较复杂,一步一步来分析,首先看下加入条件队列的代码:

        //注:1.与同步队列不同,条件队列头尾指针是firstWaiter跟lastWaiter
//注:2.条件队列是在获取锁之后,也就是临界区进行操作,因此很多地方不用考虑并发
private Node addConditionWaiter() {
Node t = lastWaiter;
//如果最后一个节点被取消,则删除队列中被取消的节点
//至于为啥是最后一个节点后面会分析
if (t != null && t.waitStatus != Node.CONDITION) {
//删除所有被取消的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个类型为CONDITION的节点并加入队列,由于在临界区,所以这里不用并发控制
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
} //删除取消节点的逻辑虽然长,但比较简单,就不单独说了,就是链表删除
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

把节点加入到条件队列中以后,接下来要做的就是释放锁资源:

    //入参就是新创建的节点,即当前节点
final int fullyRelease(Node node) {
boolean failed = true;
try {
//这里这个取值要注意,获取当前的state并释放,这从另一个角度说明必须是独占锁
//可以考虑下这个逻辑放在共享锁下面会发生什么?
int savedState = getState();
//跟独占锁释放锁资源一样,不赘述
if (release(savedState)) {
failed = false;
return savedState;
} else {
//如果这里释放失败,则抛出异常
throw new IllegalMonitorStateException();
}
} finally {
//如果释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中只需要判断最后一个节点是否被取消就可以了
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

走到这一步,节点也加入条件队列中了,锁资源也释放了,接下来就该挂起了(先忽略中断处理,单看挂起逻辑):

     //如果不在同步队列就继续挂起(signal操作会把节点加入同步队列)
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//中断处理后面再分析
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//判断节点是否在同步队列中
final boolean isOnSyncQueue(Node node) {
//快速判断1:节点状态或者节点没有前置节点
//注:同步队列是有头节点的,而条件队列没有
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
if (node.next != null)
return true;
//上面如果无法判断则进入复杂判断
return findNodeFromTail(node);
} //注意这里用的是tail,这是因为条件队列中的节点是被加入到同步队列尾部,这样查找更快
//从同步队列尾节点开始向前查找当前节点,如果找到则说明在,否则不在
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

如果被唤醒且已经被转移到了同步队列,则会执行与独占锁一样的方法acquireQueued()进行同步队列独占获取。

最后我们来梳理一下里面的中断逻辑以及收尾工作的代码:

     while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//这里被唤醒可能是正常的signal操作也可能是中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
} //这里的判断逻辑是:
//1.如果现在不是中断的,即正常被signal唤醒则返回0
//2.如果节点由中断加入同步队列则返回THROW_IE,由signal加入同步队列则返回REINTERRUPT
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
} //修改节点状态并加入同步队列
//该方法返回true表示节点由中断加入同步队列,返回false表示由signal加入同步队列
final boolean transferAfterCancelledWait(Node node) {
//这里设置节点状态为0,如果成功则加入同步队列
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//与独占锁同样的加入队列逻辑,不赘述
enq(node);
return true;
}
//如果上面设置失败,说明节点已经被signal唤醒,由于signal操作会将节点加入同步队列,我们只需自旋等待即可
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

在把唤醒后的中断判断做好以后,看await()中最后一段逻辑:

//在处理中断之前首先要做的是从同步队列中成功获取锁资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//由于当前节点可能是由于中断修改了节点状态,所以如果有后继节点则执行删除已取消节点的操作
//如果没有后继节点,根据上面的分析在后继节点加入的时候会进行删除
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode); //根据中断时机选择抛出异常或者设置线程中断状态
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
//实现代码为:Thread.currentThread().interrupt();
selfInterrupt();
}

至此条件队列await操作全部分析完毕。signal()方法相对容易一些,一起看源码分析下:

   //条件队列唤醒入口
public final void signal() {
//如果不是独占锁则抛出异常,再次说明条件队列只适用于独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//如果条件队列不为空,则进行唤醒操作
Node first = firstWaiter;
if (first != null)
doSignal(first);
} //该方法就是把一个有效节点从条件队列中删除并加入同步队列
//如果失败则会查找条件队列上等待的下一个节点直到队列为空
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&(first = firstWaiter) != null);
} //将节点加入同步队列
final boolean transferForSignal(Node node) {
//修改节点状态,这里如果修改失败只有一种可能就是该节点被取消,具体看上面await过程分析
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//该方法很熟悉了,跟独占锁入队方法一样,不赘述
Node p = enq(node);
//注:这里的p节点是当前节点的前置节点
int ws = p.waitStatus;
//如果前置节点被取消或者修改状态失败则直接唤醒当前节点
//此时当前节点已经处于同步队列中,唤醒会进行锁获取或者正确的挂起操作
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

三、总结

相比于独占锁跟共享锁,条件队列可能是最不受关注的了,但由于它是阻塞队列实现的关键组件,还是有必要了解一下其中的原理。其实我认为关键点有两条,第一是条件队列是建立在某个具体的锁上面的,第二是条件队列跟同步队列是两个队列,前者依赖条件唤醒后者依赖锁释放唤醒,了解了这两点以后搞清楚条件队列就不是什么难事了。


至此,Java同步器AQS中三大锁模式就都分析完了。虽然已经尽力思考,尽量写的清楚,但鉴于水平有限,如果有纰漏的地方,欢迎广大读者指正。

明天就是国庆长假了,我自己也计划出国玩一趟,散散心。

提前祝广大朋友国庆快乐。