AQS解析与实战

时间:2022-10-06 15:03:29

前言

前段时间在面试,发现面试官都有问到同步器AQS的相关问题。AQS为Java中几乎所有的锁和同步器提供一个基础框架,派生出如ReentrantLock、Semaphore、CountDownLatch等AQS全家桶。本文基于AQS原理的几个核心点,谈谈对AbstractQueuedSynchronizer的理解,并实现一个自定义同步器。

AQS原理面试题的核心回答要点

  1. state 状态的维护。
  2. CLH队列
  3. ConditionObject通知
  4. 模板方法设计模式
  5. 独占与共享模式。
  6. 自定义同步器。
  7. AQS全家桶的一些延伸,如:ReentrantLock等。

AQS的类图结构

AQS全称是AbstractQueuedSynchronizer,即抽象同步队列。下面看一下AQS的类图结构:AQS解析与实战

为了方便下面几个关键点的理解,大家先熟悉一下AQS的类图结构

state 状态的维护


1. ​​在AQS中维持了一个单一的共享状态state,来实现同步器同步。看一下state的相关代码如下:​​

state源码


1. ​​      /**​​
2. ​​ * The synchronization state.​​
3. ​​ */​​
4. ​​ private volatile int state;​​
5.
6. ​​ /**​​
7. ​​ * Returns the current value of synchronization state.​​
8. ​​ * This operation has memory semantics of a {@code volatile} read.​​
9. ​​ * @return current state value​​
10. ​​ */​​
11. ​​ protected final int getState() {​​
12. ​​ return state;​​
13. ​​ }​​
14.
15. ​​ /**​​
16. ​​ * Sets the value of synchronization state.​​
17. ​​ * This operation has memory semantics of a {@code volatile} write.​​
18. ​​ * @param newState the new state value​​
19. ​​ */​​
20. ​​ protected final void setState(int newState) {​​
21. ​​ state = newState;​​
22. ​​ }​​
23.
24. ​​ /**​​
25. ​​ * Atomically sets synchronization state to the given updated​​
26. ​​ * value if the current state value equals the expected value.​​
27. ​​ * This operation has memory semantics of a {@code volatile} read​​
28. ​​ * and write.​​
29. ​​ *​​
30. ​​ * @param expect the expected value​​
31. ​​ * @param update the new value​​
32. ​​ * @return {@code true} if successful. False return indicates that the actual​​
33. ​​ * value was not equal to the expected value.​​
34. ​​ */​​
35. ​​ protected final boolean compareAndSetState(int expect, int update) {​​
36. ​​ // See below for intrinsics setup to support this​​
37. ​​ return unsafe.compareAndSwapInt(this, stateOffset, expect, update);​​
38. ​​ }​​

state 源码设计几个回答要点:

  • state用volatile修饰,保证多线程中的可见性。
  • getState()和setState()方法采用final修饰,限制AQS的子类重写它们两。
  • compareAndSetState()方法采用乐观锁思想的CAS算法,也是采用final修饰的,不允许子类重写。

CLH队列

谈到CLH队列,我们结合以上state状态,先来看一下AQS原理图

AQS解析与实战

CLH(Craig, Landin, and Hagersten locks) 同步队列 是一个FIFO双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。AQS依赖它来完成同步状态state的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

Node节点

CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),condition队列的后续节点(nextWaiter)如下图:

AQS解析与实战

waitStatus几种状态状态:

AQS解析与实战

我们再看一下CLH队列入列以及出列的代码:

入列

CLH队列入列就是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。addWaiter方法如下:


1. ​​//构造Node​​
2. ​​private Node addWaiter(Node mode) {​​
3. ​​ Node node = new Node(Thread.currentThread(), mode);​​
4. ​​ // Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)​​
5. ​​ Node pred = tail;​​
6. ​​ if (pred != null) {​​
7. ​​ node.prev = pred;​​
8. ​​ //CAS设置尾节点​​
9. ​​ if (compareAndSetTail(pred, node)) {​​
10. ​​ pred.next = node;​​
11. ​​ return node;​​
12. ​​ }​​
13. ​​ }​​
14. ​​ //多次尝试​​
15. ​​ enq(node);​​
16. ​​ return node;​​
17. ​​ }​​

由以上代码可得,addWaiter设置尾节点失败的话,调用enq(Node node)方法设置尾节点,enq方法如下:


