Java Concurrent之 AbstractQueuedSynchronizer

时间:2022-09-02 20:45:37

ReentrantLock/CountDownLatch/Semaphore/FutureTask/ThreadPoolExecutor的源码中都会包含一个静态的内部类Sync,它继承了AbstractQueuedSynchronizer这个抽象类。

AbstractQueuedSynchronizer是java.util.concurrent包中的核心组件之一,为并发包中的其他synchronizers提供了一组公共的基础设施。

AQS会对进行acquire而被阻塞的线程进行管理,其管理方式是在AQS内部维护了一个FIFO的双向链表队列,队列的头部是一个空的结点,除此之外,每个结点持有着一个线程,结点中包含两个重要的属性waiteStatus和nextWaiter。结点的数据结构如下: Node中的属性waitStatus、prev、next、thread都使用了volatile修饰,这样直接的读写操作就具有内存可见性。 waitStatus表示了当前结点Node的状态

  1. static final class Node {
  2. /** waitStatus的值,表示此结点中的线程被取消 */
  3. static final int CANCELLED =  1;
  4. /** waitStatus value 表明后续结点中的线程需要unparking 唤醒 */
  5. static final int SIGNAL    = -1;
  6. /** waitStatus value 表明当前结点中的线程需要等待一个条件*/
  7. static final int CONDITION = -2;
  8. /** 表明结点是以共享模式进行等待(shared mode)的标记*/
  9. static final Node SHARED = new Node();
  10. /** 表明结点是以独占模式进行等待(exclusive mode)的标记*/
  11. static final Node EXCLUSIVE = null;
  12. /**
  13. * Status field, taking on only the values:
  14. *   SIGNAL: 后继结点现在(或即将)被阻塞(通过park) 那么当前结点在释放或者被取消的时候必须unpark它的后继结点
  15. *           为了避免竞态条件,acquire方法必须首先声明它需要一个signal,然后尝试原子的acquire
  16. *            如果失败了 就阻塞
  17. *   CANCELLED:当前结点由于超时或者中断而被取消  结点不会脱离这个状态
  18. *              尤其是,取消状态的结点中的线程永远不会被再次阻塞
  19. *   CONDITION: 当前结点在一个条件队列中。它将不会进入sync队列中直到它被transferred
  20. *              (这个值在这里的使用只是为了简化结构 跟其他字段的使用没有任何关系)
  21. *   0:          None of the above 非以上任何值
  22. *
  23. * 这些值通过数字来分类达到简化使用的效果
  24. * 非负的数字意味着结点不需要信号signal 这样大部分的代码不需要检查特定的值 just 检查符号就ok了
  25. *
  26. * 这个字段对于普通的sync结点初始化为0 对于条件结点初始化为CONDITION(-2) 本字段的值通过CAS操作进行修改
  27. */
  28. volatile int waitStatus;
  29. /**
  30. * 连接到当前结点或线程依赖的用于检查waitStatus等待状态的前驱结点。
  31. * 进入队列时赋值,出队列时置空(为GC考虑)。
  32. * 根据前驱结点的取消(CANCELLED),我们查找一个非取消结点的while循环短路,将总是会退出 ;
  33. * 因为头结点永远不会被取消:一个结点成为头结点只能通过一次成功过的acquire操作的结果
  34. * 一个取消的线程永远不会获取操作成功(acquire操作成功)
  35. * 一个线程只能取消它自己  不能是其他结点
  36. */
  37. volatile Node prev;
  38. /**
  39. * 连接到当前结点或线程释放时解除阻塞(unpark)的后继结点
  40. * 入队列时赋值,出队列时置空(为GC考虑)
  41. * 入队列时不会给前驱结点的next字段赋值,需要确认compareAndSetTail(pred, node)操作是否成功 (详见Node addWaiter(Node mode)方法)
  42. * 所以当我们发现结点的next为空时不一定就是tail尾结点 如果next为空,可以通过尾结点向前遍历即addWaiter中调用的enq(node)方法(个人觉
  43. * 这是对第一次处理失败的亡羊补牢之举)官方说法double-check 双层检查
  44. *
  45. * 被取消的结点next指向的是自己而不是空(详见cancelAcquire(Node node)中最后的node.next = node; )这让isOnSyncQueue变得简单
  46. * Upon cancellation, we cannot adjust this field, but can notice
  47. * status and bypass the node if cancelled.
  48. */
  49. volatile Node next;
  50. /**
  51. * 入队列结点中的线程,构造时初始化,使用完 就置空
  52. */
  53. volatile Thread thread;
  54. /**
  55. * 连接到下一个在条件上等待的结点 或者waitStatus为特殊值SHARED 共享模式
  56. * 因为条件队列只有在独占模式(exclusive)下持有时访问,当结点等待在条件上,我们只需要一个简单的链表队列来持有这些结点
  57. * 然后他们会转移到队列去进行re-acquire操作。
  58. * 由于条件只能是独占的,我们可以使用一个特殊的值来声明共享模式(shared mode)来节省一个字段
  59. */
  60. Node nextWaiter;
  61. /**
  62. * 如果结点以共享模式等待  就返回true
  63. */
  64. final boolean isShared() {
  65. return nextWaiter == SHARED;
  66. }
  67. /**
  68. * 返回当前结点的前驱结点如果为null就抛出NullPointException
  69. * @return the predecessor of this node
  70. */
  71. final Node predecessor() throws NullPointerException {
  72. Node p = prev;
  73. if (p == null)
  74. throw new NullPointerException();
  75. else
  76. return p;
  77. }
  78. //用于建立初始化头 或 共享标识
  79. Node() {
  80. }
  81. //入队列时使用
  82. Node(Thread thread, Node mode) {     // Used by addWaiter
  83. this.nextWaiter = mode;
  84. this.thread = thread;
  85. }
  86. //用于条件结点
  87. Node(Thread thread, int waitStatus) { // Used by Condition
  88. this.waitStatus = waitStatus;
  89. this.thread = thread;
  90. }
  91. }

