队列同步器AbstractQueueSynchronizer

时间:2021-08-13 17:10:29
    同步器是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
    同步器主要使用的方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。子类推荐被定义为自定义同步组件的静态内部类,同步器本身没有实现任何同步接口,仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用。
    同步器的设计是基于模版方法模式的,即使用者需要继承同步器并重写指定的方法。同步器支持独占式获取同步状态和共享式获取同步状态

    Java.concurrent包的实现示意图:
   
队列同步器AbstractQueueSynchronizer

1 队列同步器的接口
同步器的设计是基于模板的。使用者需要重写同步器指定的方法,然后将同步器组合在自定义同步组件的视线中,并调用同步器提供的模板方法。而这些模板方法就是调用同步器使用者重写的方法。

1.1 三个核心方法
- getState():获取当前同步状态
- setState(int  newState):设置当前同步状态
- compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能保证状态设置的原子性。

1.2 可重写的方法



  • tryAcquire(int arg) :独占式获取同步状态,该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
  • tryRelease(int arg) :独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
  • tryAcquireShared(int  arg) :共享式获取同步状态,返回大于等于0的值,表示获取成功,否则失败
  • tryReleaseShared(int arg): 共享式释放同步状态
  • isHeldExclusively() :当前同步器是否在独占模式下被线程占用,一般该方法表示是否被前当线程多独占

1.3 模板方法


  • acquire(int arg) 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg) 方法。
  • acquireInterruptibly(int arg) 与acquire(int arg) 相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前被中断,则该方法会抛出InterruptedException并返回。
  • tryAcquireNanos(int arg,long nanos)在acquireInterruptibly基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,获取到了返回true。
  • release(int arg) 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
  • acquireShared(int arg) 共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待。与独占式的不同是同一时刻可以有多个线程获取到同步状态。
  • acquireSharedInterruptibly(int arg) 与acquire(int arg) 相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前被中断,则该方法会抛出InterruptedException并返回。
  • tryAcquireSharedNanos(int arg,long nanos)在acquireSharedInterruptibly基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,获取到了返回true
  • releaseShared(int arg) 共享式释放同步状态
  • getQueuedThreads() 获取等待在同步队列上的线程集合
同步器提供的模版方法大致分为3类:
                             独占式获取与释放同步状态,共享式获取与释放同步状态,查询同步队列中的等待线程情况

2 同步队列

队列同步器的实现依赖内部的同步队列来完成同步状态的管理。它是一个FIFO的双向队列,当线程获取同步状态失败时,同步器会将当前线程和等待状态等信息包装成一个节点并将其加入同步队列,同时会阻塞当前线程。当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