1. ​​   private Node enq(final Node node) {​​
2. ​​ //死循环尝试,知道成功为止​​
3. ​​ for (;;) {​​
4. ​​ Node t = tail;​​
5. ​​ //tail 不存在,设置为首节点​​
6. ​​ if (t == null) { // Must initialize​​
7. ​​ if (compareAndSetHead(new Node()))​​
8. ​​ tail = head;​​
9. ​​ } else {​​
10. ​​ node.prev = t;​​
11. ​​ if (compareAndSetTail(t, node)) {​​
12. ​​ t.next = node;​​
13. ​​ return t;​​
14. ​​ }​​
15. ​​ }​​
16. ​​ }​​
17. ​​ }​​

出列

首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点。可以看一下以下两段源码:


1. ​​  Node h = head;​​
2. ​​ if (h != null && h.waitStatus != 0)​​
3. ​​ unparkSuccessor(h);​​
4. ​​ private void unparkSuccessor(Node node) {​​
5. ​​ /*​​
6. ​​ * If status is negative (i.e., possibly needing signal) try​​
7. ​​ * to clear in anticipation of signalling. It is OK if this​​
8. ​​ * fails or if status is changed by waiting thread.​​
9. ​​ */​​
10. ​​ int ws = node.waitStatus;​​
11. ​​ if (ws < 0)​​
12. ​​ compareAndSetWaitStatus(node, ws, 0);​​
13.
14. ​​ /*​​
15. ​​ * Thread to unpark is held in successor, which is normally​​
16. ​​ * just the next node. But if cancelled or apparently null,​​
17. ​​ * traverse backwards from tail to find the actual​​
18. ​​ * non-cancelled successor.​​
19. ​​ */​​
20. ​​ Node s = node.next;​​
21. ​​ if (s == null || s.waitStatus > 0) {​​
22. ​​ s = null;​​
23. ​​ for (Node t = tail; t != null && t != node; t = t.prev)​​
24. ​​ if (t.waitStatus <= 0)​​
25. ​​ s = t;​​
26. ​​ }​​
27. ​​ if (s != null)​​
28. ​​ LockSupport.unpark(s.thread);​​
29. ​​ }​​

CLH核心几个回答要点

  • 双向链表入列出列
  • CAS算法设置尾节点+死循环自旋。

CAS算法,可以看一下我工作实战中仿造CAS算法解决并发问题的实现

ConditionObject

ConditionObject简介

我们都知道,synchronized控制同步的时候,可以配合Object的wait()、notify(),notifyAll() 系列方法可以实现等待/通知模式。而Lock呢?它提供了条件Condition接口,配合await(),signal(),signalAll() 等方法也可以实现等待/通知机制。ConditionObject实现了Condition接口,给AQS提供条件变量的支持

Condition队列与CLH队列的那些事

我们先来看一下图:

AQS解析与实战

ConditionObject队列与CLH队列的爱恨情仇:

  • 调用了await()方法的线程,会被加入到conditionObject等待队列中,并且唤醒CLH队列中head节点的下一个节点。
  • 线程在某个ConditionObject对象上调用了singnal()方法后,等待队列中的firstWaiter会被加入到AQS的CLH队列中,等待被唤醒。
  • 当线程调用unLock()方法释放锁时,CLH队列中的head节点的下一个节点(在本例中是firtWaiter),会被唤醒。

区别:

  • ConditionObject对象都维护了一个单独的等待队列 ,AQS所维护的CLH队列是同步队列,它们节点类型相同,都是Node。

独占与共享模式。

AQS支持两种同步模式:独占式和共享式。

独占式

同一时刻仅有一个线程持有同步状态,如ReentrantLock。又可分为公平锁和非公平锁。

公平锁: 按照线程在队列中的排队顺序,有礼貌的,先到者先拿到锁。

非公平锁: 当线程要获取锁时,无视队列顺序直接去抢锁,不讲道理的,谁抢到就是谁的。

acquire(int arg)是独占式获取同步状态的方法,我们来看一下源码:

  • acquire(long arg)方法


1. ​​  public final void acquire(long arg) {​​
2. ​​ if (!tryAcquire(arg) &&​​
3. ​​ acquireQueued(addWaiter(Node.EXCLUSIVE), arg))​​
4. ​​ selfInterrupt();​​
5. ​​ }​​
  • addWaiter方法


