官方文档如是说
AQS提供了一个框架,一个FIFO的等待队列和一个代表状态的int值。子类需要定义改变这个状态的protected方法、定义什么状态表示获取到状态以及释放状态。该类中其中方法提供所有入队和阻塞机制。子类可以保存其他状态,但是需要使用getState、setState和compareAndSetState方法来原子性地读取更新状态值。
AQS用于一群线程为了得到某个资源,但是同一时刻,只能有部分线程可以使用该资源,对于其他线程,会使用队列将其他线程入队,而一旦有一个线程使用完了资源,那么队列中的某个线程就会获得该资源的使用。而AQS中未对该资源做出明确说明,只是通过一个int的状态值表示资源的获取与释放。比如在ReentrantLock中,该资源某一线程持有的锁的个数,当某一线程获得了这把锁,就把状态值置为1,释放则将状态值减1。那么就会得出状态0表示线程可以使用这把锁,而状态大于0则表示其他线程不可以使用这把锁。由于ReentrantLock是可以重入的,所以这儿状态值是可以大于1,代表拥有该锁的线程又再一次获得该锁,这些留在下面再讲。
一个状态
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS中表示状态的值是一个valotile类型的,实现了内存可见性,一旦更新,那么别的线程就可以立即看到了。而对于更新,使用了Unsafe类的CAS机制实现原子性。所以关于state的更新以及读取就不需要子类关注了,只需要知道调用这几个方法即可。
队列
AQS支持两种模式,一种是独占式,另一种是共享式。
当使用独占式时,当一个线程得到了资源,那么其他尝试获取资源的线程将不会成功。
当使用共享式时,当一个线程得到了资源,那么其他尝试获取资源的线程可能(也有可能不)会成功。
不同模式的等待线程使用相同的队列,都在同一个队列中入队。所以队列的每一个节点需要区分是共享模式还是独占模式。
通常,子类只会支持一种模式,但是ReadWriteLock类使用了两种模式。只支持一种模式的子类不需要重写另一种模式的方法。
下面是队列中节点的定义:
static final class Node {
//表示共享模式
static final Node SHARED = new Node();
//表示独占模式
static final Node EXCLUSIVE = null;
//表明线程已被取消
static final int CANCELLED = 1;
//后继节点需要被唤醒
static final int SIGNAL = -1;
//表明线程在等待某个条件的发生
static final int CONDITION = -2;
//表明下一次acquireShared时需要无条件地传播
static final int PROPAGATE = -3;
//该线程的状态
volatile int waitStatus;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当前线程
volatile Thread thread;
//下一个等待条件的节点或共享节点
Node nextWaiter;
/**
* 返回该节点是否是共享的
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
//返回该节点的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
从上面可以看到AQS的队列是一个双向链表结构的队列。而在AQS中也保存了一头一尾两个变量,这样入队和出队都很方便。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
独占模式的例子
ReentrantLock是可重入的锁,其内部使用的就是独占模式的AQS,那么下面就以ReentrantLock为例,说明一下ReentrantLock的实现原理。
ReentrantLock的使用
ReentrantLock的使用有一定的格式,如下:
class X {
* private final ReentrantLock lock = new ReentrantLock();
* // ...
*
* public void m() {
* lock.lock(); // block until condition holds
* try {
* // ... method body
* } finally {
* lock.unlock()
* }
* }
* }
lock方法用于尝试获取锁,一旦得到锁后就可以进行操作了,否则线程阻塞;最后处理完事情后调用unlock释放锁。下面就从这两个方法看起。
AQS的状态在ReentrantLock代表什么?
因为ReentrantLock是可重入的,那么AQS的状态在ReentrantLock代表的是锁的持有数。虽然一次只能有一个线程持有该锁,但是由于可重入的原因,导致持有线程可以获得多把锁。
lock方法
首先看lock方法,代码如下:
public void lock() {
sync.lock();
}
可以看到lock方法调用sync的lock方法,那么这个sync是个什么呢? Sync是ReentrantLock的一个内部类,并且在ReentrantLock初始化时创建。下面是Sync类的定义:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
可以看到Sync是一个抽象类,其抽象方法是lock方法。该类有两个实现类,一个是NonfairSync,一个是FairSync。这两者有什么区别呢?这两者的区别在于获取不到锁时的表现,NonfairSync是抢占式的,虽然是后来的,但还是首先尝试一把,万一就获得了锁呢?FairSync则是个规规矩矩的好学生,知道这会有线程占有了锁,那么就老老实实地入队了。这些在下面的代码会分析到。
知道了有两种不同的Sync之后,那么ReentrantLock是如何指定使用哪一种Sync类的呢?答案在构造器中,如下:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
从代码中可以看到,默认实现是NonfairSync。那么下面我们就先看NonfairSync的lock具体实现:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
从上面代码可以看到,首先是尝试将状态从0置为1,即从无锁变有锁,如果成功了,那么说明该线程就得到了这把锁;如果失败了,就会调用acquire(1)获取一把锁。
其中acquire方法是父类AQS中的方法,如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire方法以独享模式获取资源,忽略Interrupt。其中tryAcquire方法为尝试获取资源,如果不能获取,那么调用acquireQueued方法将该线程放入队列中。首先看tryAcquire方法,该方法在AQS是个空实现,每个实现独占模式的类需要自己实现。下面看NonFairSync的tryAcquire实现:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
可以看到NonFairSync调用的是父类的nonfairTryAcquire方法,下面是该方法的实现:
final boolean nonfairTryAcquire(int acquires) {
//得到本线程
final Thread current = Thread.currentThread();
//得到状态
int c = getState();
//没有线程持有锁
if (c == 0) {
//可以将状态从0更新为1,说明本线程成功获取到了锁,返回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果本线程和目前持有锁的线程相同,更新持有锁的数量,返回true
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//其余情况,返回false
return false;
}
从上面代码可以看到有几种情况:
- 目前没有线程持有锁,并且该线程成功更新状态,返回true
- 如果本线程和当前持有锁的线程相同,更新状态,返回true
- 其余情况,目前有其他线程获得了锁,返回false
在acquire方法中,如果该线程成功获得了锁,那么返回false,由于短路,不会执行后面的方法了;但如果获取锁失败,那么就会执行acquiredQueued方法,下面首先看其第一个参数addWaiter方法,addWaiter方法根据传入的模式创建节点并将其入队。
//在独占模式下,mode值为null
private Node addWaiter(Node mode) {
//根据线程以及模式创建节点
Node node = new Node(Thread.currentThread(), mode);
//得到队列尾端
Node pred = tail;
//如果队列不为空
if (pred != null) {
node.prev = pred;
//更新队列尾端
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//队列为空的情况或不能更新队列尾端(说明有别的线程入队成功了)
enq(node);
return node;
}
从上面代码可以看到,在队列为空的情况或者compareAndSetTail失败的情况下,调用enq方法将节点插入队列中并返回插入节点的前驱节点,下面是enq方法:
private Node enq(final Node node) {
//死循环
for (;;) {
Node t = tail;
//队列为空,新建队头对尾节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
}
//一直尝试将节点加入队尾,直至成功
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法实现了加入节点的线程的自旋,直至加入队列为止。
在enq方法中有一个奇怪的地方,是在队列为空时,没有以当前节点作为队列头节点,而是创建了一个新节点作为头节点,结构如下图:
可以看到加入的那个结点就是黄色的Head节点,该节点是个空节点,然后才是将后面的节点加入队列。
一旦将节点加入了队列中,接下来还需要做的事情就是挂起线程,下面再看acquireQueued方法,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//死循环
for (;;) {
final Node p = node.predecessor();
//如果该节点的前驱节点是头节点(空节点)并且成功获取到了资源,返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//需要根据前驱节点查看是否需要挂起自己
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) //异常,取消获取
cancelAcquire(node);
}
}
从上面可以看到,也是进入一个死循环,然后如果该节点的前驱节点是头节点并且成功获取到了资源,那么将其出队并返回。否则调用shouldParkAfterFailedAcquire方法判断该线程是否需要阻塞,如果需要阻塞,还会再调用parkAndCheckInterrupt()方法阻塞线程以及判断线程是否被Inerrupt了。先看shouldParkAfterFailedAcquire方法,
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//得到前驱节点的状态
int ws = pred.waitStatus;
//如果处于SIGNAL,返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
从代码中可以看出,只有当前一个节点为SIGNAL状态时,才可以将自身挂起,而挂起的parkAndCheckInterrput()如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //挂起线程
return Thread.interrupted();
}
上面是没有获取到的情况,需要挂起线程,那么如果获得了资源,那么会调用setHead方法,下面是setHead方法
private void setHead(Node node) {
//设置成头节点
head = node;
//置空
node.thread = null;
node.prev = null;
}
从上面的代码可以看到,setHead方法就是将节点设置为队列中的空节点,这里将线程置为null了。那么节点之前关联的线程呢?
已经在tryAcquire方法中设置了拥有AQS的线程了。
FairSync的lock方法
到上面为止,就分析完了NonfairSync的lock方法,下面再看下FairSync的lock方法,比较一下区别:
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
//获得当前线程
final Thread current = Thread.currentThread();
//状态数
int c = getState();
//没有线程拥有锁
if (c == 0) {
//如果队列为空并且可以成功更新状态
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果本线程与持有锁的线程相同
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
从上面代码中可以看出,FairSync的lock方法直接调用了acquire方法,与NonfairSync相比,缺少了第一步的CAS更新状态的操作。同样再看其下的tryAcquire方法。在tryAcquire方法中可以看出区别在于调用了hasQueuedPredecessors()方法检查队列中是否有前驱节点。
可以看到FairSync是严格遵循队列中的等待规则。
unlock方法
unlock方法如下:
public void unlock() {
sync.release(1);
}
release方法是AQS的实现,如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head; //空节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release方法返回tryRelease方法的返回值,tryRelease需要实现独占模式的子类自己实现,可以看到如果成功释放了,那么调用unparkSuccessor对后继节点唤醒。下面先看Sync的tryRelease方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//如果持有锁的线程不是该线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果状态为0,说明没有线程占有锁了,资源释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
从上面的代码可以看出,tryRelease方法将状态减去参数,这里就是1,因为一次unlock就解除一次对锁的占有。一旦没有线程占有锁了,则说明锁被彻底释放了。
一旦没有线程获取锁了,那么需要让队列中等待的线程去获取锁。下面是unparkSuccessor()方法:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//下一个节点
Node s = node.next;
//如果下一个节点为null或者状态为1,即取消了
if (s == null || s.waitStatus > 0) {
s = null;
//从队列后出发,寻找最后一个不为null且状态为负的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果节点不为null,唤醒该节点
if (s != null)
LockSupport.unpark(s.thread);
}
之前我们知道了队列中的第一个节点是个空节点,这个操作就是遍历其后节点找到第一个需要唤醒的节点。当一个节点被唤醒后,其lock方法就会被释放,而关键的是acquireQueued方法中死循环会继续执行,下面再看一下该方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
由于该线程被唤醒了,会再进入循环,此时该节点的前驱节点就变成了头节点,并且如果此时tryAcquire方法返回true,就会拥有锁了。
那么有一个问题,此时tryAcquire()方法会返回false吗?
答案是会的,因为有NonfairSync的存在,所以如果恰好有一个线程抢占并且成功了,那么这个节点就有可能需要再次被挂起了。
总结
这篇文章,从AQS类的内部表示资源的状态以及队列入手,然后再结合ReentrantLock源码分析了独占模式的实现。当使用AQS的独占模式时,需要重写几个关键方法,tryAcquire、tryRelease等方法用于获取资源和释放资源;另外需要一个状态值抽象出一个状态。
另外需要注意的就是AQS内部的队列结构,其头节点是个空的实现,每个结点的状态变化是如何进行的。
本篇博客参考深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的实现分析(上)