acquire操作

获取同步器

  1. if(尝试获取成功){
  2. return ;
  3. }else{
  4. 加入队列;park自己
  5. }

释放同步器

  1. if(尝试释放成功){
  2. unpark等待队列中的第一个结点
  3. }else{
  4. return false;
  5. }
  1. /**
  2. * 以独占模式(exclusive mode)排他地进行的acquire操作 ,对中断不敏感 完成synchronized语义
  3. * 通过调用至少一次的tryAcquire实现 成功时返回
  4. * 否则在成功之前,一直调用tryAcquire(int)将线程加入队列,线程可能反复的阻塞和解除阻塞(park/unpark)。
  5. * 这个方法可以用于实现Lock.lock()方法
  6. * acquire是通过tryAcquire(int)来实现的,直至成功返回时结束,故我们无需自定义这个方法就可用它来实现lock。
  7. * tryLock()是通过Sync.tryAquire(1)来实现的
  8. * @param arg the acquire argument. 这个值将会被传递给tryAcquire方法
  9. * 但他是不间断的 可以表示任何你喜欢的内容
  10. */
  11. public final void acquire(int arg) {
  12. if (!tryAcquire(arg) &&
  13. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  14. selfInterrupt();
  15. }
  16. /**
  17. * 尝试以独占模式进行acquire操作 这个方法应该查询这个对象状态是否允许以独占模式进行acquire操作,如果允许就获取它
  18. *
  19. *
  20. * <p>This method is always invoked by the thread performing
  21. * acquire.  If this method reports failure, the acquire method
  22. * may queue the thread, if it is not already queued, until it is
  23. * signalled by a release from some other thread. This can be used
  24. * to implement method {@link Lock#tryLock()}.
  25. *
  26. * 默认实现抛出UnsupportedOperationException异常
  27. *
  28. * @param arg the acquire argument. This value is always the one
  29. *        passed to an acquire method, or is the value saved on entry
  30. *        to a condition wait.  The value is otherwise uninterpreted
  31. *        and can represent anything you like.
  32. * @return {@code true} if successful. Upon success, this object has
  33. *         been acquired.
  34. * @throws IllegalMonitorStateException if acquiring would place this
  35. *         synchronizer in an illegal state. This exception must be
  36. *         thrown in a consistent fashion for synchronization to work
  37. *         correctly.
  38. * @throws UnsupportedOperationException if exclusive mode is not supported
  39. */
  40. protected boolean tryAcquire(int arg) {
  41. throw new UnsupportedOperationException();
  42. }
  43. /**
  44. * 以独占不可中断模式
  45. * Acquires in exclusive uninterruptible mode for thread already in
  46. * queue. Used by condition wait methods as well as acquire.
  47. *
  48. * @param node the node
  49. * @param arg the acquire argument
  50. * @return {@code true} if interrupted while waiting
  51. */
  52. final boolean acquireQueued(final Node node, int arg) {
  53. try {
  54. boolean interrupted = false;//记录线程是否曾经被中断过
  55. for (;;) {//死循环 用于acquire获取失败重试
  56. final Node p = node.predecessor();//获取结点的前驱结点
  57. if (p == head && tryAcquire(arg)) {//若前驱为头结点  继续尝试获取
  58. setHead(node);
  59. p.next = null; // help GC
  60. return interrupted;
  61. }
  62. ////检查是否需要等待(检查前驱结点的waitStatus的值>0/<0/=0) 如果需要就park当前线程  只有前驱在等待时才进入等待 否则继续重试
  63. if (shouldParkAfterFailedAcquire(p, node) &&
  64. parkAndCheckInterrupt())//线程进入等待需要,需要其他线程唤醒这个线程以继续执行
  65. interrupted = true;//只要线程在等待过程中被中断过一次就会被记录下来
  66. }
  67. } catch (RuntimeException ex) {
  68. //acquire失败  取消acquire
  69. cancelAcquire(node);
  70. throw ex;
  71. }
  72. }
  73. /**
  74. * 检查并更新acquire获取失败的结点的状态
  75. * 信号控制的核心
  76. * Checks and updates status for a node that failed to acquire.
  77. * Returns true if thread should block. This is the main signal
  78. * control in all acquire loops.  Requires that pred == node.prev
  79. *
  80. * @param pred node's predecessor holding status
  81. * @param node the node
  82. * @return {@code true} if thread should block
  83. */
  84. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  85. int s = pred.waitStatus;
  86. if (s < 0)
  87. /*
  88. * 这个结点已经设置状态要求对他释放一个信号 所以他是安全的等待
  89. * This node has already set status asking a release
  90. * to signal it, so it can safely park
  91. */
  92. return true;
  93. if (s > 0) {
  94. /*
  95. * 前驱结点被取消 跳过前驱结点 并尝试重试 知道找到一个未取消的前驱结点
  96. * Predecessor was cancelled. Skip over predecessors and
  97. * indicate retry.
  98. */
  99. do {
  100. node.prev = pred = pred.prev;
  101. } while (pred.waitStatus > 0);
  102. pred.next = node;
  103. }
  104. else
  105. /*
  106. * 前驱结点的状态为0时表示为新建的 需要设置成SIGNAL(-1)
  107. * 声明我们需要一个信号但是暂时还不park 调用者将需要重试保证它在parking之前不被acquire
  108. * Indicate that we need a signal, but don't park yet. Caller
  109. * will need to retry to make sure it cannot acquire before
  110. * parking.
  111. */
  112. compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
  113. return false;
  114. }
  115. /**
  116. * park当前线程方便的方法 并且然后会检查当前线程是否中断
  117. *
  118. * @return {@code true} if interrupted
  119. */
  120. private final boolean parkAndCheckInterrupt() {
  121. LockSupport.park(this);
  122. return Thread.interrupted();
  123. }

