深入并发之(一) 从来ReentrantLock看AbstractQueuedSynchronizer源码

时间:2021-02-02 18:35:40

深入并发包之(一) 从来ReentrantLock看AbstractQueuedSynchronizer源码

这篇文章是一篇并发相关的源码分析文章,主要从源码级别分析AQS操作(主要关于阻塞锁的实现),从而加深对并发中ReentrantLockReentrantReadWriteLock两种锁的理解。

AQS概述

下面引用JDK文档中对AQS类的描述:

为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。

在这篇文章中我们只关注有关锁的实现的相关部分,那么这段话就可以这样理解。

AQS类的内部实现了一个FIFO的等待队列(实际是一个链表),用来保存线程等相关内容,同时,利用这个队列来实现一个阻塞锁。

同时AQS中有一个int类型的值state,这个值有相关的原子操作,我们可以使用相关的方法来修改这个值。

当state的值等于0时,我们认为没有线程获得了锁;当state的值大于0时,我们认为已经有线程获得了锁,同时,线程每重入一次,state的值加一,这样就可以实现锁的重入

AQS中我们有独占共享两种模式,因此我们既可以实现独占锁,也可以实现共享锁。

队列的实现

AQS的队列的实现主要有一个内部类Node,来表示队列的结点。

static final class Node {
    //结点的状态,signal等
    volatile int waitStatus;
    //结点的前驱结点
    volatile Node prev;
    //结点的后继结点
    volatile Node next;
    //结点保存的线程
    volatile Thread thread;
    //表示当前结点的模式,独占还是共享
    Node nextWaiter;
}

由此我们可以看出,所谓的FIFO队列实际上是一个双向链表,结点中的主要数据就是状态和线程。我们会将所有获取锁但是没有成功的线程保存到队列中,这样我们就可以在此基础上实现其他功能,例如公平锁和非公平锁等内容。

下面给出结点状态的可能取值:

//由于超时或者中断,线程处于取消状态
static final int CANCELLED =  1;
//线程处于阻塞状态
static final int SIGNAL    = -1;
//线程再等待满足条件
static final int CONDITION = -2;
//线程状态需要向后传播
static final int PROPAGATE = -3;

结点的默认状态为0,这种状态表示结点的状态不属于上面的任意一种。当结点状态大于0时,意味着线程不需要被阻塞,因此,很多时候JDK在校验的时候没有校验到具体值,而是一种范围校验。

既然实现了队列,那么自然需要维护队列中的结点,后面我们会介绍相关方法。

ReentrantLock加锁方法的概述

首先我们先来描述一下ReentrantLock使用AQS实现加锁需要实现的功能。

1、 将所有需要加锁的线程维护到队列中

2、 将所有需要等待的线程阻塞

3、 将所有被取消的线程从队列中删除

4、 将所有被中断的线程中断

5、 将获得锁的线程从队列中删除

AQS的使用

使用AQS来实现一个锁,我们需要在类的内部声明一个非公共的类来继承AQS类,并且实现AQS类中的部分方法。

一般来说,我们需要自己重写方法如下

//独占获取锁
tryAcquire(int)
//独占模式释放锁
tryRelease(int)
//共享模式获取锁
tryAcquireShared(int)
//共享模式释放锁
tryReleaseShared(int)

在AQS中,上面几个方法体是直接抛出异常,所以如果我们没有自己重写,那么加锁方法是无法调用的。

查看ReentrantLock类的源码,其内部有一个类Sync类。,继承了AQS类,并且重写了部分方法。由于ReentrantLockReentrantLock实现了公平锁和非公平锁,所以,其类的内部又有两个类NonfairSyncFairSync类继承了Sync类。

加锁代码思路

上面的介绍应该让大家已经对AQS的使用和基本功能有所了解了,接下来,我们将跟随ReentrantLock源码的思路,一步步分析加锁和解锁的流程。

博主会把JDK中的源码主要内容粘贴到文章中,好了,开始吧。

下面这段是ReentrantLock中的加锁部分

public void lock() {
    sync.lock();
}

可以看出加锁部分实际是调用了Sync类的lock()方法。
当我们查看lock()方法时发现,这个方法是一个抽象方法,实际上,在公平锁和非公平锁中,这个方法的实现是不同,因此,这个方法是由NonfairSyncFairSync类实现的,下面以非公平锁中的实现为例。

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

加锁过程概览见下图

深入并发之(一) 从来ReentrantLock看AbstractQueuedSynchronizer源码

加锁过程中,首先通过一个CAS操作,如果state的值为0(即没有线程获得了独占锁),那么将其设置为1(表示已经有一个线程获得了独占锁),如果设置成功,那么加锁就成功了,然后调用方法setExclusiveOwnerThread,将获得锁的线程设置为当前线程。

由于是非公平锁,所以不需要考虑线程的等待时间等因素。

同时需要注意,如果CAS操作失败,也就是说已经有线程已经获得了独占锁,这时我们调用acquire函数获取锁,这里应该是考虑到锁重入等情况。同时如果是第一个线程进入,就可以直接获得锁,效率较高。

下面给出acquire方法,这个方法实际上是由AQS给出的实现。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

这里可以看出acquire方法会调用我们自己实现的tryAcquire方法来尝试获得锁,如果获取锁成功,那么方法就直接结束了,如果获取锁失败会继续执行后面的方法。

