jdk1.8 J.U.C并发源码阅读------AQS之独占锁的获取与释放

时间:2022-12-20 17:57:58

一、继承关系

since1.5
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
继承自AbstractQueuedSynchronizer抽象类。

二、成员变量

private transient volatile Node head;//CLH队列首节点
private transient volatile Node tail;//CLH队列尾节点
private volatile int state;//锁的占用次数

三、内部类

CLH队列(链表结构的队列)的node的数据结构:

static final class Node {
//标志Node的状态:独占状态
static final Node SHARED = new Node();
//共享状态
static final Node EXCLUSIVE = null;

//因为超时或者中断,node会被设置成取消状态,被取消的节点时不会参与到竞争中的,会一直保持取消状态不会转变为其他状态;
static final int CANCELLED = 1;
//该节点的后继节点被阻塞,当前节点释放锁或者取消的时候(cancelAcquire)需要唤醒后继者。
static final int SIGNAL = -1;
//CONDITION队列中的状态,CLH队列中节点没有该状态,当将一个node从CONDITION队列中transfer到CLH队列中时,状态由CONDITION转换成0
static final int CONDITION = -2;
//该状态表示下一次节点如果是Shared的,则无条件获取锁。
static final int PROPAGATE = -3;


//当一个新的node在CLH队列中被创建时初始化为0,在CONDITION队列中创建时被初始化为CONDITION状态
volatile int waitStatus;

//队列中的前驱节点
volatile Node prev;


//next连接指向后继节点,当前节点释放锁时,需要唤醒它后面第一个非cancelled状态的节点。
//当一个节点入队时,直接添加到队尾,在acquireQueue中调用shouldParkAfterFailedAcqurie时修改前一个节点的状态,若前一个节点是cancelled则直接删除前驱节点,调整prev指向非cancelled的前驱;若前驱节点的状态是0,-3时,调整状态为signal
//在enq方法中,只有当成功的将tail指针指向新的尾节点时,才给之前的尾节点设置next的值,因此,当一个节点的next指针是null时,并不意味着该节点是尾节点。
//cancelled状态的node的next指向该节点自身而不是null。
volatile Node next;

//当前线程
volatile Thread thread;

//CONDITION队列中指向下一个node的指针,CLH队列中不使用
Node nextWaiter;


final boolean isShared() {
return nextWaiter == SHARED;
}

//返回前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}


四、方法说明

获取锁有三种方式: (1)acquire:以独占的模式获取一个锁,忽略中断,获取成功返回true,否则将该线程加入等待队列
public final void acquire(int arg) {
//tryAcquire尝试获得该arg,失败则先addWaiter(Node.EXCLUSIVE)加入等待队列,然后acquireQueued。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//中断当前线程
selfInterrupt();
}
//tryAcquire(arg):尝试获得该arg
//模板方法:由子类自己实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//mode:设置该节点是共享的还是独占的, Node.EXCLUSIVE for exclusive, Node.SHARED for shared
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//pred指向尾节点
Node pred = tail;
//尾节点不为null
if (pred != null) {
//新node的前驱结点指向尾节点
node.prev = pred;
//compareAndSetTail(pred, node):若等待队列的尾节点指向pred,则将尾节点指针指向node
if (compareAndSetTail(pred, node)) {
//修改AQS的尾节点成功,则将原先尾节点的next指向新节点
pred.next = node;
//返回当前节点
return node;
}
}
//pred==null或者修改AQS尾节点失败,则进入enq,用一个for循环修改AQS的尾指针,直至成功
enq(node);
return node;
}