添加结点到等待队列

首先构建一个准备入队列的结点,如果当前队列不为空,则将mode的前驱指向tail(只是指定当前结点的前驱结点,这样下面的操作一即使失败了 也不会影响整个队列的现有连接关系),compareAndSetTail成功将mode设置为tail结点,则将原先的tail结点的后继节点指向mode。如果队列为空亦或者compareAndSetTail操作失败,没关系我们还有enq(node)为我们把关。

  1. /**
  2. *通过给定的线程和模式 创建结点和结点入队列操作
  3. *
  4. * @param current the thread 当前线程
  5. * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 独占和共享模式
  6. * @return the new node
  7. */
  8. private Node addWaiter(Node mode) {
  9. Node node = new Node(Thread.currentThread(), mode);
  10. // Try the fast path of enq; backup to full enq on failure
  11. Node pred = tail;
  12. if (pred != null) {
  13. node.prev = pred;//只是指定当前结点的前驱结点,这样下面的操作一即使失败了   也不会影响整个队列的现有连接关系
  14. if (compareAndSetTail(pred, node)) {//原子地设置node为tail结点 CAS操作 操作一
  15. pred.next = node;
  16. return node;
  17. }
  18. }
  19. enq(node);//操作一失败时  这里会重复检查亡羊补牢一下  官方说法 double-check
  20. return node;
  21. }
  22. /**
  23. * 将结点插入队列 必要时进行初始化操作
  24. * @param node 带插入结点
  25. * @return node's predecessor 返回当前结点的前驱结点
  26. */
  27. private Node enq(final Node node) {
  28. for (;;) {
  29. Node t = tail;
  30. if (t == null) { // Must initialize 当前队列为空 进行初始化操作
  31. Node h = new Node(); // Dummy header 傀儡头结点
  32. h.next = node;
  33. node.prev = h;
  34. if (compareAndSetHead(h)) {//原子地设置头结点
  35. tail = node;//头尾同一结点
  36. return h;
  37. }
  38. }
  39. else {
  40. node.prev = t;
  41. if (compareAndSetTail(t, node)) {//原子地设置tail结点 上面操作一的增强操作
  42. t.next = node;
  43. return t;
  44. }
  45. }
  46. }
  47. }