下面是Node静态内部类的源码
[java]  view plain  copy
  1. static final class Node {  
  2.         /** 标识一个这个节点是否为shared类型 */  
  3.         static final Node SHARED = new Node();  
  4.         /** Marker to indicate a node is waiting in exclusive mode */  
  5.         static final Node EXCLUSIVE = null;  
  6.   
  7.         /** waitStatus value to indicate thread has cancelled */  
  8.         static final int CANCELLED =  1;  
  9.         /** waitStatus value to indicate successor's thread needs unparking */  
  10.         static final int SIGNAL    = -1;  
  11.         /** waitStatus value to indicate thread is waiting on condition */  
  12.         static final int CONDITION = -2;  
  13.         /** 
  14.          * waitStatus value to indicate the next acquireShared should 
  15.          * unconditionally propagate 
  16.          */  
  17.         static final int PROPAGATE = -3;  
  18.   
  19.         /** 
  20.          *   等待状态值,又以下状态值: 
  21.          * 
  22.          *   SIGNAL:     值为-1 ,后续节点处于等待状态,而当前节点的线程如果 
  23.          *               释放了同步状态或者取消等待,节点进入该状态不会变化           
  24.          *   CANCELLED:  值为 1,由于在同步队列中等待的线程等待超时或者被中断 
  25.          *               需要从同步队列中取消等待,节点进入该状态将不会变化                     
  26.          *   CONDITION:  值为-2,节点在等待队列中,节点线程等待在Condition上, 
  27.          *               当其他线程对Condition调用了signal()方法后,该节点将会 
  28.          *               从等待队里中转移到同步队列中,加入对同步状态的获取中 
  29.          
  30.          *   PROPAGATE:  值为-3,表示下一次共享式同步状态获取将会无条件地被传播下去 
  31.          *             
  32.          *   0:          初始化状态 
  33.          */  
  34.         volatile int waitStatus;  
  35.   
  36.         /** 
  37.          * 前驱节点,当节点加入同步队列时被设置 
  38.          */  
  39.         volatile Node prev;  
  40.   
  41.          /** 
  42.          * 后继节点 
  43.          */  
  44.         volatile Node next;  
  45.   
  46.         /** 
  47.          * 获取状态状态的线程 
  48.          */  
  49.         volatile Thread thread;  
  50.   
  51.          /** 
  52.          * 等待队列中的后继节点。如果当前节点是共享的,那么这个字段是一个shared常量, 
  53.          * 也就是说节点类型(独占或共享)和等待队列中个后继节点共用同一个字段 
  54.          */  
  55.         Node nextWaiter;  
  56.   
  57.         /** 
  58.          * Returns true if node is waiting in shared mode. 
  59.          */  
  60.         final boolean isShared() {  
  61.             return nextWaiter == SHARED;  
  62.         }  
  63.   
  64.         /** 
  65.          * Returns previous node, or throws NullPointerException if null. 
  66.          * Use when predecessor cannot be null.  The null check could 
  67.          * be elided, but is present to help the VM. 
  68.          * 
  69.          * @return the predecessor of this node 
  70.          */  
  71.         final Node predecessor() throws NullPointerException {  
  72.             Node p = prev;  
  73.             if (p == null)  
  74.                 throw new NullPointerException();  
  75.             else  
  76.                 return p;  
  77.         }  
  78.   
  79.         Node() {    // Used to establish initial head or SHARED marker  
  80.         }  
  81.   
  82.         Node(Thread thread, Node mode) {     // Used by addWaiter  
  83.             this.nextWaiter = mode;  
  84.             this.thread = thread;  
  85.         }  
  86.   
  87.         Node(Thread thread, int waitStatus) { // Used by Condition  
  88.             this.waitStatus = waitStatus;  
  89.             this.thread = thread;  
  90.         }  
  91.     }  

节点是构成同步队列的基础,同步器拥有首节点和尾节点,没有成功获取同步状态的线程会成为节点加入该队列的尾部,其结构如下图所示
队列同步器AbstractQueueSynchronizer
同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。
如果一个线程没有获得同步队列,那么包装它的节点将被加入到队尾,显然这个过程应该是线程安全的。因此同步器提供了一个基于CAS的设置尾节点的方法: compareAndSetTail(Node expect,Node update),它需要传递一个它认为的尾节点和当前节点,只有设置成功,当前节点才被加入队尾。这个过程如下所示
队列同步器AbstractQueueSynchronizer
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,这一过程如下:

队列同步器AbstractQueueSynchronizer
上图设置首节点是通过获取同步状态成功的线程来实完成的,由于只有一个线程能够成功获取到同步状态,因此设置头结点的方法不需要使用CAS保证。
     

3 独占式同步状态的获取与释放


3.1 同步状态的获取