private Node enq(final Node node) {
//循环,原因:可能同时存在多个线程修改tail指针。
for (;;) {
//t指向队列的tail
Node t = tail;
if (t == null) { // Must initialize
//尾节点为null,则队列为空,需要初始化队列
if (compareAndSetHead(new Node()))
tail = head;
} else {
//将node的prev指针指向AQS的tail
node.prev = t;
//如果队列的tail==t,则将tail指向node
if (compareAndSetTail(t, node)) {
//原先的尾节点的next指向新添加的node
t.next = node;
//修改成功,返回原先的尾节点,失败则进行下一次循环
return t;
}
}
}
}
//compareAndSetHead和compareAndSetTail都是调用unsafe对象的compareAndSwapObject实现的。
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
//acquireQueued:忽略中断
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取node的前驱节点p
final Node p = node.predecessor();
//前驱p是首节点,则当前线程尝试获取锁
if (p == head && tryAcquire(arg)) {
//获取成功,则将当前节点设置成首节点
setHead(node);
//删除前驱的next引用
p.next = null; // help GC
failed = false;
//返回中断标志
return interrupted;
}
//node的前驱节点p不是首节点head,则通过shouldParkAfterFailedAcquire来判断当前node的线程是否应该挂起park
if (shouldParkAfterFailedAcquire(p, node) &&
//应该park,则调用parkAndCheckInterrupt忽略中断进行挂起
parkAndCheckInterrupt())
interrupted = true;
//当该线程被唤醒时,尝试再次判断前驱节点p是否为head,如果是则再次尝试获取锁,这样做的目的是为了维持FIFO的顺序
//采用阻塞park和唤醒unpark的方式进行自旋式检测获取锁
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private void setHead(Node node) {
head = node;
//thread已经获得了锁,取消Node中指向thread的引用
node.thread = null;
//前驱设置为null
node.prev = null;
}
//shouldParkAfterFailedAcquire:当前节点获取锁失败时,是否应该进入阻塞状态
//(1)前驱节点的waitStatus==SINGNAL(-1):则说明该节点需要继续等待直至被前驱节点唤醒,返回true.
//(2)前驱节点的waitStatus==cancelled(1):则说明前驱节点曾经发生过中断或者超时,则前驱节点是一个无效节点,继续向前寻找一个node,该node的waitStatus<0,同时将waitStatus>0(cancelled)的节点删除,返回false(重新检测)。若新的前驱节点是head且tryAcquire获取成功,则该线程成功获取到锁。
//(3)前驱节点的waitStatus==-3或0(PROPAGATE):共享锁向后传播,因此需要将前驱节点的waitStatus修改成signal,然后返回false,重新检测前驱节点;0,表示前驱节点是队列尾节点,该节点新添加进来,修改为signal表示原先尾节点后面还有需要唤醒的节点(该节点)。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//ws:前驱节点的状态
int ws = pred.waitStatus;
//如果前驱节点是SIGNAL状态,则返回true,表示后继节点node应该被阻塞park
if (ws == Node.SIGNAL)
return true;
//ws>0:cancelled,前驱节点是一个超时或者中断被取消的Node
if (ws > 0) {
//循环往前找第一个不是cancelled状态的节点,同时删除中间状态是cancelled的节点,调整队列。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//状态是-3PROPAGATE或0(CONDITION状态只能在CONDITION队列中出现,不会出现在CLH队列中),则将pred前驱节点的状态设置成SIGNAL状态,表示pred节点被唤醒时,需要park它的后继者。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//parkAndCheckInterrupt:阻塞当前线程,并且返回中断状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
//当acquireQueued的try中抛出异常,并且node没有成功获取锁时,需要通过cancelAcquire方法取消node节点
//主要功能:
//(1)重新设置node的prev指针(相当于删除了node的前面连续几个cancelled节点)
//(2)将node的状态设置为cancelled
//(3)如果node的prev不是首节点,并且node的next节点不是一个cancelled节点,则设置node的prev指向node的next节点;否则,node的prev节点head节点,则需要唤醒node的后继节点(unparkSuccessor)
//(4)将node的next指针指向自己
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

//如果node的前驱节点也是一个cancelled的节点,则修改node的prev指针,直至指向一个不是cancelled状态的node
//相当于删除了node到prev之前的状态为calcelled的前驱节点
//原因:若新的前驱节点是head节点,则需要在该node失效时唤醒node的后继节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

//predNext是一个cancelled的节点(可能是node)。
Node predNext = pred.next;

//直接将node的状态设置为cancelled
node.waitStatus = Node.CANCELLED;

//如果是尾节点,直接删除node
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 当pred不是首节点时,如果node的后继节点需要signal,则尝试将pred的状态设置成signal,并且将pred的next指针指向node的next节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//当pred是首节点时,需要唤醒node的后继节点。
unparkSuccessor(node);
}
//将cancelled的节点的next指针指向node自身
node.next = node; // help GC
}
}
//唤醒node的第一个非cancelled状态的node节点
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
//如果该节点是一个cancelled的节点,则不用管它;否则,将该节点状态置0清空,表示该节点已经获取过锁了,要出队。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

