在上面一篇分析ThreadExecutedPool的文章中我们看到线程池实现源码中大量使用了ReentrantLock锁,那么ReentrantLock锁的优势是什么?它又是怎么实现的呢?
ReentrantLock又名可重入锁,为什么称之为可重入锁呢?简单来说因为它允许一个线程多次取获得该锁,不过多次获取该锁之后,也需要执行同样次数的释放锁操作,否则该锁将被当前线程一直持有,导致其它线程无法获取。需要注意的是,释放锁的操作需要我们用代码来控制,它并不会自动取释放锁。在ReentrantLock中实现了两种锁fairSync和NonfairSync,即公平锁和非公平锁,今天我们就来聊聊ReentrantLock中nonfairSync锁的实现。
废话不多说,下面开始分析代码!
1、1 Lock()
首先看一下lock()方法,这个方法非常重要,它也是我们获取锁的入口:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
ReentrantLock锁的初始状态为0,compareAndSetState方法将尝试获取锁并将当前锁的状态设置为1。如果成功获取了锁会调用setExclusiveOwnerThread()方法设置当前线程拥有该锁的独占访问权。
如果调用compareAndSetState()获取锁失败,则返回false并执行acquire(1)。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们看到acquire(1)的代码中有一条if语句,当tryAcquire(1)返回false以及acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回true时,才会去执行selfInterrupt();方法。下面我们来看看tryAcquire(1)和acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这两个家伙干了什么。
先看一下tryAcquire()方法。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
tryAcquire方法会去调用nonfairTryAcquire()方法。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1、首先调用getState()方法获取当前锁状态,如果锁状态为0。表示当前锁没有被其他线程占用,这里会再次尝试去获取锁。如果成功的拿到了锁,将设置锁的拥有者为当前线程,同时返回true。如果此时返回true的话,表示当前线程成功获取到了锁,lock()方法调用成功。
2、如果当前锁状态不为0,判断当前线程是否为锁的拥有者,如果是的话,尝试将当前锁的状态值加acquires。如果当前neextc的值小于0,抛出异常。若不小于0,将当前锁的值设置为nextc。为什么说ReentrantLock为可重入锁,就体现在这里了,如果当前线程为锁的拥有者,该线程再次调用lock方法时,当前锁的状态值会加1。当然我们释放该锁的时候,也要调用相应的unlock()方法,以使得锁的state值为0,可被其他线程请求。
3、如果当前锁的值不为0且拥有锁的线程也不为当前线程则返回false。也就是tryAcquire()再次获取锁并没有成功。
值得注意的是,既然再第一次调用compareAndSetState()的时候,已经获取失败了为什么还要再调用tryAcquire()方法再获取一次呢?我们可以理解为这是一种保险机制,如果此时无法获取锁,我们将会将当前线程加入到阻塞队列中挂起等待后面被唤醒重新争夺锁。
回顾一下上面的if判断条件
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
如果tryAcquire()的返回值为false,那么接下来会执行acquireQueued(addWaiter(Node.EXCLUSIVE),arg)方法。这个方法看起来比较复杂,它在acquireQueued()方法又调用了addWaiter()方法,我们先来看看addWaiter()方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先我们创建了一个包含了当前线程的Node节点,并将tail(tail节点即是尾节点)赋值给pred节点。如果我们第一次进来,那么tail节点肯定为空,将会去执行enq(node)方法。如果tail不为空,那么接下来的三句代码干了什么呢?
先回忆一下,如果我们希望在一个双向链表的尾部新增一个节点,应该如何操作,大致应该有如下三步:
- node.prev = pred; node节点的前驱指向尾节点
- pred.next = node; 将尾节点的后继设置为当前节点
- tail = node; 将node节点设置为尾节点
我们再看一下详细代码:
- node.prev=pred; 当前pred节点代表的是尾节点,也就是说设置node节点的前驱为当前尾节点。
- if (compareAndSetTail(pred, node)),我们看下compareAndSetTail(pred, node)方法。
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
compareAndSetTail的原理其实就是CAS算法,将期望值和内存地址为tailOffset上的值进行比较,如果两者相同,则更新tailOffset上的值为最新值update。其实也就是如果tailOffset上的值和pred(老的尾节点)的值相同,则将尾节点更新为新的node节点。
- 将原尾节点的后继设置为当前节点。
其实上面三步实现的功能和在双向链表尾部新增一个节点的功能大致相同,只是顺序略有调整。
接下来看一下enq(node)方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq代码中有一个死循环for(;