ReentrantLock之AbstractQueuedSynchronizer 源码分析笔记

时间:2021-07-21 18:37:16

目前java有两种方式实现线程同步一种是synchronized方式 ,一种是基于AQS框架的方式

以下对两种方式涉及到的类做对比:

  synchronized 方式 AQS框架
同步方式 synchronized  同步块 ReentrantLock或ReentrantReadAndWriteLock的lock()、tryLock()、lockInterruptibly()和unlock()方法
释放锁等待 Object.wait() Condition.await()
唤醒等待线程 Object.nobify(),Object.notifyAll() Condition.signal(),Condition.signalAll()

除此之外AQS框架还有一个CountDownLatch类,用于线程的同步;


AQS框架都是基于父类AbstractQueuedSynchronizer实现的,它的实现类包括ReentrantLock(独占锁),ReentrantReadAndWriteLock(独占锁与共享锁),Condition(看上面表的对比就知道作用类似于Object的wait和notify),还有CountDownLatch类,以及java的线程池ThreadPoolExecuter的内部类Worker;

下面讲讲父类AbstractOwnableSynchronizer这个类

java.util.concurrent.locks.AbstractQueuedSynchronizer有四个重要的成员变量

 

一、private transient Thread exclusiveOwnerThread;

当前持有独占锁的线程对象,有且只能有一个,其它线程只能进入到等待队列

二、private volatile int state;

state代表加锁状态,不同的实现类state的作用不一样:

1.对于ReentrantLock是代表独占锁数量,无锁时state=0,有锁时state>0, 第一次加锁时,将state设置为1,持有锁的线程,可以多次加锁,只有持有锁的线程才可以多次加锁,经过判断加锁线程就是当前持有锁的线程时(即exclusiveOwnerThread==Thread.currentThread()),即可加锁,每次加锁都会将state的值+1,state等于几,就代表当前持有锁的线程加了几次锁,解锁时每解一次锁就会将state减1,state减到0后,锁就被释放掉了,其它线程又可以加锁了(所以加了几次锁就要解锁几次);当持有锁的线程释放锁以后,如果是公平锁,则等待队列会获取到加锁权限,在等待队列头部取出第一个线程去获取锁,获取锁的线程会被移出队列,如果是非公平锁,获取到加锁权限的有可能是等待队列中的第一个线程,也有可能是一个新加入的竞争锁的线程;

2.对于ReentrantReadAndWriteLock,state为4字节的整形,它的高位两个字节用来存储读锁数量(共享锁),低位两个字节用来存储写锁数量(独占锁),获取共享锁前要先判断有没有独占锁,如果存在读占锁,并且持有独占锁的线程不是当前线程,则获取锁失败;

3.对于CountDownLatch,state为初始化时传入的CountDown的次数,当state为0时,解除阻塞;

4.对于ThreadPoolExecuter的内部类Worker,state用来标识当前线程是否允许中断,Worker类就是线程池内运行的线程,它实现了Runable接口,继承了AbstractOwnableSynchronizer,state有3个值,-1代表Worker线程刚被实例化,还没运行,0代表线程已经运行,处于无锁状态,1代表线程已经加锁;线程的锁用于线程池在RUNNING和SHUTDOWN状态,Worker线程如果在执行任务,不允许被中断;

三、private transient volatile Node head;

等待获取锁的线程队列头节点


四、private transient volatile Node tail;

等待获取锁的线程的尾节点

 

 如果state>0,其它需要加锁的线程需要等待持有锁的线程释放锁,等待获取锁的线程会放入到一个先进先出的链表队列,headtail就是等待获取锁的队列的头节点和尾节点,除了头节点外,每个节点都与一个线程绑定(即在创建节点时,设置Node.thread = Thread.currentThread),等待获取锁的线程会通过调用Unsafe.park()方法阻塞等待;

持有锁的线程释放锁时会将state值为0,然后调用Unsafe.unpark()方法取消等待队列中头部的第一个线程的阻塞状态,去获取锁。