6. ​​//构造Node​​
7. ​​private Node addWaiter(Node mode) {​​
8. ​​ Node node = new Node(Thread.currentThread(), mode);​​
9. ​​ // Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)​​
10. ​​ Node pred = tail;​​
11. ​​ if (pred != null) {​​
12. ​​ node.prev = pred;​​
13. ​​ //CAS设置尾节点​​
14. ​​ if (compareAndSetTail(pred, node)) {​​
15. ​​ pred.next = node;​​
16. ​​ return node;​​
17. ​​ }​​
18. ​​ }​​
19. ​​ //多次尝试​​
20. ​​ enq(node);​​
21. ​​ return node;​​
22. ​​ }​​
  • acquireQueued(final Node node, long arg)方法


23. ​​ final boolean acquireQueued(final Node node, long arg) {​​
24. ​​ boolean failed = true;​​
25. ​​ try {​​
26. ​​ boolean interrupted = false;​​
27. ​​ for (;;) {​​
28. ​​ final Node p = node.predecessor();​​
29. ​​ if (p == head && tryAcquire(arg)) {​​
30. ​​ setHead(node);​​
31. ​​ p.next = null; // help GC​​
32. ​​ failed = false;​​
33. ​​ return interrupted;​​
34. ​​ }​​
35. ​​ if (shouldParkAfterFailedAcquire(p, node) &&​​
36. ​​ parkAndCheckInterrupt())​​
37. ​​ interrupted = true;​​
38. ​​ }​​
39. ​​ } finally {​​
40. ​​ if (failed)​​
41. ​​ cancelAcquire(node);​​
42. ​​ }​​
43. ​​ }​​
  • selfInterrupt()方法


44. ​​   static void selfInterrupt() {​​
45. ​​ Thread.currentThread().interrupt();​​
46. ​​ }​​

结合源代码,可得acquire(int arg)方法流程图,如下:

AQS解析与实战

共享式

多个线程可同时执行,如Semaphore/CountDownLatch等都是共享式的产物。

acquireShared(long arg)是共享式获取同步状态的方法,可以看一下源码:


1. ​​  public final void acquireShared(long arg) {​​
2. ​​ if (tryAcquireShared(arg) < 0)​​
3. ​​ doAcquireShared(arg);​​
4. ​​ }​​

由上可得,先调用tryAcquireShared(int arg)方法尝试获取同步状态,如果获取失败,调用doAcquireShared(int arg)自旋方式获取同步状态,方法源码如下:


1. ​​ private void doAcquireShared(long arg) {​​
2. ​​ final Node node = addWaiter(Node.SHARED);​​
3. ​​ boolean failed = true;​​
4. ​​ try {​​
5. ​​ boolean interrupted = false;​​
6. ​​ for (;;) {​​
7. ​​ final Node p = node.predecessor();​​
8. ​​ if (p == head) {​​
9. ​​ long r = tryAcquireShared(arg);​​
10. ​​ if (r >= 0) {​​
11. ​​ setHeadAndPropagate(node, r);​​
12. ​​ p.next = null; // help GC​​
13. ​​ if (interrupted)​​
14. ​​ selfInterrupt();​​
15. ​​ failed = false;​​
16. ​​ return;​​
17. ​​ }​​
18. ​​ }​​
19. ​​ if (shouldParkAfterFailedAcquire(p, node) &&​​
20. ​​ parkAndCheckInterrupt())​​
21. ​​ interrupted = true;​​
22. ​​ }​​
23. ​​ } finally {​​
24. ​​ if (failed)​​
25. ​​ cancelAcquire(node);​​
26. ​​ }​​
27. ​​ }​​

AQS的模板方法设计模式

模板方法模式

模板方法模式: 在一个方法中定义一个算法的骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。

模板方法模式生活中的例子: 假设我们要去北京旅游,那么我们可以坐高铁或者飞机,或者火车,那么定义交通方式的抽象类,可以有以下模板:买票->安检->乘坐xx交通工具->到达北京。让子类继承该抽象类,实现对应的模板方法。

AQS解析与实战AQS定义的一些模板方法如下:

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

简言之,就是AQS提供tryAcquire,tryAcquireShared等模板方法,给子类实现自定义的同步器

自定义同步器。

基于以上分析,我们都知道state,CLH队列,ConditionObject队列 等这些关键点,你要实现自定义锁的话,首先需要确定你要实现的是独占锁还是共享锁,定义原子变量state的含义,再定义一个内部类去继承AQS,重写对应的模板方法