//node的后继节点有可能是一个cancelled的节点,而cancelled节点的next指针指向其自身。
//只能从tail开始寻找到node之间最靠近node的那一个非cancelled的节点,并唤醒它。
//此处不删除node到s之间的cancelled节点,只负责唤醒s节点,删除操作在shouldParkAfterFailedAquired中进行
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
总结:
         首先调用tryAcquire尝试获取锁,获取失败则调用addWaiter,将该节点以独占的状态入队,加入到队列的末尾,由于可能存在多个线程调用CAS操作将节点加入到CLH队列末尾,因此enq方法以for循环的方式进行,保证该节点能添加到队列末尾。
然后调用acquireQueued在队列中获取锁,过程为:检查该节点node的前驱节点是否为head节点
如果是head节点并且调用tryAcquire获取锁成功,则setHead,将node作为当前队列的队首,原先的首节点出队。
否则,调用shouldParkAfterFailedAcquire查看当前节点的线程是否应该park,具体为:如果当前结点node的前一个节点的waitStatus为signal则表示node节点应该park等待前一个节点唤醒它,返回true;如果当前节点的小于0,即为cancelled,则应该删除node前面连续的cancelled节点,将node的prev指针指向靠近node的第一个非cancelled节点,返回false,重新判断前驱状态;若为0或者为PROPAGATE,则CAS将前驱状态改成signal返回false,重新判断。
如果应该park,则调用parkAndCheckInterrupt将当前线程park,否则就进行下次循环。
如果以上过程中抛出异常并且该节点还没有成功获取锁,则指向cancelAcquire,将当前节点的状态改为cancelled,该节点不在参与获取锁,同时删除该节点前面连续的cancelled状态的节点,若删除之后发现该节点的前一个节点是head,则应该调用unparkSuccessor唤醒后继。

(2)acquireInterruptibly:以独占的方式获取一个锁,如果该线程发生中断则抛出异常,将该node的waitstate设置为0,放弃等待锁。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
//检查当前线程是否被中断,中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取锁,获取失败则调用doAcquireInterruptibly
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//doAcquireInterruptibly:获取锁,中断则抛出异常,修改node的waitstate=0
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//将该线程以独占的方式加入锁等待队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
//自旋式获取锁,可能经过多个park和unpark的过程
for (;;) {
//p指向node的前驱节点
final Node p = node.predecessor();
//如果前驱节点p是head,则再次尝试获得锁
if (p == head && tryAcquire(arg)) {
//获取锁成功,修改head指针,指向当前节点。
//这里不用像setTail一样,因为等待队列是按照FIFO的顺序来获取锁的,因此不可能存在多个线程同时操作head,而tail可能同时存在多个node要添加到队列尾部,并发则需要循环添加至tail,直至成功。
setHead(node);
//原先的头结点出队,将内容设置为null,方便回收
p.next = null; // help GC
failed = false;
return;
}
//否则,前驱节点p不是head或者获取锁失败,则调用shouldParkAfterFailedAcquire查询该node是否应该阻塞park
if (shouldParkAfterFailedAcquire(p, node) &&
//应该阻塞,则调用parkAndCheckInterrupt进行中断式阻塞
parkAndCheckInterrupt())
//如果在阻塞时该线程发生异常,则在此处抛出异常。
throw new InterruptedException();
}
//如果抛出异常,则在finally中cancelAcquire(将node的waitstate修改)
} finally {
if (failed)
cancelAcquire(node);
}
}

private final boolean parkAndCheckInterrupt() {
//阻塞当前线程
LockSupport.park(this);
//返回中断状态。
return Thread.interrupted();
}
//cancelAcquire:取消当前线程等待锁的状态
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

// Skip cancelled predecessors
Node pred = node.prev;
//找到前驱节点不是cancelled的,将node的prev指向它
//目的是为了判断新前驱如果是head,则该节点失效需要唤醒后继节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