acquire 取消结点

取消结点操作:首先会判断结点是否为null,若不为空,while循环查找距离当前结点最近的非取消前驱结点PN(方便GC处理取消的结点),然后取出这个前驱的后继结点指向,利用它来感知其他的取消或信号操作(例如 compareAndSetNext(pred, predNext, null)) 然后将当前结点的状态Status设置为CANCELLED

  • 当前结点如果是尾结点,就删除当前结点,将找到的非取消前驱结点PN设置为tail,并原子地将其后继指向为null

  • 当前结点存在后继结点SN,如果前驱结点需要signal,则将PN的后继指向SN;否则将通过unparkSuccessor(node);唤醒后继结点

  1. /**
  2. * 取消一个将要尝试acquire的结点
  3. *
  4. * @param node the node
  5. */
  6. private void cancelAcquire(Node node) {
  7. // 如果结点不存在就直接返回
  8. if (node == null)
  9. return;
  10. node.thread = null;
  11. // 跳过取消的结点 while循环直到找到一个未取消的结点
  12. Node pred = node.prev;
  13. while (pred.waitStatus > 0)
  14. node.prev = pred = pred.prev;
  15. //前面的操作导致前驱结点发送变化 但是pred的后继结点还是没有变化
  16. Node predNext = pred.next;//通过predNext来感知其他的取消或信号操作 例如 compareAndSetNext(pred, predNext, null)
  17. //这里用无条件的写来代替CAS操作
  18. node.waitStatus = Node.CANCELLED;
  19. // 如果当前node是tail结点 就删除当前结点
  20. if (node == tail && compareAndSetTail(node, pred)) {
  21. compareAndSetNext(pred, predNext, null);//原子地将node结点之前的第一个非取消结点设置为tail结点 并将其后继指向null
  22. } else {
  23. // 如果前驱不是头结点 并且前驱的状态为SIGNAL(或前驱需要signal)
  24. if (pred != head
  25. && (pred.waitStatus == Node.SIGNAL
  26. || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
  27. && pred.thread != null) {
  28. //如果node存在后继结点 将node的前驱结点的后继指向node的后继
  29. Node next = node.next;
  30. if (next != null && next.waitStatus <= 0)
  31. compareAndSetNext(pred, predNext, next);//原子地将pred的后继指向node的后继
  32. } else {
  33. //node没有需要signal的前驱,通知后继结点
  34. unparkSuccessor(node);
  35. }
  36. node.next = node; // help GC
  37. }
  38. }}

唤醒后继结点 unparkSuccessor