在前面的部分已经提到,独占式获取同步状态的方法是acquried(int arg),该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列移除,其源代码如下:
[java]  view plain  copy
  1. public final void acquire(int arg) {  
  2.         if (!tryAcquire(arg) &&  
  3.             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  
  4.             selfInterrupt();  
  5.     }  

这里面主要完成的工作是同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等操作,其主要逻辑是:
(1)调用自定义同步器的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态
(2)如果获取失败,就构造一个独占式(Node.EXCLUSIVE)的同步节点,并通过addWaiter方法加入到同步节点的尾部
(3)最后调用acquiredQueued方法,是的该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程中断来实现。

来看一下相关代码
(1) addWaiter

[java]  view plain  copy
  1. private Node addWaiter(Node mode) {  
  2. 当前线程构造一个节点  
  3.     Node node = new Node(Thread.currentThread(), mode);  
  4. 尾部快速插入  
  5.     Node pred = tail;  
  6.     if (pred != null) {  
  7.         node.prev = pred;  
  8.         if (compareAndSetTail(pred, node)) {  
  9.             pred.next = node;  
  10.             return node;  
  11.         }  
  12.     }  
  13.     enq(node);  
  14.     return node;  
  15. }  

 (2)enq

[java]  view plain  copy
  1. private Node enq(final Node node) {  
  2.  //死循环  
  3.        for (;;) {  
  4.            Node t = tail;  
  5.            if (t == null) { // 如果尾节点为空 则进行初始化  
  6.                if (compareAndSetHead(new Node()))  
  7.                    tail = head;  
  8.            } else {  
  9.                node.prev = t;  
  10.     //利用CAS设置尾节点  
  11.                if (compareAndSetTail(t, node)) {  
  12.                    t.next = node;  
  13.                    return t;  
  14.                }  
  15.            }  
  16.        }  
  17.    }  

上述两个方法是在保证线程安全的情况下,利用死循环不断地尝试设置尾节点。
addWaiter()方法中能够线程安全的添加尾节点,这样可以避免当多个线程tryAcruire(int arg)方法获取同步状态失败而并发被添加到队列时的顺序混乱。
enq()方法中,同步器通过“死循环”保证节点的正确添加。只有通过CAS将节点设置为尾节点后,线程才能从该方法中返回
那么节点进入同步队列以后,就要进入一个等待阶段。 这是一个自旋的过程,每个节点都在不停地观察,看看有没有机会获取同步状态。如果获取到同步状态,就可以从自旋过程中退出

(3) acquireQueued方法

[java]  view plain  copy
  1. final boolean acquireQueued(final Node node, int arg) {  
  2.         boolean failed = true;  
  3.         try {  
  4.             boolean interrupted = false;  
  5.             for (;;) {  
  6.                 final Node p = node.predecessor();  
  7.         //只有前驱节点是头节点才能尝试获取同步状态  
  8.                 if (p == head && tryAcquire(arg)) {  
  9.                     setHead(node);  
  10.                     p.next = null// help GC  
  11.                     failed = false;  
  12.                     return interrupted;  
  13.                 }  
  14.                 if (shouldParkAfterFailedAcquire(p, node) &&  
  15.                     parkAndCheckInterrupt())  
  16.                     interrupted = true;  
  17.             }  
  18.         } finally {  
  19.             if (failed)  
  20.                 cancelAcquire(node);  
  21.         }  
  22.     }  

线程在死循环中尝试获取同步状态,并且只有前驱节点为头节点的时候才会获取,因为
1,头节点是获取了同步状态的节点,之后它释放了同步状态才会唤醒后继节点。
2,维护同步队列的FIFO原则
下图描述了节点自旋获取同步状态的情况

在上图中,由于非首节点线程前驱节点出队或者被中断而从等待返回,随后检查自己的前驱是否不是首节点,如果是则尝试获取同步状态。可以到节点间并没有通讯,只是在不断地检查自己的前驱是否为头节点。





3.2 同步状态的释放


队里通过调用同步器的release的方法进行同步状态的释放,该方法释放了同步状态后,就会唤醒其后继节点。其源代码如下:

[java]  view plain  copy
  1. public final boolean release(int arg) {  
  2.        if (tryRelease(arg)) {  
  3.            Node h = head;  
  4.            if (h != null && h.waitStatus != 0)  
  5.                unparkSuccessor(h);  
  6.            return true;  
  7.        }  
  8.        return false;  
  9.    }  


该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor通过使用LockSupport在唤醒处于等待状态的线程。

3.3 总结

在获取同步状态时,同步器维护这一个同步队列,并持有对头节点和尾节点的引用。获取状态失败的线程会被包装成节点加入到尾节点后面称为新的尾节点,在进入同步队列后开始自旋,停止自旋的条件就是前驱节点为头节点并且成功获取到同步状态。在释放同步状态时,同步器调用tryRelease方法释放同步状态,然后唤醒头节点的后继节点

4 共享式同步状态获取与释放


共享式获取与独占式获取的区别就是同一时刻是否可以多个线程同时获取到同步状态。以文件的读写来说,读操作的话同一时刻可以有很多线程在进行并阻塞写操作,但是写操作只能有一个线程在写并阻塞所有读操作。

4.1 同步状态获取


通过调用同步器的acquireShare(int arg) 方法可以共享式地获取同步状态。

[java]  view plain  copy
  1. public final void acquireShared(int arg) {  
  2.     if (tryAcquireShared(arg) < 0)  
  3.         doAcquireShared(arg);  
  4. }  
  5.   
  6.   
  7.  private void doAcquireShared(int arg) {  
  8.     final Node node = addWaiter(Node.SHARED);  
  9.     boolean failed = true;  
  10.     try {  
  11.         boolean interrupted = false;  
  12.         for (;;) {  
  13.             final Node p = node.predecessor();  
  14.             if (p == head) {  
  15.                 int r = tryAcquireShared(arg);  
  16.                 if (r >= 0) {  
  17.                     setHeadAndPropagate(node, r);  
  18.                     p.next = null// help GC  
  19.                     if (interrupted)  
  20.                         selfInterrupt();  
  21.                     failed = false;  
  22.                     return;  
  23.                 }  
  24.             }  
  25.             if (shouldParkAfterFailedAcquire(p, node) &&  
  26.                 parkAndCheckInterrupt())  
  27.                 interrupted = true;  
  28.         }  
  29.     } finally {  
  30.         if (failed)  
  31.             cancelAcquire(node);  
  32.     }  
  33. }  

在这个方法中,同步器调用tryAcquireShared方法尝试获取同步状态, tryAcquireShared返回值是一个int类型,当返回值大于0时,表示能够获取到同步状态。因此同步队列里的节点结束自旋状态的条件就是tryAcquireShared返回值大于0。

4.2 同步状态释放
[java]  view plain  copy
  1. public final boolean releaseShared(int arg) {  
  2.        if (tryReleaseShared(arg)) {  
  3.            doReleaseShared();  
  4.            return true;  
  5.        }  
  6.        return false;  
  7.    }  

[java]  view plain  copy
  1. private void doReleaseShared() {  
  2.         /* 
  3.          * Ensure that a release propagates, even if there are other 
  4.          * in-progress acquires/releases.  This proceeds in the usual 
  5.          * way of trying to unparkSuccessor of head if it needs 
  6.          * signal. But if it does not, status is set to PROPAGATE to 
  7.          * ensure that upon release, propagation continues. 
  8.          * Additionally, we must loop in case a new node is added 
  9.          * while we are doing this. Also, unlike other uses of 
  10.          * unparkSuccessor, we need to know if CAS to reset status 
  11.          * fails, if so rechecking. 
  12.          */  
  13.         for (;;) {  
  14.             Node h = head;  
  15.             if (h != null && h != tail) {  
  16.                 int ws = h.waitStatus;  
  17.                 if (ws == Node.SIGNAL) {  
  18.                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  
  19.                         continue;            // loop to recheck cases  
  20.                     unparkSuccessor(h);  
  21.                 }  
  22.                 else if (ws == 0 &&  
  23.                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
  24.                     continue;                // loop on failed CAS  
  25.             }  
  26.             if (h == head)                   // loop if head changed  
  27.                 break;  
  28.         }  
  29.     }  

由于这是共享式地,因此释放同步状态时可能有多个线程在进行释放的操作。因此这里面使用了CAS来保证线程安全。

5,独占式超时获取同步状态
     Java5之前,当一个线程获取不到锁而被阻塞在synchronized之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在synchronized上,等待着获取锁。
    在Java5中,同步器提供了acquireInterruotibly(int arg)方法,这个方法在等待获取同步状态时,如果线程被中断,会立刻返回,并抛出InterruptedExceptedException


文中的引用博文:http://blog.csdn.net/u010723709/article/details/50357247