//prev节点的后继节点,可能不是node,因为while循环中只是修改了prev指向,没有更改next指向
Node predNext = pred.next;

//当前节点的waiteStatus设置为CANCELLED,表示发生中断或者超时
node.waitStatus = Node.CANCELLED;
//如果node是尾节点,则直接删除并重新设置tail指向。
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {

int ws;
//前驱结点pred不是head,将pred的waitStatus设置成SIGNAL,并将pred的next指针指向node的下一个节点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//尝试唤醒node的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private void unparkSuccessor(Node node) {
//获取当前node的waitSatus
int ws = node.waitStatus;
//<0,该节点等待锁
if (ws < 0)
//重置该node的状态为0
compareAndSetWaitStatus(node, ws, 0);
//s指向node的后继节点
Node s = node.next;
//如果没有后继node或者后继node是canceld,因为超时或者中断被取消
if (s == null || s.waitStatus > 0) {
s = null;
//从尾节点开始,寻找从s到tail之间最靠近s的waitStatus<0的nodek,将s指向它
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//s不为空,则有可以操作的后继节点
if (s != null)
//唤醒该节点后继节点的线程。
//该头结点的线程已经获得过锁并且执行完成了,还唤醒了后继节点,要将该节点从队列中移除(具体操作在该节点后继节点acquiredQueue的方法中实现更新表头)
LockSupport.unpark(s.thread);
}

(3)tryAcquireNanos:尝试以独占的方式获取锁,如果发生中断和超时则停止
 public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//发生中断,停止
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取独占锁,失败则调用doAcquireNanos
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//截止时间
final long deadline = System.nanoTime() + nanosTimeout;
//创建一个Node,独占模式
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//前驱节点
final Node p = node.predecessor();
//如果p是head,并且tryAcquire获取成功,则该node获取锁成功
if (p == head && tryAcquire(arg)) {
//设置新node为head节点,node之前的节点都出队了
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//还剩多少时间
nanosTimeout = deadline - System.nanoTime();
//超时
if (nanosTimeout <= 0L)
return false;
//shouldParkAfterFailedAcquire,查看是否需要阻塞该线程,如果需要则
if (shouldParkAfterFailedAcquire(p, node) &&
//查看剩余时间大于spinForTimeoutThreshold,如果大于则阻塞当前线程,否则,剩余时间很短,直接查看是否能成功获取锁(为了超时时间的准确性)
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//如果该线程发生了中断,则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
//如果获取失败,则cancelAcquire,该节点要么超时,要么抛出异常了
if (failed)
cancelAcquire(node);
}
}

释放独占锁的过程分析:
总结: 调用tryRelease释放锁,若成功了,则查看head的状态,若waitStatus为0,则表示CLH队列中没有后继节点了,则不用唤醒,否则,需要调用unparkSuccessor唤醒后继节点,unparkSuccessor唤醒后继节点的原理是:找到node的后面第一个非cancelled状态的节点进行唤醒。
public final boolean release(int arg) {
//tryRelease,模板方法,子类实现。尝试释放锁,释放成功,则需要唤醒后继节点
if (tryRelease(arg)) {
Node h = head;
//h.waitStatus为0说明CLH队列中已经没有成员了,head是最后一个节点
if (h != null && h.waitStatus != 0)
//唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
//waitStatus<0,重置node的状态为0,该节点已经获取过锁了。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
//s节点的状态是cancelled,需要寻找s后面第一个非cancelled状态的节点唤醒
//从队尾开始向前找的原因:
//next节点不一定指向它的后继节点,有可能指向其自身,当该节点的状态为cancelled的时候,该节点的next指针指向自身。
//之所以cancelled状态节点的next指针指向自身,是为了方便该方法isOnSyncQueue(Node node)判断,若next不为空则说明该Node一定在CLH队列中。
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒线程,后继线程在AcquireQueue中被唤醒。
//删除node到s之间的cancelled状态的线程的工作,在shouldParkAfterFailedAcquire中进行,当前驱状态为cancelled时,
//寻找第一个非cancelled状态的节点,调整s的prev指向,如果前驱是head,并且tryAcquire成功,则s成功获取到锁了。
LockSupport.unpark(s.thread);
}