等待队列中的线程如果被park阻塞,这时其它线程中断了它,park就会解除阻塞,马上返回(这不是我们需要的,我们需要的是park方法一直阻塞,直到持有锁的线程释放了锁调用了unpark方法取消阻塞的返回),处于中断状态的线程 ,再调用park方法是无法阻塞的,所以这时必须要将线程状态重置为非中断状态,然后再次调用park方法阻塞等待,在线程获取到锁后再调用线程interrupt方法,恢复线程的中断状态;


几个重要的方法:


    //尝试获取独占锁,锁竞争时不一定能获取成功,成功则返回true,否则返回false

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }


    //尝试释放独占锁,锁竞争时不一定能释放成功,成功则返回true,否则返回false
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }


    //尝试获取共享锁,锁竞争时不一定能获取成功,成功则返回true,否则返回false
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }


    //尝试释放共享锁,锁竞争时不一定能释放成功,成功则返回true,否则返回false
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }


上面这4个方法我们会发现都会抛出不支持的操作的异常,一般会在它的子类中重新实现这几个方法,原理都是通过调用UNSAFE类的CAS操作,来对state做加1操作(加锁)或减1操作(释放锁),因为是CAS操作,所以有可能失败;


//加锁操作

public final void acquire(int arg) {

//如果尝试加锁失败,则调用AddWaiter方法将当前线程封装为Node对象,放入到等待加锁的队列的队尾,Node.EXCLUSIVE代表独占锁
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}


如果加锁失败,则将线程封装为一个Node对象放入到等待队列;

等待队列是由Node对象组成的一个双向链表,除了链表的头节点,每个Node都持有一个线程,以下是node的数据结构(Node是AQS的内部类);

static final class Node {

/**下面两个常量代表节点的类型**/


        //常量:代表Node节点是一个要获取共享锁的节点
        static final Node SHARED = new Node();
       //常量:代表Node节点是一个要获取独占锁节点
        static final Node EXCLUSIVE = null;


       /**下面4个常量代表Node节点的等待状态**/

       //代表该线程已经出现了异常,取消等待,这样的节点会被直接移出等待队列,放弃锁的争用;
        static final int CANCELLED =  1;
        /**代表该节点的线程在等待着持有锁的线程释放锁后,执行UNPARK操作,从队列头开始取消第一个阻塞节点的状态**/
        static final int SIGNAL    = -1;
        // 代表该节点处于Condition.await()状态 
        static final int CONDITION = -2;
      
        static final int PROPAGATE = -3;

        //节点的等待状态,值为上面4个常量的值,初始值为0,如果是头节点则也为0
        volatile int waitStatus;


       //该节点的前一个节点
        volatile Node prev;


      //该节点的后一个节点
        volatile Node next;


       //当前节点封装的等待获取锁的线程,如果该节点是链表的头节点,则Thread为null
        volatile Thread thread;


      //共享锁的下一个节点
        Node nextWaiter;


        //判断次节点是否是等待共享锁
        final boolean isShared() {
            return nextWaiter == SHARED;
        }


        //获取本节点的前一个节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }


        Node() {    // Used to establish initial head or SHARED marker
        }

        //此节点进入等待获取锁的阻塞队列
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        //此节点会进入Condition.await()队列
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }



以下是入队过程:

 //将当前线程封装为Node对象,放入等待队列队尾

 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;

        //如果队列不为空,则尝试将node放入到队列尾部
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;//如果入队成功,返回Node
            }
        }

       //如果没有入队,则采用自旋锁的方式,将node加进队列;
        enq(node);
        return node;
    }


//自旋锁,将node加入到队列尾部

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;

           //如果t为null,说明队列为空,且还没有初始化,需要要初始化队列
            if (t == null) {

 //使用cas方式尝试设置头结点为一个新的Node节点;
                if (compareAndSetHead(new Node()))
                    tail = head; //如果设置头结点成功,则将尾节点指向头节点;

               //如果设置失败,说明有其它需要入队等待的线程已经初始化了队列;则继续循环
            } else {//如果t不是null 说明队列不为空
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


未完待续;