目前java有两种方式实现线程同步一种是synchronized方式 ,一种是基于AQS框架的方式
以下对两种方式涉及到的类做对比:
synchronized 方式 | AQS框架 | |
同步方式 | synchronized 同步块 | ReentrantLock或ReentrantReadAndWriteLock的lock()、tryLock()、lockInterruptibly()和unlock()方法 |
释放锁等待 | Object.wait() | Condition.await() |
唤醒等待线程 | Object.nobify(),Object.notifyAll() | Condition.signal(),Condition.signalAll() |
除此之外AQS框架还有一个CountDownLatch类,用于线程的同步;
AQS框架都是基于父类AbstractQueuedSynchronizer实现的,它的实现类包括ReentrantLock(独占锁),ReentrantReadAndWriteLock(独占锁与共享锁),Condition(看上面表的对比就知道作用类似于Object的wait和notify),还有CountDownLatch类,以及java的线程池ThreadPoolExecuter的内部类Worker;
下面讲讲父类AbstractOwnableSynchronizer这个类
java.util.concurrent.locks.AbstractQueuedSynchronizer有四个重要的成员变量
一、private transient Thread exclusiveOwnerThread;
当前持有独占锁的线程对象,有且只能有一个,其它线程只能进入到等待队列
二、private volatile int state;
state代表加锁状态,不同的实现类state的作用不一样:
1.对于ReentrantLock是代表独占锁数量,无锁时state=0,有锁时state>0, 第一次加锁时,将state设置为1,持有锁的线程,可以多次加锁,只有持有锁的线程才可以多次加锁,经过判断加锁线程就是当前持有锁的线程时(即exclusiveOwnerThread==Thread.currentThread()),即可加锁,每次加锁都会将state的值+1,state等于几,就代表当前持有锁的线程加了几次锁,解锁时每解一次锁就会将state减1,state减到0后,锁就被释放掉了,其它线程又可以加锁了(所以加了几次锁就要解锁几次);当持有锁的线程释放锁以后,如果是公平锁,则等待队列会获取到加锁权限,在等待队列头部取出第一个线程去获取锁,获取锁的线程会被移出队列,如果是非公平锁,获取到加锁权限的有可能是等待队列中的第一个线程,也有可能是一个新加入的竞争锁的线程;
2.对于ReentrantReadAndWriteLock,state为4字节的整形,它的高位两个字节用来存储读锁数量(共享锁),低位两个字节用来存储写锁数量(独占锁),获取共享锁前要先判断有没有独占锁,如果存在读占锁,并且持有独占锁的线程不是当前线程,则获取锁失败;
3.对于CountDownLatch,state为初始化时传入的CountDown的次数,当state为0时,解除阻塞;
4.对于ThreadPoolExecuter的内部类Worker,state用来标识当前线程是否允许中断,Worker类就是线程池内运行的线程,它实现了Runable接口,继承了AbstractOwnableSynchronizer,state有3个值,-1代表Worker线程刚被实例化,还没运行,0代表线程已经运行,处于无锁状态,1代表线程已经加锁;线程的锁用于线程池在RUNNING和SHUTDOWN状态,Worker线程如果在执行任务,不允许被中断;
三、private transient volatile Node head;
等待获取锁的线程队列头节点
四、private transient volatile Node tail;
等待获取锁的线程的尾节点
如果state>0,其它需要加锁的线程需要等待持有锁的线程释放锁,等待获取锁的线程会放入到一个先进先出的链表队列,head和tail就是等待获取锁的队列的头节点和尾节点,除了头节点外,每个节点都与一个线程绑定(即在创建节点时,设置Node.thread = Thread.currentThread),等待获取锁的线程会通过调用Unsafe.park()方法阻塞等待;
持有锁的线程释放锁时会将state值为0,然后调用Unsafe.unpark()方法取消等待队列中头部的第一个线程的阻塞状态,去获取锁。
等待队列中的线程如果被park阻塞,这时其它线程中断了它,park就会解除阻塞,马上返回(这不是我们需要的,我们需要的是park方法一直阻塞,直到持有锁的线程释放了锁调用了unpark方法取消阻塞的返回),处于中断状态的线程 ,再调用park方法是无法阻塞的,所以这时必须要将线程状态重置为非中断状态,然后再次调用park方法阻塞等待,在线程获取到锁后再调用线程interrupt方法,恢复线程的中断状态;
几个重要的方法:
//尝试获取独占锁,锁竞争时不一定能获取成功,成功则返回true,否则返回false
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//尝试释放独占锁,锁竞争时不一定能释放成功,成功则返回true,否则返回false
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//尝试获取共享锁,锁竞争时不一定能获取成功,成功则返回true,否则返回false
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//尝试释放共享锁,锁竞争时不一定能释放成功,成功则返回true,否则返回false
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
上面这4个方法我们会发现都会抛出不支持的操作的异常,一般会在它的子类中重新实现这几个方法,原理都是通过调用UNSAFE类的CAS操作,来对state做加1操作(加锁)或减1操作(释放锁),因为是CAS操作,所以有可能失败;
//加锁操作
public final void acquire(int arg) {
//如果尝试加锁失败,则调用AddWaiter方法将当前线程封装为Node对象,放入到等待加锁的队列的队尾,Node.EXCLUSIVE代表独占锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果加锁失败,则将线程封装为一个Node对象放入到等待队列;
等待队列是由Node对象组成的一个双向链表,除了链表的头节点,每个Node都持有一个线程,以下是node的数据结构(Node是AQS的内部类);
static final class Node {
/**下面两个常量代表节点的类型**/
//常量:代表Node节点是一个要获取共享锁的节点
static final Node SHARED = new Node();
//常量:代表Node节点是一个要获取独占锁节点
static final Node EXCLUSIVE = null;
/**下面4个常量代表Node节点的等待状态**/
//代表该线程已经出现了异常,取消等待,这样的节点会被直接移出等待队列,放弃锁的争用;
static final int CANCELLED = 1;
/**代表该节点的线程在等待着持有锁的线程释放锁后,执行UNPARK操作,从队列头开始取消第一个阻塞节点的状态**/
static final int SIGNAL = -1;
// 代表该节点处于Condition.await()状态
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//节点的等待状态,值为上面4个常量的值,初始值为0,如果是头节点则也为0
volatile int waitStatus;
//该节点的前一个节点
volatile Node prev;
//该节点的后一个节点
volatile Node next;
//当前节点封装的等待获取锁的线程,如果该节点是链表的头节点,则Thread为null
volatile Thread thread;
//共享锁的下一个节点
Node nextWaiter;
//判断次节点是否是等待共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}
//获取本节点的前一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
//此节点进入等待获取锁的阻塞队列
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//此节点会进入Condition.await()队列
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
以下是入队过程:
//将当前线程封装为Node对象,放入等待队列队尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果队列不为空,则尝试将node放入到队列尾部
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;//如果入队成功,返回Node
}
}
//如果没有入队,则采用自旋锁的方式,将node加进队列;
enq(node);
return node;
}
//自旋锁,将node加入到队列尾部
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果t为null,说明队列为空,且还没有初始化,需要要初始化队列
if (t == null) {
//使用cas方式尝试设置头结点为一个新的Node节点;
if (compareAndSetHead(new Node()))
tail = head; //如果设置头结点成功,则将尾节点指向头节点;
//如果设置失败,说明有其它需要入队等待的线程已经初始化了队列;则继续循环
} else {//如果t不是null 说明队列不为空
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
未完待续;