唤醒后继结点操作:首先会尝试清除当前结点的预期信号,这里即使操作失败亦或是信号已经被其他等待线程改变 都不影响
然后查找当前线程最近的一个非取消结点 并唤醒它
  1. /**
  2. * 如果存在后继结点 就唤醒它
  3. *
  4. * @param node the node
  5. */
  6. private void unparkSuccessor(Node node) {
  7. /*
  8. * 尝试清除预期信号 如果操作失败或该状态被其他等待线程改变 也没关系
  9. */
  10. compareAndSetWaitStatus(node, Node.SIGNAL, 0);
  11. /*
  12. * 准备unpark的线程在后继结点里持有(通常就是下一个结点)
  13. * 但如果被取消或为空  那么就从tail向后开始遍历查找实际的非取消后继结点
  14. */
  15. Node s = node.next;
  16. if (s == null || s.waitStatus > 0) {
  17. s = null;
  18. for (Node t = tail; t != null && t != node; t = t.prev)
  19. if (t.waitStatus <= 0)
  20. s = t;//找到一个后并不跳出for循环 为了找到一个距离node最近的非取消后继结点
  21. }
  22. if (s != null)//结点不为空 唤醒后继的等待线程
  23. LockSupport.unpark(s.thread);
  24. }

 回过头来总结一下:

当我们调用acquire(int)时,会首先通过tryAcquire尝试获取锁,一般都是留给子类实现(例如ReetrantLock$FairSync中的实现)

  1. /**
  2. * tryAcquire的公平版本
  3. * Fair version of tryAcquire.  Don't grant access unless
  4. * recursive call or no waiters or is first.
  5. */
  6. protected final boolean tryAcquire(int acquires) {
  7. final Thread current = Thread.currentThread();
  8. int c = getState();
  9. if (c == 0) {
  10. if (isFirst(current) &&
  11. compareAndSetState(0, acquires)) {
  12. setExclusiveOwnerThread(current);
  13. return true;
  14. }
  15. }
  16. else if (current == getExclusiveOwnerThread()) {
  17. int nextc = c + acquires;
  18. if (nextc < 0)
  19. throw new Error("Maximum lock count exceeded");
  20. setState(nextc);
  21. return true;
  22. }
  23. return false;
  24. }

如果tryAcquire(int)返回为false,则说明没有获得到锁。 则!tryAcquire(int)为true,接着会继续调用acquireQueued(final Node node ,int arg)方法,当然这调用这个方法之前,我们需要将当前包装成Node加入到队列中(即调用addWaiter(Node mode))。

在acquireQueued()方法体中,我们会发现一个死循环,唯一跳出死循环的途径是 直到找到一个(条件1)node的前驱是傀儡head结点并且子类的tryAcquire()返回true,那么就将当前结点设置为head结点并返回结点对于线程的中断状态。如果(条件1)不成立,则执行shouldParkAfterFailuredAcquire()

在shouldParkAfterFailuredAcquire(Node pred,Node node)方法体中,

首先会判断node结点的前驱结点pred的waitStatus的值:

* 如果waitStatus>0,表明pred处于取消状态(CANCELLED)则从队列中移除pred。

* 如果waitStatus<0,表明线程需要park住

* 如果waitStatus=0,表明这是一个新建结点,需要设置成SIGNAL(-1),在下一次循环中如果不能获得锁就需要park住线程,parkAndCheckInterrupt()就是执行了park()方法来park线程并返回线程中断状态。

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }

如果中间抛出RuntimeException异常,则会调用cancelAcquire(Node)方法取消获取。取消其实也很简单,首先判断node是否为空,如果不为空,找到node最近的非取消前驱结点PN,并将node的status设置为CANCELLED;

* 倘若node为tail,将node移除并将PN结点设置为tail PN的后继指向null

* 倘若node存在后继结点SN,如果前驱结点PN需要signal,则将PN后继指向SN 否则调用unparkSuccessor(Node)唤醒后继SN

