ReentrantLock的最后讲一下Condition的使用,之前在Lock接口中,我们看到他有一个newCondition要求子类实现,从而能够实现在某一个条件上等待。(相较于传统的由虚拟机支持的synchronized关键字和wait/notify的组合,它能够选择性的唤醒正确的线程)。
同样根据方法的调用顺序,先来查看ReentrantLock中newCondition的实现是如何返回一个Condition对象的:
public Condition newCondition() {
return sync.newCondition();
}
老样子委托给了内部对象sync,sync返回了一个新的ConditionObeject对象。当一个线程调用了该对象上的await方法时:
public final void await() throws InterruptedException {
if (Thread.interrupted())//如果线程被中断,抛出异常
throw new InterruptedException();
Node node = addConditionWaiter();//否则将当前线程加入Condition的等待队列
int savedState = fullyRelease(node);//完全释放锁,并记录重入次数
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//阻塞,直到进入sync队列
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//重新排队竞争锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
上面的注释仅仅简单描述了await的主要逻辑,接下来我们将进入各个主要方法进一步了解他的实现方式。
将当前线程加入Condition的等待队列:
//condition队列也维护了一个头指针和尾指针,插入到队列就是将尾指针指向自己
//
private Node addConditionWaiter() {
Node t = lastWaiter;//找到最后一个节点
// 这个节点已经取消等待,清除那些无用节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();//清除节点
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);//构造新节点
//插入
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
可以看到代码的逻辑很简单,第一步就是找最后的节点,然后根据情况清除无用节点(已取消的),然后插入。清除无用节点就是一个链表删出节点的过程,不用再细说了。
在调用addConditionWaiter方法后,当前线程的节点就进入了Condition队列,接下来毫无疑问就应该释放锁并阻塞线程直到被唤醒了。释放锁的时候,必须要记录重入次数,以便在被唤醒后争取正确的重入次数。
此处注意到Condition队列是一个单向链表队列,只能向后遍历!
//完全释放锁并返回重入次数
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();//记录成功
if (release(savedState)) {//释放当前次数
failed = false;
return savedState;
} else {
//如果没有持有锁而试图在条件上等待,将会抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
release方法在之前的解锁文章中已经讲过了。如果当前线程没有持有锁而试图在条件上等待,将会抛出IlegalMonitorStateException(如:new Object().wait()也会抛出此异常,或者synchronize一个对象,却在另一个对象上wait,同样抛出此异常),和内置锁的工作原理很相似。
接下来是一个循环,条件是!isOnSyncQueue(Node node),Sync队列是等待获取锁的队列(也就是前几篇文章中线程阻塞的队列),如果线程不在Sync队列,那么毫无疑问线程等待的条件还没有达到,要继续阻塞在Condition队列.
final boolean isOnSyncQueue(Node node) {
//因为只有Condition单向队列,而sync队列一定有前节点,因此只要没有prev节点就在Condition队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//Condition队列中用nextWaiter,Sync队列中用next和prev
//所以next不为null肯定在sync队列,prev不为null的情况在下面讨论
if (node.next != null) // If has successor, it must be on queue
return true;
/*
源代码的注释很清楚了,prev不为null却不在sync队列的原因是:使用cas操作入队可能失败,但一定会成功的是把自己的prev指向sync尾巴,但只有成功者才能把sync尾巴的next指向自己
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
//上述是通过快速判断,在大部分情况下很快能返回从而提高效率,但如果
//未满足上述条件,只能老实的遍历一遍了。
return findNodeFromTail(node);
}
findNodeFromTail方法就是简单遍历链表,也就不说了。 继续回到await方法,当前线程会阻塞在Condition队列直至进入Sync队列。在这个while循环中,会判断是否中断,这也是离开while循环的方法:
//这两个鬼畜的三元运算符看得头疼
//如果线程未中断,返回0;否则说明线程被中断了,应当取消在此条件上的等待。此时线程可以尝试重新进入sync。如果使用cas操作入队成功了,意味着为被唤醒前就被中断了;如果cas操作失败了,说明已经在被中断前被signal方法唤醒了,且正在帮助当前节点进入sync队列,也就是被条件唤醒后又被中断了,这时候只需要安静的等待入队完成就好了(这个意味着此中断不针对自己的条件等待)。
//throw ie 代表的是被唤醒前中断了,因此需要代表着应该抛出一个异常作为响应。
//reinterrupt代表的是已经被唤醒被时中断了,因此需要将这个中断状态传递给上层代码
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
如果上述cas操作失败了,代表着在已经被从条件等待中唤醒了才中断
!!:此处是放弃条件等待而非放弃获取锁,反而是立即去获取锁,下午脑袋蒙了卡在下面这个逻辑上好久。。
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
控制权继续回到了await方法,根据checkInterruptwhileWaiting方法的逻辑我们知道,如果返回值不为0代表着进入sync队列等待获取锁,因此需要跳出此循环,返回值为0继续等待()。acquireQueued方法在讲解lock工作原理时已经西过了,这个方法将会让sync队列中的线程阻塞,被唤醒,竞争锁,如此循环直至成功获取锁,并返回这个过程是否被中断过(acquireQueued方法也屏蔽了中断,但通过返回一个值告诉调用者这个过程是否中断过)。await方法也使用了此interrupmode来记录方法调用过程的中断状态。reportIntrrupt方法听过接受的参数,判断自己是应该抛出一个异常还是中断自己(也就是把自己的中断状态置为true,由高一层的代码来处理).
由此可以总结一下await方法的基本工作流程:
1.判断线程是否中断,是则立即抛出中断异常
2.否则,将当前节点添加到当前条件的等待队列
3.完全释放锁,记录重入次数
4.循环,条件为是否离开了condition队列并进入了sync队列
4.1立即阻塞
4.2判断离开阻塞的原因,被中断/被唤醒
4.3如果被中断,检验中断发生时间:唤醒前还是唤醒后
4.4如果中断发生或者进入了sync队列,跳出循环
5.进入sync队列阻塞至获取锁,并记录在此状态中是否中断
5.1如果中断过,判断interruptmode,如果不是throw_ie,将其置为reinterrupt,以便之后进行中断,将中断状态交由上层处理
6.只要中断状态为throw_ie,抛出异常代表在被唤醒前中断过,由上层代码处理此异常(因此需要中断处理链的finally unlock).