前言
前段时间在面试,发现面试官都有问到同步器AQS的相关问题。AQS为Java中几乎所有的锁和同步器提供一个基础框架,派生出如ReentrantLock、Semaphore、CountDownLatch等AQS全家桶。本文基于AQS原理的几个核心点,谈谈对AbstractQueuedSynchronizer的理解,并实现一个自定义同步器。
AQS原理面试题的核心回答要点
- state 状态的维护。
- CLH队列
- ConditionObject通知
- 模板方法设计模式
- 独占与共享模式。
- 自定义同步器。
- AQS全家桶的一些延伸,如:ReentrantLock等。
AQS的类图结构
AQS全称是AbstractQueuedSynchronizer,即抽象同步队列。下面看一下AQS的类图结构:
为了方便下面几个关键点的理解,大家先熟悉一下AQS的类图结构。
state 状态的维护
1. 在AQS中维持了一个单一的共享状态state,来实现同步器同步。看一下state的相关代码如下:
state源码
1. /**
2. * The synchronization state.
3. */
4. private volatile int state;
5.
6. /**
7. * Returns the current value of synchronization state.
8. * This operation has memory semantics of a {@code volatile} read.
9. * @return current state value
10. */
11. protected final int getState() {
12. return state;
13. }
14.
15. /**
16. * Sets the value of synchronization state.
17. * This operation has memory semantics of a {@code volatile} write.
18. * @param newState the new state value
19. */
20. protected final void setState(int newState) {
21. state = newState;
22. }
23.
24. /**
25. * Atomically sets synchronization state to the given updated
26. * value if the current state value equals the expected value.
27. * This operation has memory semantics of a {@code volatile} read
28. * and write.
29. *
30. * @param expect the expected value
31. * @param update the new value
32. * @return {@code true} if successful. False return indicates that the actual
33. * value was not equal to the expected value.
34. */
35. protected final boolean compareAndSetState(int expect, int update) {
36. // See below for intrinsics setup to support this
37. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
38. }
state 源码设计几个回答要点:
- state用volatile修饰,保证多线程中的可见性。
- getState()和setState()方法采用final修饰,限制AQS的子类重写它们两。
- compareAndSetState()方法采用乐观锁思想的CAS算法,也是采用final修饰的,不允许子类重写。
CLH队列
谈到CLH队列,我们结合以上state状态,先来看一下AQS原理图:
CLH(Craig, Landin, and Hagersten locks) 同步队列 是一个FIFO双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。AQS依赖它来完成同步状态state的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
Node节点
CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),condition队列的后续节点(nextWaiter)如下图:
waitStatus几种状态状态:
我们再看一下CLH队列入列以及出列的代码:
入列
CLH队列入列就是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。addWaiter方法如下:
1. //构造Node
2. private Node addWaiter(Node mode) {
3. Node node = new Node(Thread.currentThread(), mode);
4. // Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)
5. Node pred = tail;
6. if (pred != null) {
7. node.prev = pred;
8. //CAS设置尾节点
9. if (compareAndSetTail(pred, node)) {
10. pred.next = node;
11. return node;
12. }
13. }
14. //多次尝试
15. enq(node);
16. return node;
17. }
由以上代码可得,addWaiter设置尾节点失败的话,调用enq(Node node)方法设置尾节点,enq方法如下:
1. private Node enq(final Node node) {
2. //死循环尝试,知道成功为止
3. for (;;) {
4. Node t = tail;
5. //tail 不存在,设置为首节点
6. if (t == null) { // Must initialize
7. if (compareAndSetHead(new Node()))
8. tail = head;
9. } else {
10. node.prev = t;
11. if (compareAndSetTail(t, node)) {
12. t.next = node;
13. return t;
14. }
15. }
16. }
17. }
出列
首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点。可以看一下以下两段源码:
1. Node h = head;
2. if (h != null && h.waitStatus != 0)
3. unparkSuccessor(h);
4. private void unparkSuccessor(Node node) {
5. /*
6. * If status is negative (i.e., possibly needing signal) try
7. * to clear in anticipation of signalling. It is OK if this
8. * fails or if status is changed by waiting thread.
9. */
10. int ws = node.waitStatus;
11. if (ws < 0)
12. compareAndSetWaitStatus(node, ws, 0);
13.
14. /*
15. * Thread to unpark is held in successor, which is normally
16. * just the next node. But if cancelled or apparently null,
17. * traverse backwards from tail to find the actual
18. * non-cancelled successor.
19. */
20. Node s = node.next;
21. if (s == null || s.waitStatus > 0) {
22. s = null;
23. for (Node t = tail; t != null && t != node; t = t.prev)
24. if (t.waitStatus <= 0)
25. s = t;
26. }
27. if (s != null)
28. LockSupport.unpark(s.thread);
29. }
CLH核心几个回答要点
- 双向链表入列出列
- CAS算法设置尾节点+死循环自旋。
CAS算法,可以看一下我工作实战中仿造CAS算法解决并发问题的实现
ConditionObject
ConditionObject简介
我们都知道,synchronized控制同步的时候,可以配合Object的wait()、notify(),notifyAll() 系列方法可以实现等待/通知模式。而Lock呢?它提供了条件Condition接口,配合await(),signal(),signalAll() 等方法也可以实现等待/通知机制。ConditionObject实现了Condition接口,给AQS提供条件变量的支持 。
Condition队列与CLH队列的那些事
我们先来看一下图:
ConditionObject队列与CLH队列的爱恨情仇:
- 调用了await()方法的线程,会被加入到conditionObject等待队列中,并且唤醒CLH队列中head节点的下一个节点。
- 线程在某个ConditionObject对象上调用了singnal()方法后,等待队列中的firstWaiter会被加入到AQS的CLH队列中,等待被唤醒。
- 当线程调用unLock()方法释放锁时,CLH队列中的head节点的下一个节点(在本例中是firtWaiter),会被唤醒。
区别:
- ConditionObject对象都维护了一个单独的等待队列 ,AQS所维护的CLH队列是同步队列,它们节点类型相同,都是Node。
独占与共享模式。
AQS支持两种同步模式:独占式和共享式。
独占式
同一时刻仅有一个线程持有同步状态,如ReentrantLock。又可分为公平锁和非公平锁。
公平锁: 按照线程在队列中的排队顺序,有礼貌的,先到者先拿到锁。
非公平锁: 当线程要获取锁时,无视队列顺序直接去抢锁,不讲道理的,谁抢到就是谁的。
acquire(int arg)是独占式获取同步状态的方法,我们来看一下源码:
1. public final void acquire(long arg) {
2. if (!tryAcquire(arg) &&
3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4. selfInterrupt();
5. }
6. //构造Node
7. private Node addWaiter(Node mode) {
8. Node node = new Node(Thread.currentThread(), mode);
9. // Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)
10. Node pred = tail;
11. if (pred != null) {
12. node.prev = pred;
13. //CAS设置尾节点
14. if (compareAndSetTail(pred, node)) {
15. pred.next = node;
16. return node;
17. }
18. }
19. //多次尝试
20. enq(node);
21. return node;
22. }
- acquireQueued(final Node node, long arg)方法
23. final boolean acquireQueued(final Node node, long arg) {
24. boolean failed = true;
25. try {
26. boolean interrupted = false;
27. for (;;) {
28. final Node p = node.predecessor();
29. if (p == head && tryAcquire(arg)) {
30. setHead(node);
31. p.next = null; // help GC
32. failed = false;
33. return interrupted;
34. }
35. if (shouldParkAfterFailedAcquire(p, node) &&
36. parkAndCheckInterrupt())
37. interrupted = true;
38. }
39. } finally {
40. if (failed)
41. cancelAcquire(node);
42. }
43. }
44. static void selfInterrupt() {
45. Thread.currentThread().interrupt();
46. }
结合源代码,可得acquire(int arg)方法流程图,如下:
共享式
多个线程可同时执行,如Semaphore/CountDownLatch等都是共享式的产物。
acquireShared(long arg)是共享式获取同步状态的方法,可以看一下源码:
1. public final void acquireShared(long arg) {
2. if (tryAcquireShared(arg) < 0)
3. doAcquireShared(arg);
4. }
由上可得,先调用tryAcquireShared(int arg)方法尝试获取同步状态,如果获取失败,调用doAcquireShared(int arg)自旋方式获取同步状态,方法源码如下:
1. private void doAcquireShared(long arg) {
2. final Node node = addWaiter(Node.SHARED);
3. boolean failed = true;
4. try {
5. boolean interrupted = false;
6. for (;;) {
7. final Node p = node.predecessor();
8. if (p == head) {
9. long r = tryAcquireShared(arg);
10. if (r >= 0) {
11. setHeadAndPropagate(node, r);
12. p.next = null; // help GC
13. if (interrupted)
14. selfInterrupt();
15. failed = false;
16. return;
17. }
18. }
19. if (shouldParkAfterFailedAcquire(p, node) &&
20. parkAndCheckInterrupt())
21. interrupted = true;
22. }
23. } finally {
24. if (failed)
25. cancelAcquire(node);
26. }
27. }
AQS的模板方法设计模式
模板方法模式
模板方法模式: 在一个方法中定义一个算法的骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。
模板方法模式生活中的例子: 假设我们要去北京旅游,那么我们可以坐高铁或者飞机,或者火车,那么定义交通方式的抽象类,可以有以下模板:买票->安检->乘坐xx交通工具->到达北京。让子类继承该抽象类,实现对应的模板方法。
AQS定义的一些模板方法如下:
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
简言之,就是AQS提供tryAcquire,tryAcquireShared等模板方法,给子类实现自定义的同步器。
自定义同步器。
基于以上分析,我们都知道state,CLH队列,ConditionObject队列 等这些关键点,你要实现自定义锁的话,首先需要确定你要实现的是独占锁还是共享锁,定义原子变量state的含义,再定义一个内部类去继承AQS,重写对应的模板方法。
我们来看一下基于 AQS 实现的不可重入的独占锁的demo,来自《Java并发编程之美》:
1. public class NonReentrantLock implements Lock,Serializable{
2.
3. //内部类,自定义同步器
4. static class Sync extends AbstractQueuedSynchronizer {
5. //是否锁已经被持有
6. public boolean isHeldExclusively() {
7. return getState() == 1;
8. }
9. //如果state为0 则尝试获取锁
10. public boolean tryAcquire(int arg) {
11. assert arg== 1 ;
12. //CAS设置状态,能保证操作的原子性,当前为状态为0,操作成功状态改为1
13. if(compareAndSetState(0, 1)){
14. //设置当前独占的线程
15. setExclusiveOwnerThread(Thread.currentThread());
16. return true;
17. }
18. return false;
19. }
20. //尝试释放锁,设置state为0
21. public boolean tryRelease(int arg) {
22. assert arg ==1;
23. //如果同步器同步器状态等于0,则抛出监视器非法状态异常
24. if(getState() == 0)
25. throw new IllegalMonitorStateException();
26. //设置独占锁的线程为null
27. setExclusiveOwnerThread(null);
28. //设置同步状态为0
29. setState(0);
30. return true;
31. }
32. //返回Condition,每个Condition都包含了一个Condition队列
33. Condition newCondition(){
34. return new ConditionObject();
35. }
36. }
37. //创建一个Sync来做具体的工作
38. private final Sync sync= new Sync ();
39.
40. @Override
41. public void lock() {
42. sync.acquire(1);
43. }
44.
45. public boolean isLocked() {
46. return sync.isHeldExclusively();
47. }
48. @Override
49. public void lockInterruptibly() throws InterruptedException {
50. sync.acquireInterruptibly(1);
51. }
52.
53. @Override
54. public boolean tryLock() {
55. return sync.tryAcquire(1);
56. }
57.
58. @Override
59. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
60. return sync.tryAcquireNanos(1, unit.toNanos(time));
61. }
62.
63. @Override
64. public void unlock() {
65. sync.release(1);
66. }
67.
68.
69. @Override
70. public Condition newCondition() {
71. return sync.newCondition();
72. }
73. }
NonReentrantLockDemoTest:
1. public class NonReentrantLockDemoTest {
2.
3. private static NonReentrantLock nonReentrantLock = new NonReentrantLock();
4.
5. public static void main(String[] args) {
6. for (int i = 0; i < 10; i++) {
7. Thread thread = new Thread(() -> {
8. nonReentrantLock.lock();
9. try {
10. System.out.println(Thread.currentThread().getName());
11. Thread.sleep(3000);
12. } catch (InterruptedException e) {
13. e.printStackTrace();
14. } finally {
15. nonReentrantLock.unlock();
16. }
17. });
18. thread.start();
19. }
20. }
21. }
运行结果:
AQS全家桶实战
AQS派生出如ReentrantLock、Semaphore等AQS全家桶,接下来可以看一下它们的使用案例。
ReentrantLock
ReentrantLock介绍
- ReentrantLock为重入锁,能够对共享资源能够重复加锁,是实现Lock接口的一个类。
- ReentrantLock支持公平锁和非公平锁两种方式
ReentrantLock案例
使用ReentrantLock来实现个简单线程安全的list,如下:
1. public class ReentrantLockList {
2. // 线程不安全的list
3. private ArrayList<String> array = new ArrayList<>();
4. //独占锁
5. private volatile ReentrantLock lock = new ReentrantLock();
6.
7. //添加元素
8. public void add(String e){
9. lock.lock();
10. try {
11. array.add(e);
12. }finally {
13. lock.unlock();
14. }
15. }
16.
17. //删除元素
18. public void remove(String e){
19. lock.lock();
20. try {
21. array.remove(e);
22. }finally {
23. lock.unlock();
24. }
25. }
26. //获取元素
27. public String get(int index){
28. lock.lock();
29. try {
30. return array.get(index);
31. }finally {
32. lock.unlock();
33. }
34. }
35. }
Semaphore
Semaphore介绍
- Semaphore也叫信号量,可以用来控制资源并发访问的线程数量,通过协调各个线程,以保证合理的使用资源。
Semaphore案例
Java多线程有一到比较经典的面试题:ABC三个线程顺序输出,循环10遍。
1. public class ABCSemaphore {
2.
3. private static Semaphore A = new Semaphore(1);
4. private static Semaphore B = new Semaphore(1);
5. private static Semaphore C = new Semaphore(1);
6.
7.
8. static class ThreadA extends Thread {
9.
10. @Override
11. public void run() {
12. try {
13. for (int i = 0; i < 10; i++) {
14. A.acquire();
15. System.out.print("A");
16. B.release();
17. }
18. } catch (InterruptedException e) {
19. e.printStackTrace();
20. }
21. }
22.
23. }
24.
25. static class ThreadB extends Thread {
26.
27. @Override
28. public void run() {
29. try {
30. for (int i = 0; i < 10; i++) {
31. B.acquire();
32. System.out.print("B");
33. C.release();
34. }
35. } catch (InterruptedException e) {
36. e.printStackTrace();
37. }
38. }
39.
40. }
41.
42. static class ThreadC extends Thread {
43.
44. @Override
45. public void run() {
46. try {
47. for (int i = 0; i < 10; i++) {
48. C.acquire();
49. System.out.print("C");
50. A.release();
51. }
52. } catch (InterruptedException e) {
53. e.printStackTrace();
54. }
55. }
56.
57. }
58.
59. public static void main(String[] args) throws InterruptedException {
60. // 开始只有A可以获取, BC都不可以获取, 保证了A最先执行
61. B.acquire();
62. C.acquire();
63. new ThreadA().start();
64. new ThreadB().start();
65. new ThreadC().start();
66. }
参考
- 《Java并发编程之美》
- 【死磕Java并发】—–J.U.C之AQS
个人公众号
欢迎大家关注,大家一起学习,一起讨论。