AcquireShared共享锁

  1. /**
  2. * 以共享模式获取Acquire 对中断不敏感
  3. * 通过多次调用tryAcquireShared方法来实现 成功时返回
  4. * 否则线程加入Sync队列 可能重复进行阻塞和释放阻塞 调用tryAcquireShared知道成功
  5. *
  6. * @param arg the acquire argument.  This value is conveyed to
  7. *        {@link #tryAcquireShared} but is otherwise uninterpreted
  8. *        and can represent anything you like.
  9. */
  10. public final void acquireShared(int arg) {
  11. if (tryAcquireShared(arg) < 0)
  12. doAcquireShared(arg);
  13. }
  14. /**
  15. * 以共享不可中断模式获取Acquire
  16. * Acquires in shared uninterruptible mode.
  17. * @param arg the acquire argument
  18. */
  19. private void doAcquireShared(int arg) {
  20. final Node node = addWaiter(Node.SHARED);
  21. try {
  22. boolean interrupted = false;
  23. for (;;) {
  24. final Node p = node.predecessor();
  25. if (p == head) {
  26. int r = tryAcquireShared(arg);
  27. if (r >= 0) {
  28. setHeadAndPropagate(node, r);
  29. p.next = null; // help GC
  30. if (interrupted)
  31. selfInterrupt();
  32. return;
  33. }
  34. }
  35. if (shouldParkAfterFailedAcquire(p, node) &&
  36. parkAndCheckInterrupt())
  37. interrupted = true;
  38. }
  39. } catch (RuntimeException ex) {
  40. cancelAcquire(node);
  41. throw ex;
  42. }
  43. }
  44. /**
  45. * Sets head of queue, and checks if successor may be waiting
  46. * in shared mode, if so propagating if propagate > 0.
  47. *
  48. * @param pred the node holding waitStatus for node
  49. * @param node the node
  50. * @param propagate the return value from a tryAcquireShared
  51. */
  52. private void setHeadAndPropagate(Node node, int propagate) {
  53. setHead(node);//队列向后移一位
  54. if (propagate > 0 && node.waitStatus != 0) {//propagate>0表明共享数值大于前面要求的数值
  55. /*
  56. * Don't bother fully figuring out successor.  If it
  57. * looks null, call unparkSuccessor anyway to be safe.
  58. */
  59. Node s = node.next;
  60. if (s == null || s.isShared())//如果剩下只有一个node或者node.next是共享的 需要park住该线程
  61. unparkSuccessor(node);
  62. }
  63. }

条件Condition

Condition是服务单个Lock,condition.await()等方法在Lock上形成一个condition等待队列

condition.signal()方法在Lock上面处理condition等待队列然后将队列中的node加入到AQS的阻塞队列中等待对应的线程被unpark

  1. /**
  2. * 实现可中断的条件等待
  3. * <ol>
  4. * <li> If current thread is interrupted, throw InterruptedException
  5. * <li> Save lock state returned by {@link #getState}
  6. * <li> Invoke {@link #release} with
  7. *      saved state as argument, throwing
  8. *      IllegalMonitorStateException  if it fails.
  9. * <li> Block until signalled or interrupted
  10. * <li> Reacquire by invoking specialized version of
  11. *      {@link #acquire} with saved state as argument.
  12. * <li> If interrupted while blocked in step 4, throw exception
  13. * </ol>
  14. */
  15. public final void await() throws InterruptedException {
  16. if (Thread.interrupted())
  17. throw new InterruptedException();
  18. Node node = addConditionWaiter();//加入到condition的对用lock的私有队列中,与AQS阻塞队列形成相似
  19. //释放这个condition对应的lock的锁 因为若这个await方法阻塞住而lock没有释放锁
  20. //那么对于其他线程的node来说肯定是阻塞住的
  21. //因为condition对应的lock获得了锁,肯定在AQS的header处,其他线程肯定是得不到锁阻塞在那里,这样两边都阻塞的话就死锁了
  22. //故这里需要释放对应的lock锁
  23. int savedState = fullyRelease(node);
  24. int interruptMode = 0;
  25. while (!isOnSyncQueue(node)) {//判断condition是否已经转化成AQS阻塞队列中的一个结点 如果没有park这个线程
  26. LockSupport.park(this);
  27. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  28. break;
  29. }
  30. //这一步需要signal()或signalAll()方法的执行 说明这个线程已经被unpark 然后运行直到acquireQueued尝试再次获得锁
  31. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  32. interruptMode = REINTERRUPT;
  33. if (node.nextWaiter != null)
  34. unlinkCancelledWaiters();
  35. if (interruptMode != 0)
  36. reportInterruptAfterWait(interruptMode);
  37. }

