Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析

时间:2023-03-08 17:14:05
Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析

我们知道在解决线程安全问题上使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,同时只有一个线程可以获取该锁,而实际情况下会有写少读多的场景,显然 ReentrantLock 满足不了需求,所以 ReentrantReadWriteLock 应运而生,ReentrantReadWriteLock 采用读写分离,多个线程可以同时获取读锁。

首先我们先看一下,ReentrantReadWriteLock 内部构造先看下它的类图结构如下图所示:

Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析

如上图可以看到读写锁内部维护了一个ReadLock和WriteLock,并且也提供了公平和非公平的实现,下面只介绍下非公平的读写锁的实现,我们知道AQS里面维护了一个state状态,

而ReentrantReadWriteLock 则需要维护读状态和写状态,一个state是无法表示写和读状态的。ReentrantReadWriteLock 巧妙的使用 state 的高 16 位表示读状态,

也就是获取改读锁的线程个数,低 16 位 表示获取到写锁的线程的可重入次数。并通过CAS对其进行操作实现了读写分离,在读多写少的场景下比较适用。

接下来用一张图来加深对 ReentrantReadWriteLock 的理解:

Java并发编程笔记之读写锁 ReentrantReadWriteLock 源码分析

首先我们先看ReentrantReadWriteLock 的内部类Sync的一些关键属性和方法,源码如下:

static final int SHARED_SHIFT   = ;

