刚刚看完了并发实践这本书,算是理论具备了,看到了AQS的介绍,再看看源码,发现要想把并发理解透还是很难得,花了几个小时细分析了一下把可能出现的场景尽可能的往代码中去套,还是有些收获,但是真的很费脑,还是对多线程的理解太浅了,不多说了,直接上代码吧。
这段代码不是为跑通,只是把AQS,ReentrantLock中的部分源码合并到了一起,便于理解。
package com.yb.interview.concurrent; import java.util.concurrent.locks.LockSupport; public class AQSSourceStudy { abstract static class AQS {
/**
* 这个状态是有子类来维护的,AQS不会用这个状态做什么
*/
private volatile int state;
/**
* 队尾节点
*/
private volatile Node tail;
/**
* 可能情况
*/
private volatile Node head;
/**
* 独占线程
*/
private Thread exclusiveOwnerThread; /**
* 由子类实现
* 判断当前线程是否需要排队
*/
abstract boolean tryAcquire(int i); public int getState() {
return state;
} public void setState(int state) {
this.state = state;
} /**
* 主方法
* 可能的情况
* 当前状态可以直接运行
* 当前状态要放入队列里等待
* 状态->子类获取
* 过程,尽可能的不要去阻塞,循环多次,竞争多次
* 创建节点
* 节点入队,队尾
* 判断新节点的前一个节点的状态,更新,前一个节点,因为在入队的过程中每个节点的状态是动的
* 最后,阻塞当前线程
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 中断状态传播
// 实时或者将来阻塞,抛中断异常
selfInterrupt();
} /**
* 当有新节点入队时,循环的把新节点关联到一个有效节点的后面
* 然后,阻塞这个节点的线程(当前线程)
*/
private boolean acquireQueued(Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
final Node p = node.predecessor();
// 新节点的前个节点是头结点,如果头结点的线程释放,新节点接可以直接执行
// 所有不要着急阻塞,在判断一次,头结点释放没有,如果头结点释放,新节点不阻塞,把新节点设为头结点
// 当新节点没有排队直接运行了,之后要将节点标记为无效 cancelAcquire
if (p == head && tryAcquire(arg)) {
// 想了很久这段代码发生的情况
// 这段代码发生的情况
// 1.node在入队列时,有不同的线程在获得了锁,且队列中没有节点
// 2.当执行到这里再次tryAcquire之前,之前释放了锁
// 3.这时hasQueuedPredecessors中的判断,头结点的后一个节点,是新建的这个节点,满足s.thread==Thread.currentThread(不考虑这时有其他线程进入,或者进入无效)
// 满足了tryAcquire返回true的情况
// 将头结点改为新节点
/****
* head tail
* | |
* | |
* ---------- ---------
* nullNode newNode
* --------- ----------
* next=newNode prev=nullNode
* prev=null next=null
* ------- ----------
*
* 改完后
*
* head tail
* | |
* | |
* --------- ---------
* nullNode newNode
* --------- ---------
* next=newNode prev=nullNode
* prev=null next=null
* --------- ----------
* */ setHead(node);
p.next = null;
failed = false;
return interrupted;
}
// 之前的节点不是正在执行线程的节点,调整位置和状态再阻塞
// 在线程解除阻塞后,使者节点失效
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 节点解除阻塞后,可能是中断或者超时
// 非unlock的解锁
cancelAcquire(node);
}
} private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
// 那个空的节点会保证终止
while (pred.waitStatus > 0)
// 将节点的prev关联到最近的有效节点
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 任何情况都执行的
node.waitStatus = Node.CANCELLED; // 如果取消的节点是队尾节点,并且将前节点设为队尾节点
if (node == tail && compareAndSetTail(node, pred)) {
// cancel的节点和cancel之前的无效节点会移出队列
compareAndSetNext(pred, predNext, null);
} else {
// 如果不是队尾节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// prev->node->next 改为 prev->next
compareAndSetNext(pred, predNext, next);
} else {
// 判断锁定的状态
// 如果前节点是头结点,或者不是SIGNAL状态并且无法设置为SIGNAL状态
// 总结,取消一个节点是,要保证这个节点能被释放,要不通过前节点通知,在锁锁,对应release
unparkSuccessor(node);
} node.next = node; // help GC
}
} private void unparkSuccessor(Node node) {
// 解锁节点的线程
// 当node时头节点时,是当前获取线程释放的炒作
// 不是偷节点
int ws = node.waitStatus;
if (ws < 0)
// 不用再去通知下个节点了,即将释放node了
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从队尾向前找到最前有效的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); } private void compareAndSetNext(Node pred, Node predNext, Object o) { } private boolean parkAndCheckInterrupt() {
// 阻塞
LockSupport.park(this);
// 当前前程标记中断
return Thread.interrupted();
} private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前节点是需要被通知的,前节点正在被阻塞,阻塞当先线程
if (ws == Node.SIGNAL)
return true;
// 如果前节点是无效的,找到最近的一个有效节点,并关联,返回,在外部调用方法中会再次调用这个方法
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 这是个切断调用链的过程
pred.next = node;
} else {
// 更新前节点的状态,释放时通知新节点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
} /**
* 创建节点
* 节点入队
*
* @return 新节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 之前有节点在队列中
if (pred != null) {
node.prev = pred;
// 直接修改队尾,不成功要进入接下类的循环,循环中也有类型的判断,这里添加会减少一些逻辑(这样说可能是理解的有偏差)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
} /**
* 节点入队
* 循环,直到把新节点放到队尾,在多线程中这个过程是不确定的
*/
private Node enq(Node node) {
for (; ; ) {
Node t = tail;
// Must initialize
// 队尾没值,新节点是第一个入队的节点,创建一个空的节点,头尾都指向这个空节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
} /**
* 字面理解,是否有已经排队的线程
* 实际意义,有重入锁的情况,在这里要考虑到
* 没有节点在排队的情况,头结点与未节点是相同的
* 判断重入,当前线程是头结点的线程.
*/
protected boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
//为什么是头结点的线程,而不是exclusiveOwnerThread,因为只有在
// 当前队列里没有值得时候才回设置独占线程,如果是通过节点释放的线
// 程还会和节点绑定,不会映射到exclusiveOwnerThread
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
} public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 在独占锁的时候,waitStatus只能为0 -1 -2 -3
// 这个里不为0代表头节点是空节点
// 空节点不需要释放
// 头节点是释放锁的时候,最先被考虑的
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
} protected abstract boolean tryRelease(int arg); public void setHead(Node head) {
this.head = head;
} private boolean compareAndSetHead(Node node) {
return (true || false);
} private boolean compareAndSetTail(Node pred, Node node) {
return (true || false);
} protected void selfInterrupt() {
Thread.currentThread().interrupt();
} /**
* CAS更新队列状态,CAS的问题在其他的机会介绍
*/
boolean compareAndSetState(int o, int n) {
return (false || true);
} /**
* 独占线程标记改为指定线程
*/
void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
} /**
* 返回独占线程
*/
Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
} // 修改节点的状态
private boolean compareAndSetWaitStatus(Node pred, int ws, int signal) {
return (true || false);
} static class Node { public int waitStatus; Node() {
} /**
* @param thread
* @param mode SHARED or EXCLUSIVE
*/
Node(Thread thread, Node mode) {
this.thread = Thread.currentThread();
this.mode = mode;
} // 共享模式标记
static final Node SHARED = new Node();
// 独占模式标记
static final Node EXCLUSIVE = null; // 节点被取消,因为超时或者中断
static final int CANCELLED = 1;
// next被阻塞,当节点释放时,notice next
static final int SIGNAL = -1;
// 在条件队列中,等待某个条件被阻塞
static final int CONDITION = -2;
// 节点在共享模式下,可以传播锁
static final int PROPAGATE = -3; volatile Node next;
volatile Node prev;
Node mode; public Thread thread; public Node predecessor() {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
} } /**
* 这是一个独占锁的实现,从ReentrantLock中粘贴出来的部分代码
*/
class SYC extends AQS { public void lock() {
acquire(1);
} public void unlock() {
release(1);
} protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果当前的状态
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
} protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
} }
}