网上找到的一个帮助理解Condition的gif图

Java Concurrent之 AbstractQueuedSynchronizer

这个AQS存在两中链表

* 一种链表是AQS sync链表队列,可称为 横向链表

* 一种链表是Condition的wait Node链表,相对于AQS sync是结点的一个纵向链表

当纵向链表被signal通知后 会进入对应的Sync进行排队处理

  1. /**
  2. * Moves the longest-waiting thread, if one exists, from the
  3. * wait queue for this condition to the wait queue for the
  4. * owning lock.
  5. *
  6. * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  7. *         returns {@code false}
  8. */
  9. public final void signal() {
  10. if (!isHeldExclusively())
  11. throw new IllegalMonitorStateException();
  12. Node first = firstWaiter;
  13. if (first != null)
  14. doSignal(first);
  15. }
  16. /**
  17. * Removes and transfers nodes until hit non-cancelled one or
  18. * null. Split out from signal in part to encourage compilers
  19. * to inline the case of no waiters.
  20. * @param first (non-null) the first node on condition queue
  21. */
  22. private void doSignal(Node first) {
  23. do {
  24. if ( (firstWaiter = first.nextWaiter) == null)//将旧的头结点移出 让下一个结点顶替上来
  25. lastWaiter = null;
  26. first.nextWaiter = null;
  27. } while (!transferForSignal(first) &&//将旧的头结点加入到AQS的等待队列中
  28. (first = firstWaiter) != null);
  29. }
  30. /**
  31. * Transfers a node from a condition queue onto sync queue.
  32. * Returns true if successful.
  33. * @param node the node
  34. * @return true if successfully transferred (else the node was
  35. * cancelled before signal).
  36. */
  37. final boolean transferForSignal(Node node) {
  38. /*
  39. * If cannot change waitStatus, the node has been cancelled.
  40. */
  41. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  42. return false;
  43. /*
  44. * Splice onto queue and try to set waitStatus of predecessor to
  45. * indicate that thread is (probably) waiting. If cancelled or
  46. * attempt to set waitStatus fails, wake up to resync (in which
  47. * case the waitStatus can be transiently and harmlessly wrong).
  48. */
  49. Node p = enq(node);//进入AQS的阻塞队列
  50. int c = p.waitStatus;
  51. //该结点点的状态CANCELLED或者修改状态失败 就直接唤醒该结点内的线程
  52. //PS 正常情况下 这里是不会为true的故不会在这里唤醒该线程
  53. //只有发送signal信号的线程 调用了reentrantLock.unlock方法后(该线程已经加入到了AQS等待队列)才会被唤醒。
  54. if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
  55. LockSupport.unpark(node.thread);
  56. return true;
  57. }

转眼之间,2014已经与我渐行渐远  2015要开启源码研究之旅、fighting

