前面已经说了很多Java并发和线程安全的东西,也提到并对比了内在锁和J.U.C包(java.util.concurrent包,后同)中Lock的锁。从这篇开始,对Java并发的整理从理论进入“实践”阶段,本篇对Lock、ReentrantLock和AbstractQueuedSynchronizer源码做简要分析和整理。先从Lock这个interface说起,然后分析ReentrantLock和AQS的实现。
0. 我们先看下Lock接口和ReentrantLock的大体实现。
- public interface Lock {
- void lock();
- void lockInterruptibly() throws InterruptedException;
- boolean tryLock();
- boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
- void unlock();
- Condition newCondition();
- }
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
可以看得出来,Lock能做这样几件事:
- 常规地获得锁
- 可中断地获得锁
- 尝试性获得锁,非阻塞
- 尝试性获得锁,如果超时则返回
- 解锁
- 生成和当前锁相关的条件(队列)对象
再来看下ReentrantLock的情况:
- 类声明,如下实现了Lock和Serializable接口
-
- public class ReentrantLock implements Lock, java.io.Serializable
public class ReentrantLock implements Lock, java.io.Serializable
- 属性,主要的属性只有内部类Sync的对象属性sync,ReentrantLock类的操作实际上都落在了sync身上
- 构造方法,有重载实现的两个,单参数的方法参数的含义为是否为公平锁,方法的实现就是构造一个Sync对象(根据公平参数,确定是FairSync还是NonfairSync)并赋值给sync,默认构造方法会调用参数为false的方法
- lock()和newCondition()调用sync的同名方法
- lockInterruptibly()调用sync的acquireInterruptibly(1)
- tryLock()调用sync的nonfairTryAcquire(1)
- 等待性tryLock()调用sync的tryAcquireNanos(1, unit.toNanos(timeout))
- unlock()调用sync的release(1)
除此之外,ReentrantLock是可重入锁,还有一些支持可重入的方法,这里不细说。可以说ReetrantLock是基于它的内部类Sync的对象来实现的,接下来看下Sync的类层次结构:
从eclipse中看,类层次结构一目了然,Sync被FairSync和Nonfair扩展,而父层有AbstactOwnableSynchronizer和AbstractQueuedSynchronizer。前者实现了当前同步器被哪个线程占有的逻辑,实现了get/setExclusiveOwnerThread()方法,确定获取当前互斥锁的Thread对象。后者则是java.util.concurrent包中非常重要的类,下面就重点来说说这个AbstractQueuedSynchronizer(AQS)。
1. AQS的队列结构和队列节点
从AbstractQueuedSynchronizer的名字就可以看得出来,这个类是抽象的队列的同步器。同步器不用说了;有关抽象的,以及具体如何和扩展的子类配合实现加锁和解锁,后面那段会具体描述;这里我们看看AQS的比较重要比较核心的部分,也就是状态处理和队列的实现。
从AQS类在eclipse的outline中,可以看出,除了序列化和具体的Unsafe底层操作相关的东西,AQS有三个最重要的属性和两个内部类:
- private volatile int state
- private transient volatile Node head
- private transient volatile Node tail
- static final class Node
- public class ConditionObject implements Condition, java.io.Serializable
其中state是当前的锁状态,通常(至少ReentrantLock是这么用的)这是锁是否被占用的一个重要标志,在ReentrantLock实现中是获得锁的重入线程数,0的时候是没有线程占用这个锁的。而和AQS实例绑定(就是非静态的内部类)的ConditionObject类是与条件对列相关的对象,后面细说。剩下的最重要就是Node静态内部类,也是构成队列的主要数据结构。其实此Node实现也并不复杂,就是通常的双向链表结构,有指向前后节点的引用,除此之外就是链表节点的数据部分,有如下属性字段:
- volatile int waitStatus。当前节点的状态,主要表示当前线程是获得锁、等待锁、在等待队列中等状态,对应于Node类中的几个常量
- volatile Thread thread。当前节点对应的线程对象。
- Node nextWaiter。AQS为每个条件对象单独维护了一个等待队列,依靠的就是这个属性引用。
在锁队列维护上,实际上是双向的。每次创建新节点,以当前线程为数据,nextWaiter指向互斥常量或共享常量。新增结点时,获取tail,并设置新节点的prev为tail,并尝试原子操作设置新节点为tail节点,如果tail结点为空或者设置tail结点出问题则调用enq方法循环尝试,其中为空 状态时,则new一个空Node为head,并让tail=head。
出队列的操作实际上是和线程相关的,在阻塞等待获得锁的过程中或者是执行condition的await()时,调用acquireQueued()方法,循环比较当前线程结点的上一个结点是不是head并调用tryAcquire()。如果成功,则设置当前node为head,并解除当前node向前以及前一个结点指向当前node的引用(设置为null),这样前一个结点就失去了引用链上的引用。第一次出队列的是首次初始化队列时创建的空Node对象,后面依次是之前被解锁的线程对应的node。当然,如果tryAcquire()不成功,则会将判断当前node的状态,如果是0则设置为SIGNAL常量并用LockSupport的park()方法挂起当前线程。
2. Sync和AQS的配合以及ReentrantLock的lock()和unlock()实现。
前面简单说到过,ReentrantLock的lock方法调用了sync的lock()方法,而不管是公平实现(FairSync)还是非公平实现(NonfairSync),所做的主要工作都是调用AQS的acquire()方法。而unlock()方法更直接,调用的是AQS的release()方法。
更进一步,对于acquire()和release()方法,所做的大概操作有两样,一个是调用名字为try开头的方法,即tryAcquire()和tryRelease()等,此外就是做队列和线程相关的操作。而对于AQS,有如下五个方法是未完整实现,需要扩展的子类进行定义的:
- protected boolean tryAcquire(int arg)
- protected boolean tryRelease(int arg)
- protected int tryAcquireShared(int arg)
- protected boolean tryReleaseShared(int arg)
- protected boolean isHeldExclusively()
结合ReentrantLock及其内部类Sync(以NonfairSync为例)的实现,主要是tryAcquire()和tryRelease(),我们看下如何构造锁操作。
当加锁时,调用acquire()方法,acquire()会尝试原子操作tryAcquire()。这个方法在非公平实现中,主要是通过AQS的state来检查和维护锁状态,如果state是0,说明没有线程占有这个锁,如果不为0并且锁的占有线程是当前线程,则是重入的情况,均可以获得锁并修改state值。如果是首次获得锁,则设置锁占有线程为当前线程。当然,如果前面两种情况都不满足,说明尝试获得锁失败,需要做前面段落所述的队列操作,创建一个等待结点并进入循环,循环中的park()调用挂起当前线程。
当解锁时,做对应而相反的操作。release()调用tryRelease()方法,如果修改state值成功,则找到队列中应该唤起的结点,对节点中的线程调用unpark()方法,恢复线程执行。这个操作在被恢复执行线程acquireQueued()方法的循环中完成,释放头结点并返回是否中断的状态,继续执行。
3. Lock的五个特点方面:尝试性非阻塞获得锁可中断、时间调度、公平性、一对多。
下面在简单介绍下ReentrantLock比起内在的synchronized锁的一些优秀特点的实现:
- 基于Unsafe的原子操作来修改state的状态,无论成功失败都会直接返回,这保证了非阻塞方式尝试获得锁
- 可中断和允许时间调度,则是利用了Unsafe的park方法的特性,park掉的线程是可以响应中断被唤醒的,而park的带有时间参数的重载方法则保证了时间调度性
- 公平和非公平实现,这个是在ReentrantLock的Sync的子类中实现的,主要的区别就是公平锁保证了队列的第一个节点先获得锁,而非公平不保证这点
- 至于一对多,貌似也没什么可多说的,一个类中可以有多个ReentrantLock类对象属性,自然就可以有多个锁,每个对象单独维护一个state属性
4. Condition的实现。
至于条件队列的实现,前文也多少提到了一些。AQS有个实现了Condition接口的内部类ConditionObject,其复用了锁队列的Node结点,单独为每个条件维护了一个单向链表队列。
当await()时,创建一个状态为CONDITION常量的Node类结点,释放当前线程的锁,并进入一个循环。这个循环退出的条件是结点已经被放到锁队列上或者是检测到了中断做中断处理,循环的内容就是不断的去park()掉当前线程。当循环退出后尝试重新获得锁,以继续执行等待后的代码。
而signal()/signalll()方法更好理解,主要操作就是将一个或者多个Node对象的状态设置为0,并将该节点加入获取锁的队列中,恢复线程。
本文对java.util.concurrent.locks的可重入锁机制和AQS进行了比较详细的分析,后续也有可能会对ReentrantReadWriteLock和Semaphore做分析。更详细的逻辑还请参照JDK的源码。