并发基础里主要明白下CAS和AQS吧
CAS:Compare And Swap 比较然后交换
AQS:AbstractqueuedSynchronizer抽象的队列式同步器
一、CAS
CAS在很多无锁的并发里使用。无锁并发的意思就是,用不加锁的方式实现并发操作,这一波操作是不是很666啊
通常加锁操作(无论是synchronized还是Lock)都是比较悲观的认为被锁住的部分(无论是对象、方法还是代码块)每一次操作都会引起资源竞争。
而CAS就比较乐观了,他认为对资源的访问是没有冲突的,既然没有冲突就应该被友好的访问。所有访问的线程走到这一块的时候都不需要停下来等待其它线程释放资源。
那CAS是哪儿来的乐观呢?就是通过比较然后交换的思想。
CAS算法:包含三个参数(V,E,N),其中,V表示要更新的变量,E表示预期的变量,N表示新值。
当且仅当V值等于E值时,将V值更新为N值。并返回V值。
对一个线程L来讲,他要去更新一个值的时候(V),他首先知道这个值应该等于E,更新之前他先去比较一下,当确认V值等于E了,他就认为这个值没有被其他线程修改过,蓝后,再对V进行更新,更新为N值。
CAS的应用之一是jdk的atomic包。比如AtomicInteger
compareAndSet是主要使用的方法
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
unsafe的 compareAndSwapInt方法是native的,会调用底层操作系统完成对值的修改
CAS会遇到一个无法解决的问题,即ABA问题。即V值被多次修改后值仍为最初值。
意思是说对于之前的线程L来说,他去更新V值的时候。确认V值等于E不能完全确认这个值没有被其它线程修改过。
比如线程L1将V值改成V1,线程L2将V值又改回了V值。
V值经历了两次修改,但是对于L线程来说,这个值没有修改过。But,V值其实进行了多次更新。
我个人没有想到ABA可能会引发的问题,猜对于状态比较敏感的情景下应该不被允许吧。
对于ABA问题,解决方案是设置一个时间戳,每次修改成功后更新时间戳。
对于V值的预期值E和预期时间戳都满足时才对V值进行更新。
jdk有个
AtomicStampedReference类,其内部实现了一个Pair类,用来封装值和时间戳
1 private static class Pair<T> { 2 final T reference; 3 final int stamp; 4 private Pair(T reference, int stamp) { 5 this.reference = reference; 6 this.stamp = stamp; 7 } 8 static <T> Pair<T> of(T reference, int stamp) { 9 return new Pair<T>(reference, stamp); 10 } 11 }
二、AQS
AQS是java.util.concurrent.locks包下的AbstractQueuedSynchronizer类。
AQS核心是通过共享变量同步状态,变量的状态由子类维护,AQS框架主要做的是:
线程阻塞队列的维护
线程阻塞和唤醒
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
上图摘自jdk源码说明,是LCH队列锁的说明。LCH队列锁是一个FIFO队列,每个节点是一个等待线程,其前继节点(pre)释放锁之后就可以获得锁了。
AbstractQueuedSynchronizer通过内部类Node来维护这个队列。
AbstractQueuedSynchronizer成员变量维护一个Node类型的head,一个Node类型的tail以及状态state。维护对LCH队列的入队出队操作以及对state状态的修改。
其中,共享变量state的修改是通过unsafe的CAS来操作的。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
/**
* 使用当前线程和指定的模式创建Node节点,并对节点做入队操作
*
* @param mode模式 Node.EXCLUSIVE for exclusive(独占模式,只有一个线程能执行,比如ReetrantLock),
Node.SHARED for shared(共享模式,多个线程可同时执行,比如Semaphere/CountDownLatch)
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node =
new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred =
tail;
if (pred != null) {
node.
prev = pred;
if (compareAndSetTail(pred, node)) {
pred.
next = node;
return node;
}
}
enq(node)
;
return node;
}
其中,入队的操作如下
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t =
tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
}
else {
node.
prev = t;
if (compareAndSetTail(t, node)) {
t.
next = node;
return t;
}
}
}
}
AbstractQueuedSynchronizer主要有两个方法:acquire()和release()
acquire方法用来获取锁,返回true说明线程获取成功继续执行,一旦返回false则线程加入到等待队列中,等待被唤醒,release方法用来释放锁。 一般来说实现的时候这两个方法被封装为lock和unlock方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h =
head;
if (h != null && h.waitStatus != 0)//唤醒后继节点
unparkSuccessor(h)
;
return true;
}
return false;
}
以下四个方法由子类去实现
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)