Java Concurrent之 AbstractQueuedSynchronizer的更多相关文章

  1. &lbrack;Java Concurrent&rsqb; 多线程合作 producer-consumers &sol; queue 的简单案例

    在多线程环境下,通过 BlockingQueue,实现生产者-消费者场景. Toast 被生产和消费的对象. ToastQueue 继承了 LinkedblockingQueue ,用于中间存储 To ...

  2. Java并发编程-AbstractQueuedSynchronizer源码分析

    简介 提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架.该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础.使用的方法是继承,子类通过 ...

  3. Java并发基础类AbstractQueuedSynchronizer的实现原理简介

    1.引子 Lock接口的主要实现类ReentrantLock 内部主要是利用一个Sync类型的成员变量sync来委托Lock锁接口的实现,而Sync继承于AbstractQueuedSynchroni ...

  4. Java并发:AbstractQueuedSynchronizer(AQS)

    队列同步器 AbstractQueuedSynchronizer 是一个公共抽象类.提供一个同步器框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量,事件等).使用一个 in ...

  5. java concurrent包的学习(转)

    java concurrent包的学习(转) http://my.oschina.net/adwangxiao/blog/110188 我们都知道,在JDK1.5之前,Java中要进行业务并发时,通常 ...

  6. How to Create a Java Concurrent Program

    In this Document   Goal   Solution   Overview   Steps in writing Java Concurrent Program   Template ...

  7. Java Concurrent Topics

    To prevent Memory Consistency Errors(MCEs), it is good practice to specify synchronized class specif ...

  8. java AQS(AbstractQueuedSynchronizer)同步器详解

    除了内置锁(synchronized)外,java AQS(AbstractQueuedSynchronizer)同步器几乎是所有同步容器,同步工具类的基础.ReentrantLock.Reentra ...

  9. Java中编写线程安全代码的原理&lpar;Java concurrent in practice的快速要点&rpar;

    Java concurrent in practice是一本好书,不过太繁冗.本文主要简述第一部分的内容. 多线程 优势 与单线程相比,可以利用多核的能力; 可以方便的建模成一个线程处理一种任务; 与 ...

随机推荐

  1. WAP端 穿透问题和解决方法

    1. 穿透问题可这么理解, 共有2种问题: 问题1: 有A 和 B 两个弹层,B 弹层盖在A 弹层上面,B 弹层绑定 touchend 事件,当用户点击B 的时候 B隐藏,由于touchend 事件触 ...

  2. Linux命令(1)- grep

    1.grep 功能:查找文件里符合条件的字符串. 语法:grep [-abcEFGhHilLnqrsvVwxy][-A<显示列数>][-B<显示列数>][-C<显示列数& ...

  3. 世界上不存在什么RedBSD&comma;SuseBSD或者ArchBSD,Turb&period;&period;&period;

    世界上不存在什么RedBSD,SuseBSD或者ArchBSD,TurboBSD之类的东西.

  4. nginx 的安装

    一.必要软件准备1.安装pcre 为了支持rewrite功能,我们需要安装pcre 复制代码代码如下: # yum install pcre* //如过你已经装了,请跳过这一步 2.安装openssl ...

  5. 关于delete和delete&lbrack;&rsqb;

    [精彩] 求问delete和delete[] 的区别??http://www.chinaunix.net/jh/23/311058.html C++告诉我们在回收用 new 分配的单个对象的内存空间的 ...

  6. gulp基于seaJs模块化项目打包实践【原创】

    公司还一直在延续使用jq+seajs的技术栈,所以只能基于现在的技术栈进行静态文件打包,而众所周知seajs的打包比较"偏门",在查了不少的文档和技术分享后终于琢磨出了自己的打包策 ...

  7. java中字符串&OpenCurlyDoubleQuote;不可变性”的破坏,使用反射破坏final属性。以及涉及到字符串常量池的问题。

    大家都清楚java中String类是不可变的,它的定义中包含final关键字.一旦被创建,值就不能被改变(引用是可以改变的). 但这种“不可变性”不是完全可靠的,可以通过反射机制破坏.参考一下代码: ...

  8. TCHAR和CHAR类型的互转&comma;string 转lpcwstr

    https://www.cnblogs.com/yuguangyuan/p/5955959.html 没有定义UNICODE,所以它里面的字符串就是简单用" "就行了,创建工程的时 ...

  9. SSE图像算法优化系列八:自然饱和度&lpar;Vibrance&rpar;算法的模拟实现及其SSE优化(附源码,可作为SSE图像入门,Vibrance算法也可用于简单的肤色调整)。

    Vibrance这个单词搜索翻译一般振动,抖动或者是响亮.活力,但是官方的词汇里还从来未出现过自然饱和度这个词,也不知道当时的Adobe中文翻译人员怎么会这样处理.但是我们看看PS对这个功能的解释: ...

  10. WPF DataGrid多表头&sol;列头,多行头,合并单元格,一列占据多行

    先上效果图: 思路说明:这是两个DataGrid,没有嵌套,位置和高度保持一致,在加上ScrollViewer滚动条,这就像是在一个DataGrid中. 缺点: 因为最外层有透明的Border,所以没 ...