tryAcquire方法

那么,接下来查看非公平锁中的tryAcquire方法。

注意,tryAcquire方法是由ReentrantLock类实现的

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

查看源码,nonfairTryAcquire方法是在Sync类中给出的。

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //没有线程获得过锁
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            /**
             * 已经有线程获取了锁,由于是独占锁,所以其他线程不可以
             *再次进入,但是有一种特殊情况,就是当前线程或获得锁的
             *线程是同一个,这时满足可重入
             */
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

这段代码的整体流程如下图:

深入并发之(一) 从来ReentrantLock看AbstractQueuedSynchronizer源码

这段代码的思路如下:

  • 如果state的值是0,那么证明独占锁没有被占有,我们可尝试CAS操作,如果CAS操作成功,那么当前线程获得了锁,返回为真。
  • 如果state的值不为0,那么说明已经有线程获得了锁,其他线程就不应该可以获得锁,但是,这里有一种情况,就是重入,如果已经占有锁的线程和当前线程是同一线程,那么就可以获得锁,并且需要将state的值加一,表示重入了一次,这里还对state的值进行了校验,以防溢出,这里每个线程的重入次数实际上是有限的。

addWaiter以及acquireQueued方法

这里让我们的目光会到acquire方法中,假如获取锁失败,那么接下来我们需要做的就是维护FIFO链表,并且让所有没有获得锁的线程进入等待队列,直到获得锁。

为了方便,这里将acquire方法再次给出

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

下面介绍方法addWaiteracquireQueued,这里Node.EXCLUSIVE实际上就是null,这里是用来区分节点是等待独占锁还是共享锁的。

给出addWaiter的代码,这部分代码有AQS实现

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//新建节点
        Node pred = tail;
        //如果队列有值那么可以直接将当前节点加入到队列尾部,注意这里如果多个结点同时进入是有可能发生CAS操作失败的情况,所以接下来就需要enq方法
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //调用enq初始化队列并将结点加入队列中
        enq(node);
        return node;
    }

这部分代码的主要职责是维护链表的顺序,生成一个节点,将节点维护到链表的尾部,如果链表为空或者CAS设置tail失败那么就需要enq方法来初始化链表,并将当前节点维护到链表的尾部。

    private Node enq(final Node node) {
        //死循环,节点会自旋,直到成功加入队列,如果有并发进入这个方法,也不会有问题
        for (;;) {
            Node t = tail;
            if (t == null) { // 如果队列为空,需要初始化一个空的Node节点,并让head和tail都指向它
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//将节点放入队列的尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

给出acquireQueued的代码,这部分代码由AQS实现

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //如果前驱结点为头结点,那么只有当前结点在等待获取锁,那么就可以再次尝试获取锁,如果获取成功就可以反悔了
                if (p == head && tryAcquire(arg)) {
                    //获取锁成功后释放结点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判断当前结点是否被中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这部分代码实际也是一个死循环,当有一个结点进入后会一直自旋,知道成功获得锁。如果在等待的过程中,线程被中断,则不会响应,但是会标记一下,当该结点的线程获得锁后,再对线程进行中断处理。

通过两幅图片介绍一下acquireQueued方法的两种情况

情况一,队列中只有头结点和当前结点

深入并发之(一) 从来ReentrantLock看AbstractQueuedSynchronizer源码

情况二,当前结点不是队列中除头结点外的唯一结点

深入并发之(一) 从来ReentrantLock看AbstractQueuedSynchronizer源码

下面我们详细介绍两个方法,自旋的两个重要函数shouldParkAfterFailedAcquireparkAndCheckInterrupt

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果前驱结点的状态为SIGNAL,那么当前结点可以park即让其等待
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {//前驱结点的状态为cancle
            //把所有状态为cancle的结点全部跳过,即从队列中删除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //将前驱结点的状态设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    private final boolean parkAndCheckInterrupt() {
        //上面函数返回值为真,才能进入这个函数,表示当前结点可以被park,并且再检查线程是否被中断,并清除中断状态,暂时不做响应
        LockSupport.park(this);
        return Thread.interrupted();
    }

目前进行到这里加锁还剩最后一步,即对线程中断的处理,如果线程被中断,并且获得了锁,那么会调用方法selfInterrupt来进行中断处理。

    static void selfInterrupt() {
        //再次调用线程中段方法
        Thread.currentThread().interrupt();
    }

释放锁代码思路

经过上面的折磨,终于走到了释放锁部分的代码,这部分代码相对于加锁部分要简单很多。

    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        //尝试释放锁,释放成功那么需要将其他等待中的结点唤醒
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            //如果调用释放锁的线程和占有锁的线程不是同一个,抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //由于锁的可重入,只有当state的值为0的时候才能真正释放锁
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //修改State的值
            setState(c);
            return free;
        }

释放锁部分的代码实际上很简单,由于只有获得锁得线程才能够释放锁,因此,如果调用释放锁的线程和占有锁的线程不是同一个,则需要抛出异常。另外一个需要注意的点就是,由于锁的可重入性,我们使用了state的值来表示锁的重入次数,因此,只有当state的值变为0,我们才能够真正的释放锁,但是,无论是否释放了锁,我们都需要将state的值修改。

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }