深入并发包之(一) 从来ReentrantLock看AbstractQueuedSynchronizer源码
这篇文章是一篇并发相关的源码分析文章,主要从源码级别分析AQS操作(主要关于阻塞锁的实现),从而加深对并发中ReentrantLock
和ReentrantReadWriteLock
两种锁的理解。
AQS概述
下面引用JDK文档中对AQS类的描述:
为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。
在这篇文章中我们只关注有关锁的实现的相关部分,那么这段话就可以这样理解。
AQS类的内部实现了一个FIFO的等待队列(实际是一个链表),用来保存线程等相关内容,同时,利用这个队列来实现一个阻塞锁。
同时AQS中有一个int类型的值state,这个值有相关的原子操作,我们可以使用相关的方法来修改这个值。
当state的值等于0时,我们认为没有线程获得了锁;当state的值大于0时,我们认为已经有线程获得了锁,同时,线程每重入一次,state的值加一,这样就可以实现锁的重入。
AQS中我们有独占和共享两种模式,因此我们既可以实现独占锁,也可以实现共享锁。
队列的实现
AQS的队列的实现主要有一个内部类Node
,来表示队列的结点。
static final class Node {
//结点的状态,signal等
volatile int waitStatus;
//结点的前驱结点
volatile Node prev;
//结点的后继结点
volatile Node next;
//结点保存的线程
volatile Thread thread;
//表示当前结点的模式,独占还是共享
Node nextWaiter;
}
由此我们可以看出,所谓的FIFO队列实际上是一个双向链表,结点中的主要数据就是状态和线程。我们会将所有获取锁但是没有成功的线程保存到队列中,这样我们就可以在此基础上实现其他功能,例如公平锁和非公平锁等内容。
下面给出结点状态的可能取值:
//由于超时或者中断,线程处于取消状态
static final int CANCELLED = 1;
//线程处于阻塞状态
static final int SIGNAL = -1;
//线程再等待满足条件
static final int CONDITION = -2;
//线程状态需要向后传播
static final int PROPAGATE = -3;
结点的默认状态为0,这种状态表示结点的状态不属于上面的任意一种。当结点状态大于0时,意味着线程不需要被阻塞,因此,很多时候JDK在校验的时候没有校验到具体值,而是一种范围校验。
既然实现了队列,那么自然需要维护队列中的结点,后面我们会介绍相关方法。
ReentrantLock
加锁方法的概述
首先我们先来描述一下ReentrantLock
使用AQS实现加锁需要实现的功能。
1、 将所有需要加锁的线程维护到队列中
2、 将所有需要等待的线程阻塞
3、 将所有被取消的线程从队列中删除
4、 将所有被中断的线程中断
5、 将获得锁的线程从队列中删除
AQS的使用
使用AQS来实现一个锁,我们需要在类的内部声明一个非公共的类来继承AQS类,并且实现AQS类中的部分方法。
一般来说,我们需要自己重写方法如下
//独占获取锁
tryAcquire(int)
//独占模式释放锁
tryRelease(int)
//共享模式获取锁
tryAcquireShared(int)
//共享模式释放锁
tryReleaseShared(int)
在AQS中,上面几个方法体是直接抛出异常,所以如果我们没有自己重写,那么加锁方法是无法调用的。
查看ReentrantLock
类的源码,其内部有一个类Sync
类。,继承了AQS类,并且重写了部分方法。由于ReentrantLock
和ReentrantLock
实现了公平锁和非公平锁,所以,其类的内部又有两个类NonfairSync
和FairSync
类继承了Sync
类。
加锁代码思路
上面的介绍应该让大家已经对AQS的使用和基本功能有所了解了,接下来,我们将跟随ReentrantLock
源码的思路,一步步分析加锁和解锁的流程。
博主会把JDK中的源码主要内容粘贴到文章中,好了,开始吧。
下面这段是ReentrantLock
中的加锁部分
public void lock() {
sync.lock();
}
可以看出加锁部分实际是调用了Sync
类的lock()
方法。
当我们查看lock()
方法时发现,这个方法是一个抽象方法,实际上,在公平锁和非公平锁中,这个方法的实现是不同,因此,这个方法是由NonfairSync
和FairSync
类实现的,下面以非公平锁中的实现为例。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
加锁过程概览见下图
加锁过程中,首先通过一个CAS操作,如果state的值为0(即没有线程获得了独占锁),那么将其设置为1(表示已经有一个线程获得了独占锁),如果设置成功,那么加锁就成功了,然后调用方法setExclusiveOwnerThread
,将获得锁的线程设置为当前线程。
由于是非公平锁,所以不需要考虑线程的等待时间等因素。
同时需要注意,如果CAS操作失败,也就是说已经有线程已经获得了独占锁,这时我们调用acquire
函数获取锁,这里应该是考虑到锁重入等情况。同时如果是第一个线程进入,就可以直接获得锁,效率较高。
下面给出acquire
方法,这个方法实际上是由AQS给出的实现。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里可以看出acquire
方法会调用我们自己实现的tryAcquire
方法来尝试获得锁,如果获取锁成功,那么方法就直接结束了,如果获取锁失败会继续执行后面的方法。
tryAcquire
方法
那么,接下来查看非公平锁中的tryAcquire
方法。
注意,tryAcquire
方法是由ReentrantLock
类实现的
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
查看源码,nonfairTryAcquire
方法是在Sync
类中给出的。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//没有线程获得过锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/**
* 已经有线程获取了锁,由于是独占锁,所以其他线程不可以
*再次进入,但是有一种特殊情况,就是当前线程或获得锁的
*线程是同一个,这时满足可重入
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这段代码的整体流程如下图:
这段代码的思路如下:
- 如果state的值是0,那么证明独占锁没有被占有,我们可尝试CAS操作,如果CAS操作成功,那么当前线程获得了锁,返回为真。
- 如果state的值不为0,那么说明已经有线程获得了锁,其他线程就不应该可以获得锁,但是,这里有一种情况,就是重入,如果已经占有锁的线程和当前线程是同一线程,那么就可以获得锁,并且需要将state的值加一,表示重入了一次,这里还对state的值进行了校验,以防溢出,这里每个线程的重入次数实际上是有限的。
addWaiter
以及acquireQueued
方法
这里让我们的目光会到acquire
方法中,假如获取锁失败,那么接下来我们需要做的就是维护FIFO链表,并且让所有没有获得锁的线程进入等待队列,直到获得锁。
为了方便,这里将acquire
方法再次给出
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
下面介绍方法addWaiter
和acquireQueued
,这里Node.EXCLUSIVE
实际上就是null
,这里是用来区分节点是等待独占锁还是共享锁的。
给出addWaiter
的代码,这部分代码有AQS实现
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//新建节点
Node pred = tail;
//如果队列有值那么可以直接将当前节点加入到队列尾部,注意这里如果多个结点同时进入是有可能发生CAS操作失败的情况,所以接下来就需要enq方法
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//调用enq初始化队列并将结点加入队列中
enq(node);
return node;
}
这部分代码的主要职责是维护链表的顺序,生成一个节点,将节点维护到链表的尾部,如果链表为空或者CAS设置tail失败那么就需要enq
方法来初始化链表,并将当前节点维护到链表的尾部。
private Node enq(final Node node) {
//死循环,节点会自旋,直到成功加入队列,如果有并发进入这个方法,也不会有问题
for (;;) {
Node t = tail;
if (t == null) { // 如果队列为空,需要初始化一个空的Node节点,并让head和tail都指向它
if (compareAndSetHead(new Node()))
tail = head;
} else {//将节点放入队列的尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
给出acquireQueued
的代码,这部分代码由AQS实现
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果前驱结点为头结点,那么只有当前结点在等待获取锁,那么就可以再次尝试获取锁,如果获取成功就可以反悔了
if (p == head && tryAcquire(arg)) {
//获取锁成功后释放结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判断当前结点是否被中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这部分代码实际也是一个死循环,当有一个结点进入后会一直自旋,知道成功获得锁。如果在等待的过程中,线程被中断,则不会响应,但是会标记一下,当该结点的线程获得锁后,再对线程进行中断处理。
通过两幅图片介绍一下acquireQueued方法的两种情况
情况一,队列中只有头结点和当前结点
情况二,当前结点不是队列中除头结点外的唯一结点
下面我们详细介绍两个方法,自旋的两个重要函数shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果前驱结点的状态为SIGNAL,那么当前结点可以park即让其等待
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {//前驱结点的状态为cancle
//把所有状态为cancle的结点全部跳过,即从队列中删除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将前驱结点的状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//上面函数返回值为真,才能进入这个函数,表示当前结点可以被park,并且再检查线程是否被中断,并清除中断状态,暂时不做响应
LockSupport.park(this);
return Thread.interrupted();
}
目前进行到这里加锁还剩最后一步,即对线程中断的处理,如果线程被中断,并且获得了锁,那么会调用方法selfInterrupt
来进行中断处理。
static void selfInterrupt() {
//再次调用线程中段方法
Thread.currentThread().interrupt();
}
释放锁代码思路
经过上面的折磨,终于走到了释放锁部分的代码,这部分代码相对于加锁部分要简单很多。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//尝试释放锁,释放成功那么需要将其他等待中的结点唤醒
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//如果调用释放锁的线程和占有锁的线程不是同一个,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//由于锁的可重入,只有当state的值为0的时候才能真正释放锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//修改State的值
setState(c);
return free;
}
释放锁部分的代码实际上很简单,由于只有获得锁得线程才能够释放锁,因此,如果调用释放锁的线程和占有锁的线程不是同一个,则需要抛出异常。另外一个需要注意的点就是,由于锁的可重入性,我们使用了state的值来表示锁的重入次数,因此,只有当state的值变为0,我们才能够真正的释放锁,但是,无论是否释放了锁,我们都需要将state的值修改。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}