我们来看一下基于 AQS 实现的不可重入的独占锁的demo,来自《Java并发编程之美》:


1. ​​public class NonReentrantLock implements Lock,Serializable{​​
2.
3. ​​ //内部类,自定义同步器​​
4. ​​ static class Sync extends AbstractQueuedSynchronizer {​​
5. ​​ //是否锁已经被持有​​
6. ​​ public boolean isHeldExclusively() {​​
7. ​​ return getState() == 1;​​
8. ​​ }​​
9. ​​ //如果state为0 则尝试获取锁​​
10. ​​ public boolean tryAcquire(int arg) {​​
11. ​​ assert arg== 1 ;​​
12. ​​ //CAS设置状态,能保证操作的原子性,当前为状态为0,操作成功状态改为1​​
13. ​​ if(compareAndSetState(0, 1)){​​
14. ​​ //设置当前独占的线程​​
15. ​​ setExclusiveOwnerThread(Thread.currentThread());​​
16. ​​ return true;​​
17. ​​ }​​
18. ​​ return false;​​
19. ​​ }​​
20. ​​ //尝试释放锁,设置state为0​​
21. ​​ public boolean tryRelease(int arg) {​​
22. ​​ assert arg ==1;​​
23. ​​ //如果同步器同步器状态等于0,则抛出监视器非法状态异常​​
24. ​​ if(getState() == 0)​​
25. ​​ throw new IllegalMonitorStateException();​​
26. ​​ //设置独占锁的线程为null​​
27. ​​ setExclusiveOwnerThread(null);​​
28. ​​ //设置同步状态为0​​
29. ​​ setState(0);​​
30. ​​ return true;​​
31. ​​ }​​
32. ​​ //返回Condition,每个Condition都包含了一个Condition队列​​
33. ​​ Condition newCondition(){​​
34. ​​ return new ConditionObject();​​
35. ​​ }​​
36. ​​ }​​
37. ​​ //创建一个Sync来做具体的工作​​
38. ​​ private final Sync sync= new Sync ();​​
39.
40. ​​ @Override​​
41. ​​ public void lock() {​​
42. ​​ sync.acquire(1);​​
43. ​​ }​​
44.
45. ​​ public boolean isLocked() {​​
46. ​​ return sync.isHeldExclusively();​​
47. ​​ }​​
48. ​​ @Override​​
49. ​​ public void lockInterruptibly() throws InterruptedException {​​
50. ​​ sync.acquireInterruptibly(1);​​
51. ​​ }​​
52.
53. ​​ @Override​​
54. ​​ public boolean tryLock() {​​
55. ​​ return sync.tryAcquire(1);​​
56. ​​ }​​
57.
58. ​​ @Override​​
59. ​​ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {​​
60. ​​ return sync.tryAcquireNanos(1, unit.toNanos(time));​​
61. ​​ }​​
62.
63. ​​ @Override​​
64. ​​ public void unlock() {​​
65. ​​ sync.release(1);​​
66. ​​ }​​
67.
68.
69. ​​ @Override​​
70. ​​ public Condition newCondition() {​​
71. ​​ return sync.newCondition();​​
72. ​​ }​​
73. ​​ }​​

NonReentrantLockDemoTest:


1. ​​public class NonReentrantLockDemoTest {​​
2.
3. ​​ private static NonReentrantLock nonReentrantLock = new NonReentrantLock();​​
4.
5. ​​ public static void main(String[] args) {​​
6. ​​ for (int i = 0; i < 10; i++) {​​
7. ​​ Thread thread = new Thread(() -> {​​
8. ​​ nonReentrantLock.lock();​​
9. ​​ try {​​
10. ​​ System.out.println(Thread.currentThread().getName());​​
11. ​​ Thread.sleep(3000);​​
12. ​​ } catch (InterruptedException e) {​​
13. ​​ e.printStackTrace();​​
14. ​​ } finally {​​
15. ​​ nonReentrantLock.unlock();​​
16. ​​ }​​
17. ​​ });​​
18. ​​ thread.start();​​
19. ​​ }​​
20. ​​ }​​
21. ​​}​​

运行结果:

AQS解析与实战

AQS全家桶实战

AQS派生出如ReentrantLock、Semaphore等AQS全家桶,接下来可以看一下它们的使用案例。