//共享锁(读锁)状态单位值65536
static final int SHARED_UNIT = ( << SHARED_SHIFT);
//共享锁线程最大个数65535
static final int MAX_COUNT = ( << SHARED_SHIFT) - ; //排它锁(写锁)掩码 二进制 15个1
static final int EXCLUSIVE_MASK = ( << SHARED_SHIFT) - ;
//用来记录最后一个获取读锁的线程获取读锁的可重入次数
private transient HoldCounter cachedHoldCounter;
//用来记录第一个获取到读锁的线程
private transient Thread firstReader;
//用来记录第一个获取到读锁的线程获取读锁的可重入次数
private transient int firstReadHoldCount;
//用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数
private transient ThreadLocalHoldCounter readHolds = new ThreadLocalHoldCounter();
/** 返回读锁线程数 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回写锁可重入个数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

类图中 firstReader用来记录第一个获取到读锁的线程,firstReadHoldCount则记录第一个获取到读锁的线程获取读锁的可重入数。cachedHoldCounter用来记录最后一个获取读锁的线程获取读锁的可重入次数。

接下我们进入ReentrantReadWriteLock 的内部类Sync的内部类HoldCounter类的源码,如下:

static final class HoldCounter {
int count = ;
//线程id
final long tid = getThreadId(Thread.currentThread());
}

readHolds 是ThreadLocal 变量,用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数,可知ThreadLocalHoldCounter继承了ThreadLocal,里面initialValue方法返回一个HoldCounter对象,源码如下:

  static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

接下来进行写锁的获取与释放讲解,如下:

ReentrantReadWriteLock 中写锁是使用的 WriteLock 来实现的。我们先看一下写锁WriteLock的获取与释放方法,如下:

  1.void lock() 写锁是个独占锁,同时只有一个线程可以获取该锁。 如果当前没有线程获取到读锁和写锁则当前线程可以获取到写锁然后返回。 如果当前已经有线程取到读锁和写锁则当前线程则当前请求写锁线程会被阻塞挂起。

另外写锁是可重入锁,如果当前线程已经获取了该锁,再次获取的只是简单的把可重入次数加一后直接返回。源码如下:

  public void lock() {
sync.acquire();
}
   public final void acquire(int arg) {
// sync重写的tryAcquire方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

如上代码,lock()内部调用了AQS的acquire方法,其中的tryAcquire是ReentrantReadWriteLock 内部 sync 类重写的,代码如下:

  protected final boolean tryAcquire(int acquires) {

            Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//(1) c!=0说明读锁或者写锁已经被某线程获取
if (c != ) {
()//w=0说明已经有线程获取了读锁或者w!=0并且当前线程不是写锁拥有者,则返回false
if (w == || current != getExclusiveOwnerThread())
return false;
()//说明某线程获取了写锁,判断可重入个数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded"); ()// 设置可重入数量(1)
setState(c + acquires);
return true;
} ()//第一个写线程获取写锁
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

如上代码(1),如果当AQS状态值不为0 则说明当前已经有线程获取到了读锁或者写锁,代码(2)如果w == 0 说明状态值的低 16 为0,而状态值不为0,则说明高16位不为0,这暗示已经有线程获取了读锁,所以直接返回false。

如果w != 0 说明当前已经有线程获取了该写锁,则看当前线程是不是该锁的持有者,如果不是则返回false。

执行到代码(3)说明当前线程之前获取到了该锁,则判断该线程的可重入此时是不是超过了最大值,是则抛异常,否则执行代码(4)增加当前线程的可重入次数,然后返回true。

如果AQS的状态值等于0,则说明目前没有线程获取到读锁和写锁,则实行代码(5),

其中对于ReentrantReadWriteLock的子类NofairSync的writerShouldBlock方法的非公平锁的实现源码如下:

final boolean writerShouldBlock() {
return false; // writers can always barge
}

如代码对于非公平锁来说固定返回false,则说明代码(5)抢占式执行CAS尝试获取写锁,获取成功则设置当前锁的持有者为当前线程返回true,否则返回false。

对于对于ReentrantReadWriteLock的子类FairSync的writerShouldBlock方法的公平锁的实现源码如下:

final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}

可知还是使用 hasQueuedPredecessors 来判断当前线程节点是否有前驱节点,如果有则当前线程放弃获取写锁的权限直接返回 false。

  2.void lockInterruptibly() 类似 lock() 方法,不同在于该方法对中断响应,也就是当其它线程调用了该线程的 interrupt() 方法中断了当前线程,当前线程会抛出异常 InterruptedException,源码如下:

    public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly();
}

  3.boolean tryLock() 尝试获取写锁,如果当前没有其它线程持有写锁或者读锁,则当前线程获取写锁会成功,然后返回 true。 如果当前已经其它线程持有写锁或者读锁则该方法直接返回 false,当前线程并不会被阻塞。

如果当前线程已经持有了该写锁则简单增加 AQS 的状态值后直接返回 true。源码如下:

  public boolean tryLock( ) {
return sync.tryWriteLock();
}
  final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != ) {
int w = exclusiveCount(c);
if (w == || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + ))
return false;
setExclusiveOwnerThread(current);
return true;
}

如上代码与tryAcquire 方法类似,这里不再讲述,不同在于这里使用的非公平策略

  4.boolean tryLock(long timeout, TimeUnit unit) 与 tryAcquire()不同在于多了超时时间的参数,如果尝试获取写锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到写锁则返回 false。

另外该方法对中断响应, 也就是当其它线程调用了该线程的 interrupt() 方法中断了当前线程,当前线程会抛出异常 InterruptedException。源码如下:

public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {
return sync.tryAcquireNanos(, unit.toNanos(timeout));
}

  5.void unlock() 尝试释放锁,如果当前线程持有该锁,调用该方法会让该线程对该线程持有的 AQS 状态值减一,如果减去 1 后当前状态值为 0 则当前线程会释放对该锁的持有,否者仅仅减一而已。

如果当前线程没有持有该锁调用了该方法则会抛出 IllegalMonitorStateException 异常 ,源码如下:

public void unlock() {
sync.release();
}
  public final boolean release(int arg) {
//调用ReentrantReadWriteLock中sync实现的tryRelease方法
if (tryRelease(arg)) {
//激活阻塞队列里面的一个线程
Node h = head;
if (h != null && h.waitStatus != )
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//(6) 看是否是写锁拥有者调用的unlock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//(7)获取可重入值,这里没有考虑高16位,因为写锁时候读锁状态值肯定为0
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == ;
//(8)如果写锁可重入值为0则释放锁,否者只是简单更新状态值。
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

如上代码 tryRelease 首先通过 isHeldExcusively判断是否当前线程是该写锁的持有者,如果不是则抛异常,否则执行代码(7)说明当前线程持有写锁,持有写锁说明状态值的高16位为0,所以这里nextc值就是当前线程写锁的剩余可重入次数。

代码(8)判断当前可重入次数是否为0,如果free为true 说明可重入次数为0,则当前线程会释放对写锁的持有,当前锁的持有者设置为null。如果free 为false,则简单更新可重入次数。

  

前面讲解了写锁的获取与释放,接下来讲解读锁的获取与释放,如下:

ReentrantReadWriteLock 中写锁是使用的 ReadLock 来实现的。主要看ReadLock读锁的获取与释放的主要方法,如下:

  1.void lock() 获取读锁,如果当前没有其它线程持有写锁,则当前线程可以获取读锁,AQS 的高 16 位的值会增加 1,然后方法返回。否者如果其它有一个线程持有写锁,则当前线程会被阻塞。源码如下:

public void lock() {
sync.acquireShared();
}
  public final void acquireShared(int arg) {
//调用ReentrantReadWriteLock中的sync的tryAcquireShared方法
if (tryAcquireShared(arg) < )
//调用AQS的doAcquireShared方法
doAcquireShared(arg);
}

如上代码读锁的lock方法调用了AQS的aquireShared方法,内部调用了 ReentrantReadWriteLock 中的 sync 重写的 tryAcquireShared 方法,源码如下:

protected final int tryAcquireShared(int unused) {

   //(1)获取当前状态值
Thread current = Thread.currentThread();
int c = getState(); //(2)判断是否写锁被占用
if (exclusiveCount(c) != &&
getExclusiveOwnerThread() != current)
return -; //(3)获取读锁计数
int r = sharedCount(c);
//(4)尝试获取锁,多个读线程只有一个会成功,不成功的进入下面fullTryAcquireShared进行重试
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//(5)第一个线程获取读锁
if (r == ) {
firstReader = current;
firstReaderHoldCount = ;
//(6)如果当前线程是第一个获取读锁的线程
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//(7)记录最后一个获取读锁的线程或记录其它线程读锁的可重入数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == )
readHolds.set(rh);
rh.count++;
}
return ;
}
//(8)类似tryAcquireShared,但是是自旋获取
return fullTryAcquireShared(current);
}

如上代码,首先获取了当前AQS的状态值,然后代码(2)看是否有其他线程获取到了写锁,如果是则直接返回了-1,然后调用AQS的doAcquireShared 方法把当前线程放入阻塞队列。

否则执行到代码(3)得到获取到读锁的线程个数,到这里要说明目前没有线程获取到写锁,但是还是有可能有线程持有读锁,然后执行代码(4),非公平锁的readerShouldBlock实现代码如下:

final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
 final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null;
}

如上代码作用是如果队列里面存在一个元素,则判断第一个元素是不是正在尝试获取写锁,如果不是的话,则当前县城使用判断当前获取读锁线程是否达到了最大值,最后执行CAS操作设置AQS状态值的高 16 位值增加 1。

代码(5)(6)记录第一个获取读锁的线程,并统计该线程获取读锁的可重入次数,代码(7)使用cachedHoldCounter 记录最后一个获取到读锁的线程,并同时该线程获取读锁的可重入次数,另外readHolds记录了当前线程获取读锁的可重入次数。

如果readerShouldBlock 返回 true 则说明有线程正在获取写锁,则执行代码(8)fullTryAcquireShared 代码与 tryAcquireShared 类似,不同在于前者是通过循环自旋获取。

源码如下:

    final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != ) {
if (getExclusiveOwnerThread() != current)
return -;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == )
readHolds.remove();
}
}
if (rh.count == )
return -;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == ) {
firstReader = current;
firstReaderHoldCount = ;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == )
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return ;
}
}
}

  2.void lockInterruptibly() 类似 lock() 方法,不同在于该方法对中断响应,也就是当其它线程调用了该线程的 interrupt() 方法中断了当前线程,当前线程会抛出异常 InterruptedException。

  3.boolean tryLock() 尝试获取读锁,如果当前没有其它线程持有写锁,则当前线程获取写锁会成功,然后返回 true。如果当前已经其它线程持有写锁则该方法直接返回 false,当前线程并不会被阻塞。

如果其它获取当前线程已经持有了该读锁则简单增加 AQS 的状态值高 16 位后直接返回 true。代码类似 tryLock 这里不再讲述。

  4.boolean tryLock(long timeout, TimeUnit unit) 与 tryLock()不同在于多了超时时间的参数,如果尝试获取读锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到读锁则返回 false。

另外该方法对中断响应, 也就是当其它线程调用了该线程的 interrupt() 方法中断了当前线程,当前线程会抛出异常 InterruptedException

  5.void unlock() 释放锁。源码如下:

public void unlock() {
sync.releaseShared();
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果当前线程是第一个获取读锁线程
if (firstReader == current) {
//如果可重入次数为1
if (firstReaderHoldCount == )
firstReader = null;
else//否者可重入次数减去1
firstReaderHoldCount--;
} else {
//如果当前线程不是最后一个获取读锁线程,则从threadlocal里面获取
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
//如果可重入次数<=1则清除threadlocal
int count = rh.count;
if (count <= ) {
readHolds.remove();
if (count <= )
throw unmatchedUnlockException();
}
//可重入次数减去一
--rh.count;
} //循环直到自己的读计数-1 cas更新成功
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) return nextc == ;
}
}

好了,到目前为止,我们知道了上一篇笔记中,使用ReentrantLock 实现的线程安全的 list, 但是由于 ReentrantLock 是独占锁所以在读多写少的情况下性能很差,下面使用 ReentrantReadWriteLock 来改造为如下代码:

package com.hjc;

import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock; /**
* Created by cong on 2018/6/14.
*/
public class ReentrantReadWriteLockTest {
//线程不安全的list
private ArrayList<String> array = new ArrayList<String>();
//独占锁
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock(); //添加元素
public void add(String e) { writeLock.lock();
try {
array.add(e); } finally {
writeLock.unlock(); }
}
//删元素
public void remove(String e) { writeLock.lock();
try {
array.remove(e); } finally {
writeLock.unlock(); }
} //获取数据
public String get(int index) { readLock.lock();
try {
return array.get(index); } finally {
readLock.unlock(); }
}
}

如代码调用 get 方法适合使用的是读锁,这样运行多个读线程同时访问 list 的元素,在读多写少的情况下性能相比 ReentrantLock 会很好。