ReentrantLock

ReentrantLock介绍

  • ReentrantLock为重入锁,能够对共享资源能够重复加锁,是实现Lock接口的一个类。
  • ReentrantLock支持公平锁和非公平锁两种方式

ReentrantLock案例

使用ReentrantLock来实现个简单线程安全的list,如下:


1. ​​public class ReentrantLockList {​​
2. ​​ // 线程不安全的list​​
3. ​​ private ArrayList<String> array = new ArrayList<>();​​
4. ​​ //独占锁​​
5. ​​ private volatile ReentrantLock lock = new ReentrantLock();​​
6.
7. ​​ //添加元素​​
8. ​​ public void add(String e){​​
9. ​​ lock.lock();​​
10. ​​ try {​​
11. ​​ array.add(e);​​
12. ​​ }finally {​​
13. ​​ lock.unlock();​​
14. ​​ }​​
15. ​​ }​​
16.
17. ​​ //删除元素​​
18. ​​ public void remove(String e){​​
19. ​​ lock.lock();​​
20. ​​ try {​​
21. ​​ array.remove(e);​​
22. ​​ }finally {​​
23. ​​ lock.unlock();​​
24. ​​ }​​
25. ​​ }​​
26. ​​ //获取元素​​
27. ​​ public String get(int index){​​
28. ​​ lock.lock();​​
29. ​​ try {​​
30. ​​ return array.get(index);​​
31. ​​ }finally {​​
32. ​​ lock.unlock();​​
33. ​​ }​​
34. ​​ }​​
35. ​​}​​

Semaphore

Semaphore介绍

  • Semaphore也叫信号量,可以用来控制资源并发访问的线程数量,通过协调各个线程,以保证合理的使用资源。

Semaphore案例

Java多线程有一到比较经典的面试题:ABC三个线程顺序输出,循环10遍。


1. ​​public class ABCSemaphore {​​
2.
3. ​​ private static Semaphore A = new Semaphore(1);​​
4. ​​ private static Semaphore B = new Semaphore(1);​​
5. ​​ private static Semaphore C = new Semaphore(1);​​
6.
7.
8. ​​ static class ThreadA extends Thread {​​
9.
10. ​​ @Override​​
11. ​​ public void run() {​​
12. ​​ try {​​
13. ​​ for (int i = 0; i < 10; i++) {​​
14. ​​ A.acquire();​​
15. ​​ System.out.print("A");​​
16. ​​ B.release();​​
17. ​​ }​​
18. ​​ } catch (InterruptedException e) {​​
19. ​​ e.printStackTrace();​​
20. ​​ }​​
21. ​​ }​​
22.
23. ​​ }​​
24.
25. ​​ static class ThreadB extends Thread {​​
26.
27. ​​ @Override​​
28. ​​ public void run() {​​
29. ​​ try {​​
30. ​​ for (int i = 0; i < 10; i++) {​​
31. ​​ B.acquire();​​
32. ​​ System.out.print("B");​​
33. ​​ C.release();​​
34. ​​ }​​
35. ​​ } catch (InterruptedException e) {​​
36. ​​ e.printStackTrace();​​
37. ​​ }​​
38. ​​ }​​
39.
40. ​​ }​​
41.
42. ​​ static class ThreadC extends Thread {​​
43.
44. ​​ @Override​​
45. ​​ public void run() {​​
46. ​​ try {​​
47. ​​ for (int i = 0; i < 10; i++) {​​
48. ​​ C.acquire();​​
49. ​​ System.out.print("C");​​
50. ​​ A.release();​​
51. ​​ }​​
52. ​​ } catch (InterruptedException e) {​​
53. ​​ e.printStackTrace();​​
54. ​​ }​​
55. ​​ }​​
56.
57. ​​ }​​
58.
59. ​​ public static void main(String[] args) throws InterruptedException {​​
60. ​​ // 开始只有A可以获取, BC都不可以获取, 保证了A最先执行​​
61. ​​ B.acquire();​​
62. ​​ C.acquire();​​
63. ​​ new ThreadA().start();​​
64. ​​ new ThreadB().start();​​
65. ​​ new ThreadC().start();​​
66. ​​ }​​

参考

  • 《Java并发编程之美》
  • 【死磕Java并发】—–J.U.C之AQS 

个人公众号

AQS解析与实战

欢迎大家关注,大家一起学习,一起讨论。