本章,我们介绍锁的架构;后面的章节将会对它们逐个进行分析介绍。目录如下:
01. Java多线程系列--“JUC锁”01之 框架
02. Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock
03. Java多线程系列--“JUC锁”03之 公平锁(一)
04. Java多线程系列--“JUC锁”04之 公平锁(二)
05. Java多线程系列--“JUC锁”05之 非公平锁
06. Java多线程系列--“JUC锁”06之 Condition条件
07. Java多线程系列--“JUC锁”07之 LockSupport
08. Java多线程系列--“JUC锁”08之 共享锁和ReentrantReadWriteLock
09. Java多线程系列--“JUC锁”09之 CountDownLatch原理和示例
10. Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例
11. Java多线程系列--“JUC锁”11之 Semaphore信号量的原理和示例
根据锁的添加到Java中的时间,Java中的锁,可以分为"同步锁"和"JUC包中的锁"。
同步锁
即通过synchronized关键字来进行同步,实现对竞争资源的互斥访问的锁。Java 1.0版本中就已经支持同步锁了。
同步锁的原理是,对于每一个对象,有且仅有一个同步锁;不同的线程能共同访问该同步锁。但是,在同一个时间点,该同步锁能且只能被一个线程获取到。这样,获取到同步锁的线程就能进行CPU调度,从而在CPU上执行;而没有获取到同步锁的线程,必须进行等待,直到获取到同步锁之后才能继续运行。这就是,多线程通过同步锁进行同步的原理!
关于"同步锁"的更多内容,请参考"Java锁的基础部分"的内容。
JUC包中的锁
相比同步锁,JUC包中的锁的功能更加强大,它为锁提供了一个框架,该框架允许更灵活地使用锁,只是它的用法更难罢了。
JUC包中的锁,包括:Lock接口,ReadWriteLock接口,LockSupport阻塞原语,Condition条件,AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer三个抽象类,ReentrantLock独占锁,ReentrantReadWriteLock读写锁。由于CountDownLatch,CyclicBarrier和Semaphore也是通过AQS来实现的;因此,我也将它们归纳到锁的框架中进行介绍。
先看看锁的框架图,如下所示。
01. Lock接口
JUC包中的 Lock 接口支持那些语义不同(重入、公平等)的锁规则。所谓语义不同,是指锁可是有"公平机制的锁"、"非公平机制的锁"、"可重入的锁"等等。"公平机制"是指"不同线程获取锁的机制是公平的",而"非公平机制"则是指"不同线程获取锁的机制是非公平的","可重入的锁"是指同一个锁能够被一个线程多次获取。
02. ReadWriteLock
ReadWriteLock 接口以和Lock类似的方式定义了一些读取者可以共享而写入者独占的锁。JUC包只有一个类实现了该接口,即 ReentrantReadWriteLock,因为它适用于大部分的标准用法上下文。但程序员可以创建自己的、适用于非标准要求的实现。
03. AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer
AbstractQueuedSynchronizer就是被称之为AQS的类,它是一个非常有用的超类,可用来定义锁以及依赖于排队阻塞线程的其他同步器;ReentrantLock,ReentrantReadWriteLock,CountDownLatch,CyclicBarrier和Semaphore等这些类都是基于AQS类实现的。AbstractQueuedLongSynchronizer 类提供相同的功能但扩展了对同步状态的 64 位的支持。两者都扩展了类 AbstractOwnableSynchronizer(一个帮助记录当前保持独占同步的线程的简单类)。
04. LockSupport
LockSupport提供“创建锁”和“其他同步类的基本线程阻塞原语”。
LockSupport的功能和"Thread中的Thread.suspend()和Thread.resume()有点类似",LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程。但是park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。
05. Condition
Condition需要和Lock联合使用,它的作用是代替Object监视器方法,可以通过await(),signal()来休眠/唤醒线程。
Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的 Object 版本中的不同。
06. ReentrantLock
ReentrantLock是独占锁。所谓独占锁,是指只能被独自占领,即同一个时间点只能被一个线程锁获取到的锁。ReentrantLock锁包括"公平的ReentrantLock"和"非公平的ReentrantLock"。"公平的ReentrantLock"是指"不同线程获取锁的机制是公平的",而"非公平的 ReentrantLock"则是指"不同线程获取锁的机制是非公平的",ReentrantLock是"可重入的锁"。
ReentrantLock的UML类图如下:
(01) ReentrantLock实现了Lock接口。
(02) ReentrantLock中有一个成员变量sync,sync是Sync类型;Sync是一个抽象类,而且它继承于AQS。
(03) ReentrantLock中有"公平锁类"FairSync和"非公平锁类"NonfairSync,它们都是Sync的子类。ReentrantReadWriteLock中sync对象,是FairSync与NonfairSync中的一种,这也意味着ReentrantLock是"公平锁"或"非公平锁"中的一种,ReentrantLock默认是非公平锁。
07. ReentrantReadWriteLock
ReentrantReadWriteLock是读写锁接口ReadWriteLock的实现类,它包括子类ReadLock和WriteLock。ReentrantLock是共享锁,而WriteLock是独占锁。
ReentrantReadWriteLock的UML类图如下:
(01) ReentrantReadWriteLock实现了ReadWriteLock接口。
(02) ReentrantReadWriteLock中包含sync对象,读锁readerLock和写锁writerLock。读锁ReadLock和写锁WriteLock都实现了Lock接口。
(03) 和"ReentrantLock"一样,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括"公平锁"FairSync和"非公平锁"NonfairSync。
08. CountDownLatch
CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
CountDownLatch的UML类图如下:
CountDownLatch包含了sync对象,sync是Sync类型。CountDownLatch的Sync是实例类,它继承于AQS。
09. CyclicBarrier
CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier的UML类图如下:
CyclicBarrier是包含了"ReentrantLock对象lock"和"Condition对象trip",它是通过独占锁实现的。
CyclicBarrier和CountDownLatch的区别是:
(01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
10. Semaphore
Semaphore是一个计数信号量,它的本质是一个"共享锁"。
信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。
Semaphore的UML类图如下:
和"ReentrantLock"一样,Semaphore包含了sync对象,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括"公平信号量"FairSync和"非公平信号量"NonfairSync。
本章对ReentrantLock包进行基本介绍,这一章主要对ReentrantLock进行概括性的介绍,内容包括:
ReentrantLock介绍
ReentrantLock函数列表
ReentrantLock示例
在后面的两章,会分别介绍ReentrantLock的两个子类(公平锁和非公平锁)的实现原理。
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3496101.html
ReentrantLock介绍
ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。
顾名思义,ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
ReentrantLock函数列表
// 创建一个 ReentrantLock ,默认是“非公平锁”。
ReentrantLock()
// 创建策略是fair的 ReentrantLock。fair为true表示是公平锁,fair为false表示是非公平锁。
ReentrantLock(boolean fair)
// 查询当前线程保持此锁的次数。
int getHoldCount()
// 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。
protected Thread getOwner()
// 返回一个 collection,它包含可能正等待获取此锁的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正等待获取此锁的线程估计数。
int getQueueLength()
// 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。
protected Collection<Thread> getWaitingThreads(Condition condition)
// 返回等待与此锁相关的给定条件的线程估计数。
int getWaitQueueLength(Condition condition)
// 查询给定线程是否正在等待获取此锁。
boolean hasQueuedThread(Thread thread)
// 查询是否有些线程正在等待获取此锁。
boolean hasQueuedThreads()
// 查询是否有些线程正在等待与此锁有关的给定条件。
boolean hasWaiters(Condition condition)
// 如果是“公平锁”返回true,否则返回false。
boolean isFair()
// 查询当前线程是否保持此锁。
boolean isHeldByCurrentThread()
// 查询此锁是否由任意线程保持。
boolean isLocked()
// 获取锁。
void lock()
// 如果当前线程未被中断,则获取锁。
void lockInterruptibly()
// 返回用来与此 Lock 实例一起使用的 Condition 实例。
Condition newCondition()
// 仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
boolean tryLock()
// 如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。
boolean tryLock(long timeout, TimeUnit unit)
// 试图释放此锁。
void unlock()
ReentrantLock示例
通过对比“示例1”和“示例2”,我们能够清晰的认识lock和unlock的作用
示例1
1 import java.util.concurrent.locks.Lock;
2 import java.util.concurrent.locks.ReentrantLock;
3
4 // LockTest1.java
5 // 仓库
6 class Depot {
7 private int size; // 仓库的实际数量
8 private Lock lock; // 独占锁
9
10 public Depot() {
11 this.size = 0;
12 this.lock = new ReentrantLock();
13 }
14
15 public void produce(int val) {
16 lock.lock();
17 try {
18 size += val;
19 System.out.printf("%s produce(%d) --> size=%d\n",
20 Thread.currentThread().getName(), val, size);
21 } finally {
22 lock.unlock();
23 }
24 }
25
26 public void consume(int val) {
27 lock.lock();
28 try {
29 size -= val;
30 System.out.printf("%s consume(%d) <-- size=%d\n",
31 Thread.currentThread().getName(), val, size);
32 } finally {
33 lock.unlock();
34 }
35 }
36 };
37
38 // 生产者
39 class Producer {
40 private Depot depot;
41
42 public Producer(Depot depot) {
43 this.depot = depot;
44 }
45
46 // 消费产品:新建一个线程向仓库中生产产品。
47 public void produce(final int val) {
48 new Thread() {
49 public void run() {
50 depot.produce(val);
51 }
52 }.start();
53 }
54 }
55
56 // 消费者
57 class Customer {
58 private Depot depot;
59
60 public Customer(Depot depot) {
61 this.depot = depot;
62 }
63
64 // 消费产品:新建一个线程从仓库中消费产品。
65 public void consume(final int val) {
66 new Thread() {
67 public void run() {
68 depot.consume(val);
69 }
70 }.start();
71 }
72 }
73
74 public class LockTest1 {
75 public static void main(String[] args) {
76 Depot mDepot = new Depot();
77 Producer mPro = new Producer(mDepot);
78 Customer mCus = new Customer(mDepot);
79
80 mPro.produce(60);
81 mPro.produce(120);
82 mCus.consume(90);
83 mCus.consume(150);
84 mPro.produce(110);
85 }
86 }
运行结果:
Thread-0 produce(60) --> size=60
Thread-1 produce(120) --> size=180
Thread-3 consume(150) <-- size=30
Thread-2 consume(90) <-- size=-60
Thread-4 produce(110) --> size=50
结果分析:
(01) Depot 是个仓库。通过produce()能往仓库中生产货物,通过consume()能消费仓库中的货物。通过独占锁lock实现对仓库的互斥访问:在操作(生产/消费)仓库中货品前,会先通过lock()锁住仓库,操作完之后再通过unlock()解锁。
(02) Producer是生产者类。调用Producer中的produce()函数可以新建一个线程往仓库中生产产品。
(03) Customer是消费者类。调用Customer中的consume()函数可以新建一个线程消费仓库中的产品。
(04) 在主线程main中,我们会新建1个生产者mPro,同时新建1个消费者mCus。它们分别向仓库中生产/消费产品。
根据main中的生产/消费数量,仓库最终剩余的产品应该是50。运行结果是符合我们预期的!
这个模型存在两个问题:
(01) 现实中,仓库的容量不可能为负数。但是,此模型中的仓库容量可以为负数,这与现实相矛盾!
(02) 现实中,仓库的容量是有限制的。但是,此模型中的容量确实没有限制的!
这两个问题,我们稍微会讲到如何解决。现在,先看个简单的示例2;通过对比“示例1”和“示例2”,我们能更清晰的认识lock(),unlock()的用途。
示例2
1 import java.util.concurrent.locks.Lock;
2 import java.util.concurrent.locks.ReentrantLock;
3
4 // LockTest2.java
5 // 仓库
6 class Depot {
7 private int size; // 仓库的实际数量
8 private Lock lock; // 独占锁
9
10 public Depot() {
11 this.size = 0;
12 this.lock = new ReentrantLock();
13 }
14
15 public void produce(int val) {
16 // lock.lock();
17 // try {
18 size += val;
19 System.out.printf("%s produce(%d) --> size=%d\n",
20 Thread.currentThread().getName(), val, size);
21 // } catch (InterruptedException e) {
22 // } finally {
23 // lock.unlock();
24 // }
25 }
26
27 public void consume(int val) {
28 // lock.lock();
29 // try {
30 size -= val;
31 System.out.printf("%s consume(%d) <-- size=%d\n",
32 Thread.currentThread().getName(), val, size);
33 // } finally {
34 // lock.unlock();
35 // }
36 }
37 };
38
39 // 生产者
40 class Producer {
41 private Depot depot;
42
43 public Producer(Depot depot) {
44 this.depot = depot;
45 }
46
47 // 消费产品:新建一个线程向仓库中生产产品。
48 public void produce(final int val) {
49 new Thread() {
50 public void run() {
51 depot.produce(val);
52 }
53 }.start();
54 }
55 }
56
57 // 消费者
58 class Customer {
59 private Depot depot;
60
61 public Customer(Depot depot) {
62 this.depot = depot;
63 }
64
65 // 消费产品:新建一个线程从仓库中消费产品。
66 public void consume(final int val) {
67 new Thread() {
68 public void run() {
69 depot.consume(val);
70 }
71 }.start();
72 }
73 }
74
75 public class LockTest2 {
76 public static void main(String[] args) {
77 Depot mDepot = new Depot();
78 Producer mPro = new Producer(mDepot);
79 Customer mCus = new Customer(mDepot);
80
81 mPro.produce(60);
82 mPro.produce(120);
83 mCus.consume(90);
84 mCus.consume(150);
85 mPro.produce(110);
86 }
87 }
(某一次)运行结果:
Thread-0 produce(60) --> size=-60
Thread-4 produce(110) --> size=50
Thread-2 consume(90) <-- size=-60
Thread-1 produce(120) --> size=-60
Thread-3 consume(150) <-- size=-60
结果说明:
“示例2”在“示例1”的基础上去掉了lock锁。在“示例2”中,仓库中最终剩余的产品是-60,而不是我们期望的50。原因是我们没有实现对仓库的互斥访问。
示例3
在“示例3”中,我们通过Condition去解决“示例1”中的两个问题:“仓库的容量不可能为负数”以及“仓库的容量是有限制的”。
解决该问题是通过Condition。Condition是需要和Lock联合使用的:通过Condition中的await()方法,能让线程阻塞[类似于wait()];通过Condition的signal()方法,能让唤醒线程[类似于notify()]。
1 import java.util.concurrent.locks.Lock;
2 import java.util.concurrent.locks.ReentrantLock;
3 import java.util.concurrent.locks.Condition;
4
5 // LockTest3.java
6 // 仓库
7 class Depot {
8 private int capacity; // 仓库的容量
9 private int size; // 仓库的实际数量
10 private Lock lock; // 独占锁
11 private Condition fullCondtion; // 生产条件
12 private Condition emptyCondtion; // 消费条件
13
14 public Depot(int capacity) {
15 this.capacity = capacity;
16 this.size = 0;
17 this.lock = new ReentrantLock();
18 this.fullCondtion = lock.newCondition();
19 this.emptyCondtion = lock.newCondition();
20 }
21
22 public void produce(int val) {
23 lock.lock();
24 try {
25 // left 表示“想要生产的数量”(有可能生产量太多,需多此生产)
26 int left = val;
27 while (left > 0) {
28 // 库存已满时,等待“消费者”消费产品。
29 while (size >= capacity)
30 fullCondtion.await();
31 // 获取“实际生产的数量”(即库存中新增的数量)
32 // 如果“库存”+“想要生产的数量”>“总的容量”,则“实际增量”=“总的容量”-“当前容量”。(此时填满仓库)
33 // 否则“实际增量”=“想要生产的数量”
34 int inc = (size+left)>capacity ? (capacity-size) : left;
35 size += inc;
36 left -= inc;
37 System.out.printf("%s produce(%3d) --> left=%3d, inc=%3d, size=%3d\n",
38 Thread.currentThread().getName(), val, left, inc, size);
39 // 通知“消费者”可以消费了。
40 emptyCondtion.signal();
41 }
42 } catch (InterruptedException e) {
43 } finally {
44 lock.unlock();
45 }
46 }
47
48 public void consume(int val) {
49 lock.lock();
50 try {
51 // left 表示“客户要消费数量”(有可能消费量太大,库存不够,需多此消费)
52 int left = val;
53 while (left > 0) {
54 // 库存为0时,等待“生产者”生产产品。
55 while (size <= 0)
56 emptyCondtion.await();
57 // 获取“实际消费的数量”(即库存中实际减少的数量)
58 // 如果“库存”<“客户要消费的数量”,则“实际消费量”=“库存”;
59 // 否则,“实际消费量”=“客户要消费的数量”。
60 int dec = (size<left) ? size : left;
61 size -= dec;
62 left -= dec;
63 System.out.printf("%s consume(%3d) <-- left=%3d, dec=%3d, size=%3d\n",
64 Thread.currentThread().getName(), val, left, dec, size);
65 fullCondtion.signal();
66 }
67 } catch (InterruptedException e) {
68 } finally {
69 lock.unlock();
70 }
71 }
72
73 public String toString() {
74 return "capacity:"+capacity+", actual size:"+size;
75 }
76 };
77
78 // 生产者
79 class Producer {
80 private Depot depot;
81
82 public Producer(Depot depot) {
83 this.depot = depot;
84 }
85
86 // 消费产品:新建一个线程向仓库中生产产品。
87 public void produce(final int val) {
88 new Thread() {
89 public void run() {
90 depot.produce(val);
91 }
92 }.start();
93 }
94 }
95
96 // 消费者
97 class Customer {
98 private Depot depot;
99
100 public Customer(Depot depot) {
101 this.depot = depot;
102 }
103
104 // 消费产品:新建一个线程从仓库中消费产品。
105 public void consume(final int val) {
106 new Thread() {
107 public void run() {
108 depot.consume(val);
109 }
110 }.start();
111 }
112 }
113
114 public class LockTest3 {
115 public static void main(String[] args) {
116 Depot mDepot = new Depot(100);
117 Producer mPro = new Producer(mDepot);
118 Customer mCus = new Customer(mDepot);
119
120 mPro.produce(60);
121 mPro.produce(120);
122 mCus.consume(90);
123 mCus.consume(150);
124 mPro.produce(110);
125 }
126 }
(某一次)运行结果:
Thread-0 produce( 60) --> left= 0, inc= 60, size= 60
Thread-1 produce(120) --> left= 80, inc= 40, size=100
Thread-2 consume( 90) <-- left= 0, dec= 90, size= 10
Thread-3 consume(150) <-- left=140, dec= 10, size= 0
Thread-4 produce(110) --> left= 10, inc=100, size=100
Thread-3 consume(150) <-- left= 40, dec=100, size= 0
Thread-4 produce(110) --> left= 0, inc= 10, size= 10
Thread-3 consume(150) <-- left= 30, dec= 10, size= 0
Thread-1 produce(120) --> left= 0, inc= 80, size= 80
Thread-3 consume(150) <-- left= 0, dec= 30, size= 50
代码中的已经包含了很详细的注释,这里就不再说明了。
更多“生产者/消费者模型”的更多内容,可以参考“Java多线程系列--“基础篇”11之 生产消费者问题”。
而关于Condition的内容,在后面我们会详细介绍。
Java多线程系列--“JUC锁”03之 公平锁(一)
概要
本章对“公平锁”的获取锁机制进行介绍(本文的公平锁指的是互斥锁的公平锁),内容包括:
基本概念
ReentrantLock数据结构
参考代码
获取公平锁(基于JDK1.7.0_40)
一. tryAcquire()
二. addWaiter()
三. acquireQueued()
四. selfInterrupt()
“公平锁”的释放锁的机制在后面一章再做介绍,锁的使用示例请参考“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock”。
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3496147.html
基本概念
本章,我们会讲解“线程获取公平锁”的原理;在讲解之前,需要了解几个基本概念。后面的内容,都是基于这些概念的;这些概念可能比较枯燥,但从这些概念中,能窥见“java锁”的一些架构,这对我们了解锁是有帮助的。
1. AQS -- 指AbstractQueuedSynchronizer类。
AQS是java中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现。AQS是独占锁(例如,ReentrantLock)和共享锁(例如,Semaphore)的公共父类。
2. AQS锁的类别 -- 分为“独占锁”和“共享锁”两种。
(01) 独占锁 -- 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。公平锁,是按照通过CLH等待线程按照先来先得的规则,公平的获取锁;而非公平锁,则当线程要获取锁时,它会无视CLH等待队列而直接获取锁。独占锁的典型实例子是ReentrantLock,此外,ReentrantReadWriteLock.WriteLock也是独占锁。
(02) 共享锁 -- 能被多个线程同时拥有,能被共享的锁。JUC包中的ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch和Semaphore都是共享锁。这些锁的用途和原理,在以后的章节再详细介绍。
3. CLH队列 -- Craig, Landin, and Hagersten lock queue
CLH队列是AQS中“等待锁”的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而起来错误,我们常常需要通过锁来保护这些资源。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。CLH就是管理这些“等待锁”的线程的队列。
CLH是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。
4. CAS函数 -- Compare And Swap
CAS函数,是比较并交换函数,它是原子操作函数;即,通过CAS操作的数据都是以原子方式进行的。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函数。它们共同的特点是,这些函数所执行的动作是以原子的方式进行的。
本章是围绕“公平锁”如何获取锁而层次展开。“公平锁”涉及到的知识点比较多,但总的来说,不是特别难;如果读者能读懂AQS和ReentrantLock.java这两个类的大致意思,理解锁的原理和机制也就不成问题了。本章只是作者本人对锁的一点点理解,希望这部分知识能帮助您了解“公平锁”的获取过程,认识“锁”的框架。
ReentrantLock数据结构
ReentrantLock的UML类图
从图中可以看出:
(01) ReentrantLock实现了Lock接口。
(02) ReentrantLock与sync是组合关系。ReentrantLock中,包含了Sync对象;而且,Sync是AQS的子类;更重要的是,Sync有两个子类FairSync(公平锁)和NonFairSync(非公平锁)。ReentrantLock是一个独占锁,至于它到底是公平锁还是非公平锁,就取决于sync对象是"FairSync的实例"还是"NonFairSync的实例"。
参考代码
下面给出Java1.7.0_40版本中,ReentrantLock和AQS的源码,仅供参考!
ReentranLock.java
View Code
AQS(AbstractQueuedSynchronizer.java)
View Code
获取公平锁(基于JDK1.7.0_40)
通过前面“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock”的“示例1”,我们知道,获取锁是通过lock()函数。下面,我们以lock()对获取公平锁的过程进行展开。
1. lock()
lock()在ReentrantLock.java的FairSync类中实现,它的源码如下:
final void lock() {
acquire(1);
}
说明:“当前线程”实际上是通过acquire(1)获取锁的。
这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是0;锁被线程初次获取到了,它的状态值就变成了1。
由于ReentrantLock(公平锁/非公平锁)是可重入锁,所以“独占锁”可以被单个线程多此获取,每获取1次就将锁的状态+1。也就是说,初次获取锁时,通过acquire(1)将锁的状态值设为1;再次获取锁时,将锁的状态值设为2;依次类推...这就是为什么获取锁时,传入的参数是1的原因了。
可重入就是指锁可以被单个线程多次获取。
2. acquire()
acquire()在AQS中实现的,它的源码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
(02) “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。CLH队列就是线程等待队列。
(03) 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。由于此时ReentrantLock是公平锁,它会根据公平性原则来获取锁。
(04) “当前线程”在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果“当前线程”在休眠等待过程中被中断过,acquireQueued会返回true,此时"当前线程"会调用selfInterrupt()来自己给自己产生一个中断。至于为什么要自己给自己产生一个中断,后面再介绍。
上面是对acquire()的概括性说明。下面,我们将该函数分为4部分来逐步解析。
一. tryAcquire()
二. addWaiter()
三. acquireQueued()
四. selfInterrupt()
一. tryAcquire()
1. tryAcquire()
公平锁的tryAcquire()在ReentrantLock.java的FairSync类中实现,源码如下:
protected final boolean tryAcquire(int acquires) {
// 获取“当前线程”
final Thread current = Thread.currentThread();
// 获取“独占锁”的状态
int c = getState();
// c=0意味着“锁没有被任何线程锁拥有”,
if (c == 0) {
// 若“锁没有被任何线程锁拥有”,
// 则判断“当前线程”是不是CLH队列中的第一个线程线程,
// 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 如果“独占锁”的拥有者已经为“当前线程”,
// 则将更新锁的状态。
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
说明:根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。注意,这里只是尝试!
尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。后面我们会说明,在尝试失败的情况下,是如何一步步获取锁的。
2. hasQueuedPredecessors()
hasQueuedPredecessors()在AQS中实现,源码如下:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
说明: 通过代码,能分析出,hasQueuedPredecessors() 是通过判断"当前线程"是不是在CLH队列的队首,来返回AQS中是不是有比“当前线程”等待更久的线程。下面对head、tail和Node进行说明。
3. Node的源码
Node就是CLH队列的节点。Node在AQS中实现,它的数据结构如下:
private transient volatile Node head; // CLH队列的队首
private transient volatile Node tail; // CLH队列的队尾
// CLH队列的节点
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 线程已被取消,对应的waitStatus的值
static final int CANCELLED = 1;
// “当前线程的后继线程需要被unpark(唤醒)”,对应的waitStatus的值。
// 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
static final int SIGNAL = -1;
// 线程(处在Condition休眠状态)在等待Condition唤醒,对应的waitStatus的值
static final int CONDITION = -2;
// (共享锁)其它线程获取到“共享锁”,对应的waitStatus的值
static final int PROPAGATE = -3;
// waitStatus为“CANCELLED, SIGNAL, CONDITION, PROPAGATE”时分别表示不同状态,
// 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。
volatile int waitStatus;
// 前一节点
volatile Node prev;
// 后一节点
volatile Node next;
// 节点所对应的线程
volatile Thread thread;
// nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记”
// 若nextWaiter=SHARED,则CLH队列是“独占锁”队列;
// 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。
Node nextWaiter;
// “共享锁”则返回true,“独占锁”则返回false。
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回前一节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
// 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
说明:
Node是CLH队列的节点,代表“等待锁的线程队列”。
(01) 每个Node都会一个线程对应。
(02) 每个Node会通过prev和next分别指向上一个节点和下一个节点,这分别代表上一个等待线程和下一个等待线程。
(03) Node通过waitStatus保存线程的等待状态。
(04) Node通过nextWaiter来区分线程是“独占锁”线程还是“共享锁”线程。如果是“独占锁”线程,则nextWaiter的值为EXCLUSIVE;如果是“共享锁”线程,则nextWaiter的值是SHARED。
4. compareAndSetState()
compareAndSetState()在AQS中实现。它的源码如下:
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
说明: compareAndSwapInt() 是sun.misc.Unsafe类中的一个本地方法。对此,我们需要了解的是 compareAndSetState(expect, update) 是以原子的方式操作当前线程;若当前线程的状态为expect,则设置它的状态为update。
5. setExclusiveOwnerThread()
setExclusiveOwnerThread()在AbstractOwnableSynchronizer.java中实现,它的源码如下:
// exclusiveOwnerThread是当前拥有“独占锁”的线程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}
说明:setExclusiveOwnerThread()的作用就是,设置线程t为当前拥有“独占锁”的线程。
6. getState(), setState()
getState()和setState()都在AQS中实现,源码如下:
// 锁的状态
private volatile int state;
// 设置锁的状态
protected final void setState(int newState) {
state = newState;
}
// 获取锁的状态
protected final int getState() {
return state;
}
说明:state表示锁的状态,对于“独占锁”而已,state=0表示锁是可获取状态(即,锁没有被任何线程锁持有)。由于java中的独占锁是可重入的,state的值可以>1。
小结:tryAcquire()的作用就是让“当前线程”尝试获取锁。获取成功返回true,失败则返回false。
二. addWaiter(Node.EXCLUSIVE)
addWaiter(Node.EXCLUSIVE)的作用是,创建“当前线程”的Node节点,且Node中记录“当前线程”对应的锁是“独占锁”类型,并且将该节点添加到CLH队列的末尾。
1.addWaiter()
addWaiter()在AQS中实现,源码如下:
private Node addWaiter(Node mode) {
// 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。
enq(node);
return node;
}
说明:对于“公平锁”而言,addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。
2. compareAndSetTail()
compareAndSetTail()在AQS中实现,源码如下:
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
说明:compareAndSetTail也属于CAS函数,也是通过“本地方法”实现的。compareAndSetTail(expect, update)会以原子的方式进行操作,它的作用是判断CLH队列的队尾是不是为expect,是的话,就将队尾设为update。
3. enq()
enq()在AQS中实现,源码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
说明: enq()的作用很简单。如果CLH队列为空,则新建一个CLH表头;然后将node添加到CLH末尾。否则,直接将node添加到CLH末尾。
小结:addWaiter()的作用,就是将当前线程添加到CLH队列中。这就意味着将当前线程添加到等待获取“锁”的等待线程队列中了。
三. acquireQueued()
前面,我们已经将当前线程添加到CLH队列中了。而acquireQueued()的作用就是逐步的去执行CLH队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取锁了才返回。下面,我们看看acquireQueued()的具体流程。
1. acquireQueued()
acquireQueued()在AQS中实现,源码如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// interrupted表示在CLH队列的调度中,
// “当前线程”在休眠时,有没有被中断过。
boolean interrupted = false;
for (;;) {
// 获取上一个节点。
// node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
说明:acquireQueued()的目的是从队列中获取锁。
2. shouldParkAfterFailedAcquire()
shouldParkAfterFailedAcquire()在AQS中实现,源码如下:
// 返回“当前线程是否应该阻塞”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前继节点的状态
int ws = pred.waitStatus;
// 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。
if (ws == Node.SIGNAL)
return true;
// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点’的前继节点”。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
说明:
(01) 关于waitStatus请参考下表(中扩号内为waitStatus的值),更多关于waitStatus的内容,可以参考前面的Node类的介绍。
CANCELLED[1] -- 当前线程已被取消
SIGNAL[-1] -- “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
[0] -- 当前线程不属于上面的任何一种状态。
(02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。
规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
如果“规则1”发生,即“前继节点是SIGNAL”状态,则意味着“当前线程”需要被阻塞。接下来会调用parkAndCheckInterrupt()阻塞当前线程,直到当前先被唤醒才从parkAndCheckInterrupt()中返回。
3. parkAndCheckInterrupt())
parkAndCheckInterrupt()在AQS中实现,源码如下:
private final boolean parkAndCheckInterrupt() {
// 通过LockSupport的park()阻塞“当前线程”。
LockSupport.park(this);
// 返回线程的中断状态。
return Thread.interrupted();
}
说明:parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。
它会先通过LockSupport.park()阻塞“当前线程”,然后通过Thread.interrupted()返回线程的中断状态。
这里介绍一下线程被阻塞之后如何唤醒。一般有2种情况:
第1种情况:unpark()唤醒。“前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程。
第2种情况:中断唤醒。其它线程通过interrupt()中断当前线程。
补充:LockSupport()中的park(),unpark()的作用 和 Object中的wait(),notify()作用类似,是阻塞/唤醒。
它们的用法不同,park(),unpark()是轻量级的,而wait(),notify()是必须先通过Synchronized获取同步锁。
关于LockSupport,我们会在之后的章节再专门进行介绍!
4. 再次tryAcquire()
了解了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()函数之后。我们接着分析acquireQueued()的for循环部分。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
说明:
(01) 通过node.predecessor()获取前继节点。predecessor()就是返回node的前继节点,若对此有疑惑可以查看下面关于Node类的介绍。
(02) p == head && tryAcquire(arg)
首先,判断“前继节点”是不是CHL表头。如果是的话,则通过tryAcquire()尝试获取锁。
其实,这样做的目的是为了“让当前线程获取锁”,但是为什么需要先判断p==head呢?理解这个对理解“公平锁”的机制很重要,因为这么做的原因就是为了保证公平性!
(a) 前面,我们在shouldParkAfterFailedAcquire()我们判断“当前线程”是否需要阻塞;
(b) 接着,“当前线程”阻塞的话,会调用parkAndCheckInterrupt()来阻塞线程。当线程被解除阻塞的时候,我们会返回线程的中断状态。而线程被解决阻塞,可能是由于“线程被中断”,也可能是由于“其它线程调用了该线程的unpark()函数”。
(c) 再回到p==head这里。如果当前线程是因为其它线程调用了unpark()函数而被唤醒,那么唤醒它的线程,应该是它的前继节点所对应的线程(关于这一点,后面在“释放锁”的过程中会看到)。 OK,是前继节点调用unpark()唤醒了当前线程!
此时,再来理解p==head就很简单了:当前继节点是CLH队列的头节点,并且它释放锁之后;就轮到当前节点获取锁了。然后,当前节点通过tryAcquire()获取锁;获取成功的话,通过setHead(node)设置当前节点为头节点,并返回。
总之,如果“前继节点调用unpark()唤醒了当前线程”并且“前继节点是CLH表头”,此时就是满足p==head,也就是符合公平性原则的。否则,如果当前线程是因为“线程被中断”而唤醒,那么显然就不是公平了。这就是为什么说p==head就是保证公平性!
小结:acquireQueued()的作用就是“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有并中断过。
四. selfInterrupt()
selfInterrupt()是AQS中实现,源码如下:
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}
说明:selfInterrupt()的代码很简单,就是“当前线程”自己产生一个中断。但是,为什么需要这么做呢?
这必须结合acquireQueued()进行分析。如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。
在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt()中,我们线程的中断状态时调用了Thread.interrupted()。该函数不同于Thread的isInterrupted()函数,isInterrupted()仅仅返回中断状态,而interrupted()在返回当前中断状态之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt()重新产生一个中断!
小结:selfInterrupt()的作用就是当前线程自己产生一个中断。
总结
再回过头看看acquire()函数,它最终的目的是获取锁!
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
(01) 先是通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquireQueued()获取锁。
(02) 尝试失败的情况下,会先通过addWaiter()来将“当前线程”加入到"CLH队列"末尾;然后调用acquireQueued(),在CLH队列中排序等待获取锁,在此过程中,线程处于休眠状态。直到获取锁了才返回。 如果在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。
Java多线程系列--“JUC锁”04之 公平锁(二)
概要
前面一章,我们学习了“公平锁”获取锁的详细流程;这里,我们再来看看“公平锁”释放锁的过程。内容包括:
参考代码
释放公平锁(基于JDK1.7.0_40)
“公平锁”的获取过程请参考“Java多线程系列--“JUC锁”03之 公平锁(一)”,锁的使用示例请参考“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock”。
注意:
(01) 这里是以“公平锁”来进行说明。
(02) 关于本章的术语,如“AQS”,“CAS函数”,“CLH队列”,“公平锁”,“非公平锁”,“独占锁”,“共享锁”等内容,请参考Java多线程系列--“JUC锁”03之 公平锁(一)的基本概念。
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3496609.html
参考代码
下面给出Java1.7.0_40版本中,ReentrantLock和AQS的源码,仅供参考!
ReentranLock.java
View Code
AQS(AbstractQueuedSynchronizer.java)
释放公平锁(基于JDK1.7.0_40)
1. unlock()
unlock()在ReentrantLock.java中实现的,源码如下:
public void unlock() {
sync.release(1);
}
说明:
unlock()是解锁函数,它是通过AQS的release()函数来实现的。
在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。
关于AQS, ReentrantLock 和 sync的关系如下:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
...
}
从中,我们发现:sync是ReentrantLock.java中的成员对象,而Sync是AQS的子类。
2. release()
release()在AQS中实现的,源码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
说明:
release()会先调用tryRelease()来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。
3. tryRelease()
tryRelease()在ReentrantLock.java的Sync类中实现,源码如下:
protected final boolean tryRelease(int releases) {
// c是本次释放锁之后的状态
int c = getState() - releases;
// 如果“当前线程”不是“锁的持有者”,则抛出异常!
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设置当前线程的锁的状态。
setState(c);
return free;
}
说明:
tryRelease()的作用是尝试释放锁。
(01) 如果“当前线程”不是“锁的持有者”,则抛出异常。
(02) 如果“当前线程”在本次释放锁操作之后,对锁的拥有状态是0(即,当前线程彻底释放该“锁”),则设置“锁”的持有者为null,即锁是可获取状态。同时,更新当前线程的锁的状态为0。
getState(), setState()在前一章已经介绍过,这里不再说明。
getExclusiveOwnerThread(), setExclusiveOwnerThread()在AQS的父类AbstractOwnableSynchronizer.java中定义,源码如下:
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
// “锁”的持有线程
private transient Thread exclusiveOwnerThread;
// 设置“锁的持有线程”为t
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}
// 获取“锁的持有线程”
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
...
}
4. unparkSuccessor()
在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。
根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。
下面看看unparkSuccessor()的源码,它在AQS中实现。
private void unparkSuccessor(Node node) {
// 获取当前线程的状态
int ws = node.waitStatus;
// 如果状态<0,则设置状态=0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。
// 这里的有效,是指“后继节点对应的线程状态<=0”
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒“后继节点对应的线程”
if (s != null)
LockSupport.unpark(s.thread);
}
说明:
unparkSuccessor()的作用是“唤醒当前线程的后继线程”。后继线程被唤醒之后,就可以获取该锁并恢复运行了。
关于node.waitStatus的说明,请参考“上一章关于Node类的介绍”。
总结
“释放锁”的过程相对“获取锁”的过程比较简单。释放锁时,主要进行的操作,是更新当前线程对应的锁的状态。如果当前线程对锁已经彻底释放,则设置“锁”的持有线程为null,设置当前线程的状态为空,然后唤醒后继线程。
Java多线程系列--“JUC锁”05之 非公平锁
概要
前面两章分析了"公平锁的获取和释放机制",这一章开始对“非公平锁”的获取锁/释放锁的过程进行分析。内容包括:
参考代码
获取非公平锁(基于JDK1.7.0_40)
释放非公平锁(基于JDK1.7.0_40)
关于锁的数据结构请参考"Java多线程系列--“JUC锁”03之 公平锁(一) ",锁的使用示例请参考“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock”。
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3496651.html
参考代码
下面给出Java1.7.0_40版本中,ReentrantLock和AQS的源码,仅供参考!
ReentranLock.java
View Code
AQS(AbstractQueuedSynchronizer.java)
View Code
获取非公平锁(基于JDK1.7.0_40)
非公平锁和公平锁在获取锁的方法上,流程是一样的;它们的区别主要表现在“尝试获取锁的机制不同”。简单点说,“公平锁”在每次尝试获取锁时,都是采用公平策略(根据等待队列依次排序等待);而“非公平锁”在每次尝试获取锁时,都是采用的非公平策略(无视等待队列,直接尝试获取锁,如果锁是空闲的,即可获取状态,则获取锁)。
在前面的“Java多线程系列--“JUC锁”03之 公平锁(一)”中,已经详细介绍了获取公平锁的流程和机制;下面,通过代码分析以下获取非公平锁的流程。
1. lock()
lock()在ReentrantLock.java的NonfairSync类中实现,它的源码如下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
说明:
lock()会先通过compareAndSet(0, 1)来判断“锁”是不是空闲状态。是的话,“当前线程”直接获取“锁”;否则的话,调用acquire(1)获取锁。
(01) compareAndSetState()是CAS函数,它的作用是比较并设置当前锁的状态。若锁的状态值为0,则设置锁的状态值为1。
(02) setExclusiveOwnerThread(Thread.currentThread())的作用是,设置“当前线程”为“锁”的持有者。
“公平锁”和“非公平锁”关于lock()的对比
公平锁 -- 公平锁的lock()函数,会直接调用acquire(1)。
非公平锁 -- 非公平锁会先判断当前锁的状态是不是空闲,是的话,就不排队,而是直接获取锁。
2. acquire()
acquire()在AQS中实现的,它的源码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列依次排序,然后获取锁。
(02) “当前线程”尝试失败的情况下,会先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。
(03) 然后,调用acquireQueued()获取锁。在acquireQueued()中,当前线程会等待它在“CLH队列”中前面的所有线程执行并释放锁之后,才能获取锁并返回。如果“当前线程”在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。
“公平锁”和“非公平锁”关于acquire()的对比
公平锁和非公平锁,只有tryAcquire()函数的实现不同;即它们尝试获取锁的机制不同。这就是我们所说的“它们获取锁策略的不同所在之处”!
在“Java多线程系列--“JUC锁”03之 公平锁(一)”中,已经详细介绍了acquire()涉及到的各个函数。这里仅对它们有差异的函数tryAcquire()进行说明。
非公平锁的tryAcquire()在ReentrantLock.java的NonfairSync类中实现,源码如下:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
nonfairTryAcquire()在ReentrantLock.java的Sync类中实现,源码如下:
final boolean nonfairTryAcquire(int acquires) {
// 获取“当前线程”
final Thread current = Thread.currentThread();
// 获取“锁”的状态
int c = getState();
// c=0意味着“锁没有被任何线程锁拥有”
if (c == 0) {
// 若“锁没有被任何线程锁拥有”,则通过CAS函数设置“锁”的状态为acquires。
// 同时,设置“当前线程”为锁的持有者。
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 如果“锁”的持有者已经是“当前线程”,
// 则将更新锁的状态。
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
说明:
根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。
(01) 如果“锁”没有被任何线程拥有,则通过CAS函数设置“锁”的状态为acquires,同时,设置“当前线程”为锁的持有者,然后返回true。
(02) 如果“锁”的持有者已经是当前线程,则将更新锁的状态即可。
(03) 如果不术语上面的两种情况,则认为尝试失败。
“公平锁”和“非公平锁”关于tryAcquire()的对比
公平锁和非公平锁,它们尝试获取锁的方式不同。
公平锁在尝试获取锁时,即使“锁”没有被任何线程锁持有,它也会判断自己是不是CLH等待队列的表头;是的话,才获取锁。
而非公平锁在尝试获取锁时,如果“锁”没有被任何线程持有,则不管它在CLH队列的何处,它都直接获取锁。
释放非公平锁(基于JDK1.7.0_40)
非公平锁和公平锁在释放锁的方法和策略上是一样的。
而在前面的“Java多线程系列--“JUC锁”04之 公平锁(二) ”中,已经对“释放公平锁”进行了介绍;这里就不再重复的进行说明。
总结
公平锁和非公平锁的区别,是在获取锁的机制上的区别。表现在,在尝试获取锁时 —— 公平锁,只有在当前线程是CLH等待队列的表头时,才获取锁;而非公平锁,只要当前锁处于空闲状态,则直接获取锁,而不管CLH等待队列中的顺序。
只有当非公平锁尝试获取锁失败的时候,它才会像公平锁一样,进入CLH等待队列排序等待。
Java多线程系列--“JUC锁”06之 Condition条件
概要
前面对JUC包中的锁的原理进行了介绍,本章会JUC中对与锁经常配合使用的Condition进行介绍,内容包括:
Condition介绍
Condition函数列表
Condition示例
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3496716.html
Condition介绍
Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。
Condition函数列表
// 造成当前线程在接到信号或被中断之前一直处于等待状态。
void await()
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
// 造成当前线程在接到信号之前一直处于等待状态。
void awaitUninterruptibly()
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline)
// 唤醒一个等待线程。
void signal()
// 唤醒所有等待线程。
void signalAll()
Condition示例
示例1是通过Object的wait(), notify()来演示线程的休眠/唤醒功能。
示例2是通过Condition的await(), signal()来演示线程的休眠/唤醒功能。
示例3是通过Condition的高级功能。
示例1
public class WaitTest1 {
public static void main(String[] args) {
ThreadA ta = new ThreadA("ta");
synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
try {
System.out.println(Thread.currentThread().getName()+" start ta");
ta.start();
System.out.println(Thread.currentThread().getName()+" block");
ta.wait(); // 等待
System.out.println(Thread.currentThread().getName()+" continue");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}
public void run() {
synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”
System.out.println(Thread.currentThread().getName()+" wakup others");
notify(); // 唤醒“当前对象上的等待线程”
}
}
}
}
示例2
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest1 {
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {
ThreadA ta = new ThreadA("ta");
lock.lock(); // 获取锁
try {
System.out.println(Thread.currentThread().getName()+" start ta");
ta.start();
System.out.println(Thread.currentThread().getName()+" block");
condition.await(); // 等待
System.out.println(Thread.currentThread().getName()+" continue");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁
}
}
static class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}
public void run() {
lock.lock(); // 获取锁
try {
System.out.println(Thread.currentThread().getName()+" wakup others");
condition.signal(); // 唤醒“condition所在锁上的其它线程”
} finally {
lock.unlock(); // 释放锁
}
}
}
}
运行结果:
main start ta
main block
ta wakup others
main continue
通过“示例1”和“示例2”,我们知道Condition和Object的方法有一下对应关系:
Object Condition
休眠 wait await
唤醒个线程 notify signal
唤醒所有线程 notifyAll signalAll
Condition除了支持上面的功能之外,它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。
例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。 如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。 但是,通过Condition,就能明确的指定唤醒读线程。
看看下面的示例3,可能对这个概念有更深刻的理解。
示例3
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[5];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock(); //获取锁
try {
// 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
while (count == items.length)
notFull.await();
// 将x添加到缓冲中
items[putptr] = x;
// 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
if (++putptr == items.length) putptr = 0;
// 将“缓冲”数量+1
++count;
// 唤醒take线程,因为take线程通过notEmpty.await()等待
notEmpty.signal();
// 打印写入的数据
System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x);
} finally {
lock.unlock(); // 释放锁
}
}
public Object take() throws InterruptedException {
lock.lock(); //获取锁
try {
// 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
while (count == 0)
notEmpty.await();
// 将x从缓冲中取出
Object x = items[takeptr];
// 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
if (++takeptr == items.length) takeptr = 0;
// 将“缓冲”数量-1
--count;
// 唤醒put线程,因为put线程通过notFull.await()等待
notFull.signal();
// 打印取出的数据
System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
return x;
} finally {
lock.unlock(); // 释放锁
}
}
}
public class ConditionTest2 {
private static BoundedBuffer bb = new BoundedBuffer();
public static void main(String[] args) {
// 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
// 启动10个“读线程”,从BoundedBuffer中不断的读数据。
for (int i=0; i<10; i++) {
new PutThread("p"+i, i).start();
new TakeThread("t"+i).start();
}
}
static class PutThread extends Thread {
private int num;
public PutThread(String name, int num) {
super(name);
this.num = num;
}
public void run() {
try {
Thread.sleep(1); // 线程休眠1ms
bb.put(num); // 向BoundedBuffer中写入数据
} catch (InterruptedException e) {
}
}
}
static class TakeThread extends Thread {
public TakeThread(String name) {
super(name);
}
public void run() {
try {
Thread.sleep(10); // 线程休眠1ms
Integer num = (Integer)bb.take(); // 从BoundedBuffer中取出数据
} catch (InterruptedException e) {
}
}
}
}
(某一次)运行结果:
p1 put 1
p4 put 4
p5 put 5
p0 put 0
p2 put 2
t0 take 1
p3 put 3
t1 take 4
p6 put 6
t2 take 5
p7 put 7
t3 take 0
p8 put 8
t4 take 2
p9 put 9
t5 take 3
t6 take 6
t7 take 7
t8 take 8
t9 take 9
结果说明:
(01) BoundedBuffer 是容量为5的缓冲,缓冲中存储的是Object对象,支持多线程的读/写缓冲。多个线程操作“一个BoundedBuffer对象”时,它们通过互斥锁lock对缓冲区items进行互斥访问;而且同一个BoundedBuffer对象下的全部线程共用“notFull”和“notEmpty”这两个Condition。
notFull用于控制写缓冲,notEmpty用于控制读缓冲。当缓冲已满的时候,调用put的线程会执行notFull.await()进行等待;当缓冲区不是满的状态时,就将对象添加到缓冲区并将缓冲区的容量count+1,最后,调用notEmpty.signal()缓冲notEmpty上的等待线程(调用notEmpty.await的线程)。 简言之,notFull控制“缓冲区的写入”,当往缓冲区写入数据之后会唤醒notEmpty上的等待线程。
同理,notEmpty控制“缓冲区的读取”,当读取了缓冲区数据之后会唤醒notFull上的等待线程。
(02) 在ConditionTest2的main函数中,启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);同时,也启动10个“读线程”,从BoundedBuffer中不断的读数据。
(03) 简单分析一下运行结果。
1, p1线程向缓冲中写入1。 此时,缓冲区数据: | 1 | | | | |
2, p4线程向缓冲中写入4。 此时,缓冲区数据: | 1 | 4 | | | |
3, p5线程向缓冲中写入5。 此时,缓冲区数据: | 1 | 4 | 5 | | |
4, p0线程向缓冲中写入0。 此时,缓冲区数据: | 1 | 4 | 5 | 0 | |
5, p2线程向缓冲中写入2。 此时,缓冲区数据: | 1 | 4 | 5 | 0 | 2 |
此时,缓冲区容量为5;缓冲区已满!如果此时,还有“写线程”想往缓冲中写入数据,会调用put中的notFull.await()等待,直接缓冲区非满状态,才能继续运行。
6, t0线程从缓冲中取出数据1。此时,缓冲区数据: | | 4 | 5 | 0 | 2 |
7, p3线程向缓冲中写入3。 此时,缓冲区数据: | 3 | 4 | 5 | 0 | 2 |
8, t1线程从缓冲中取出数据4。此时,缓冲区数据: | 3 | | 5 | 0 | 2 |
9, p6线程向缓冲中写入6。 此时,缓冲区数据: | 3 | 6 | 5 | 0 | 2 |
...
Java多线程系列--“JUC锁”07之 LockSupport
概述
本章介绍JUC(java.util.concurrent)包中的LockSupport。内容包括:
LockSupport介绍
LockSupport函数列表
LockSupport参考代码(基于JDK1.7.0_40)
LockSupport示例
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3505784.html
LockSupport介绍
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。
因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性。
LockSupport函数列表
// 返回提供给最近一次尚未解除阻塞的 park 方法调用的 blocker 对象,如果该调用不受阻塞,则返回 null。
static Object getBlocker(Thread t)
// 为了线程调度,禁用当前线程,除非许可可用。
static void park()
// 为了线程调度,在许可可用之前禁用当前线程。
static void park(Object blocker)
// 为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用。
static void parkNanos(long nanos)
// 为了线程调度,在许可可用前禁用当前线程,并最多等待指定的等待时间。
static void parkNanos(Object blocker, long nanos)
// 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。
static void parkUntil(long deadline)
// 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。
static void parkUntil(Object blocker, long deadline)
// 如果给定线程的许可尚不可用,则使其可用。
static void unpark(Thread thread)
LockSupport参考代码(基于JDK1.7.0_40)
LockSupport.java的源码如下:
View Code说明:LockSupport是通过调用Unsafe函数中的接口实现阻塞和解除阻塞的。
LockSupport示例
对比下面的“示例1”和“示例2”可以更清晰的了解LockSupport的用法。
示例1
1 public class WaitTest1 {
2
3 public static void main(String[] args) {
4
5 ThreadA ta = new ThreadA("ta");
6
7 synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
8 try {
9 System.out.println(Thread.currentThread().getName()+" start ta");
10 ta.start();
11
12 System.out.println(Thread.currentThread().getName()+" block");
13 // 主线程等待
14 ta.wait();
15
16 System.out.println(Thread.currentThread().getName()+" continue");
17 } catch (InterruptedException e) {
18 e.printStackTrace();
19 }
20 }
21 }
22
23 static class ThreadA extends Thread{
24
25 public ThreadA(String name) {
26 super(name);
27 }
28
29 public void run() {
30 synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”
31 System.out.println(Thread.currentThread().getName()+" wakup others");
32 notify(); // 唤醒“当前对象上的等待线程”
33 }
34 }
35 }
36 }
示例2
1 import java.util.concurrent.locks.LockSupport;
2
3 public class LockSupportTest1 {
4
5 private static Thread mainThread;
6
7 public static void main(String[] args) {
8
9 ThreadA ta = new ThreadA("ta");
10 // 获取主线程
11 mainThread = Thread.currentThread();
12
13 System.out.println(Thread.currentThread().getName()+" start ta");
14 ta.start();
15
16 System.out.println(Thread.currentThread().getName()+" block");
17 // 主线程阻塞
18 LockSupport.park(mainThread);
19
20 System.out.println(Thread.currentThread().getName()+" continue");
21 }
22
23 static class ThreadA extends Thread{
24
25 public ThreadA(String name) {
26 super(name);
27 }
28
29 public void run() {
30 System.out.println(Thread.currentThread().getName()+" wakup others");
31 // 唤醒“主线程”
32 LockSupport.unpark(mainThread);
33 }
34 }
35 }
运行结果:
main start ta
main block
ta wakup others
main continue
说明:park和wait的区别。wait让线程阻塞前,必须通过synchronized获取同步锁。
Java多线程系列--“JUC锁”08之 共享锁和ReentrantReadWriteLock
概要
Java的JUC(java.util.concurrent)包中的锁包括"独占锁"和"共享锁"。在“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock ”中,对Java的独占锁进行了说明。本章对Java的“共享锁”进行介绍,JUC中的共享锁有CountDownLatch, CyclicBarrier, Semaphore, ReentrantReadWriteLock等;本章会以ReentrantReadWriteLock为蓝本对共享锁进行说明。内容包括:
ReadWriteLock 和 ReentrantReadWriteLock介绍
ReadWriteLock 和 ReentrantReadWriteLock函数列表
ReentrantReadWriteLock数据结构
参考代码(基于JDK1.7.0_40)
获取共享锁
释放共享锁
公平共享锁和非公平共享锁
ReentrantReadWriteLock示例
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3505809.html
ReadWriteLock 和 ReentrantReadWriteLock介绍
ReadWriteLock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作。
“读取锁”用于只读操作,它是“共享锁”,能同时被多个线程获取。
“写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线程锁获取。
注意:不能同时存在读取锁和写入锁!
ReadWriteLock是一个接口。ReentrantReadWriteLock是它的实现类,ReentrantReadWriteLock包括子类ReadLock和WriteLock。
ReadWriteLock 和 ReentrantReadWriteLock函数列表
ReadWriteLock函数列表
// 返回用于读取操作的锁。
Lock readLock()
// 返回用于写入操作的锁。
Lock writeLock()
// 创建一个新的 ReentrantReadWriteLock,默认是采用“非公平策略”。
ReentrantReadWriteLock()
// 创建一个新的 ReentrantReadWriteLock,fair是“公平策略”。fair为true,意味着公平策略;否则,意味着非公平策略。
ReentrantReadWriteLock(boolean fair)
// 返回当前拥有写入锁的线程,如果没有这样的线程,则返回 null。
protected Thread getOwner()
// 返回一个 collection,它包含可能正在等待获取读取锁的线程。
protected Collection<Thread> getQueuedReaderThreads()
// 返回一个 collection,它包含可能正在等待获取读取或写入锁的线程。
protected Collection<Thread> getQueuedThreads()
// 返回一个 collection,它包含可能正在等待获取写入锁的线程。
protected Collection<Thread> getQueuedWriterThreads()
// 返回等待获取读取或写入锁的线程估计数目。
int getQueueLength()
// 查询当前线程在此锁上保持的重入读取锁数量。
int getReadHoldCount()
// 查询为此锁保持的读取锁数量。
int getReadLockCount()
// 返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程。
protected Collection<Thread> getWaitingThreads(Condition condition)
// 返回正等待与写入锁相关的给定条件的线程估计数目。
int getWaitQueueLength(Condition condition)
// 查询当前线程在此锁上保持的重入写入锁数量。
int getWriteHoldCount()
// 查询是否给定线程正在等待获取读取或写入锁。
boolean hasQueuedThread(Thread thread)
// 查询是否所有的线程正在等待获取读取或写入锁。
boolean hasQueuedThreads()
// 查询是否有些线程正在等待与写入锁有关的给定条件。
boolean hasWaiters(Condition condition)
// 如果此锁将公平性设置为 ture,则返回 true。
boolean isFair()
// 查询是否某个线程保持了写入锁。
boolean isWriteLocked()
// 查询当前线程是否保持了写入锁。
boolean isWriteLockedByCurrentThread()
// 返回用于读取操作的锁。
ReentrantReadWriteLock.ReadLock readLock()
// 返回用于写入操作的锁。
ReentrantReadWriteLock.WriteLock writeLock()
ReentrantReadWriteLock数据结构
ReentrantReadWriteLock的UML类图如下:
从中可以看出:
(01) ReentrantReadWriteLock实现了ReadWriteLock接口。ReadWriteLock是一个读写锁的接口,提供了"获取读锁的readLock()函数" 和 "获取写锁的writeLock()函数"。
(02) ReentrantReadWriteLock中包含:sync对象,读锁readerLock和写锁writerLock。读锁ReadLock和写锁WriteLock都实现了Lock接口。读锁ReadLock和写锁WriteLock中也都分别包含了"Sync对象",它们的Sync对象和ReentrantReadWriteLock的Sync对象 是一样的,就是通过sync,读锁和写锁实现了对同一个对象的访问。
(03) 和"ReentrantLock"一样,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括"公平锁"FairSync和"非公平锁"NonfairSync。sync对象是"FairSync"和"NonfairSync"中的一个,默认是"NonfairSync"。
参考代码(基于JDK1.7.0_40)
ReentrantReadWriteLock的完整源码
View Code
AQS的完整源码
View Code
其中,共享锁源码相关的代码如下:
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
// ReentrantReadWriteLock的AQS对象
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 获取“共享锁”
public void lock() {
sync.acquireShared(1);
}
// 如果线程是中断状态,则抛出一场,否则尝试获取共享锁。
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 尝试获取“共享锁”
public boolean tryLock() {
return sync.tryReadLock();
}
// 在指定时间内,尝试获取“共享锁”
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 释放“共享锁”
public void unlock() {
sync.releaseShared(1);
}
// 新建条件
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
说明:
ReadLock中的sync是一个Sync对象,Sync继承于AQS类,即Sync就是一个锁。ReentrantReadWriteLock中也有一个Sync对象,而且ReadLock中的sync和ReentrantReadWriteLock中的sync是对应关系。即ReentrantReadWriteLock和ReadLock共享同一个AQS对象,共享同一把锁。
ReentrantReadWriteLock中Sync的定义如下:
final Sync sync;
下面,分别从“获取共享锁”和“释放共享锁”两个方面对共享锁进行说明。
获取共享锁
获取共享锁的思想(即lock函数的步骤),是先通过tryAcquireShared()尝试获取共享锁。尝试成功的话,则直接返回;尝试失败的话,则通过doAcquireShared()不断的循环并尝试获取锁,若有需要,则阻塞等待。doAcquireShared()在循环中每次尝试获取锁时,都是通过tryAcquireShared()来进行尝试的。下面看看“获取共享锁”的详细流程。
1. lock()
lock()在ReadLock中,源码如下:
public void lock() {
sync.acquireShared(1);
}
2. acquireShared()
Sync继承于AQS,acquireShared()定义在AQS中。源码如下:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
说明:acquireShared()首先会通过tryAcquireShared()来尝试获取锁。
尝试成功的话,则不再做任何动作(因为已经成功获取到锁了)。
尝试失败的话,则通过doAcquireShared()来获取锁。doAcquireShared()会获取到锁了才返回。
3. tryAcquireShared()
tryAcquireShared()定义在ReentrantReadWriteLock.java的Sync中,源码如下:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 获取“锁”的状态
int c = getState();
// 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取“读取锁”的共享计数
int r = sharedCount(c);
// 如果“不需要阻塞等待”,并且“读取锁”的共享计数小于MAX_COUNT;
// 则通过CAS函数更新“锁的状态”,将“读取锁”的共享计数+1。
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 第1次获取“读取锁”。
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// HoldCounter是用来统计该线程获取“读取锁”的次数。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 将该线程获取“读取锁”的次数+1。
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
说明:tryAcquireShared()的作用是尝试获取“共享锁”。
如果在尝试获取锁时,“不需要阻塞等待”并且“读取锁的共享计数小于MAX_COUNT”,则直接通过CAS函数更新“读取锁的共享计数”,以及将“当前线程获取读取锁的次数+1”。
否则,通过fullTryAcquireShared()获取读取锁。
4. fullTryAcquireShared()
fullTryAcquireShared()在ReentrantReadWriteLock中定义,源码如下:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
// 获取“锁”的状态
int c = getState();
// 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 如果“需要阻塞等待”。
// (01) 当“需要阻塞等待”的线程是第1个获取锁的线程的话,则继续往下执行。
// (02) 当“需要阻塞等待”的线程获取锁的次数=0时,则返回-1。
} else if (readerShouldBlock()) {
// 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId()) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 如果当前线程获取锁的计数=0,则返回-1。
if (rh.count == 0)
return -1;
}
}
// 如果“不需要阻塞等待”,则获取“读取锁”的共享统计数;
// 如果共享统计数超过MAX_COUNT,则抛出异常。
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 将线程获取“读取锁”的次数+1。
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 如果是第1次获取“读取锁”,则更新firstReader和firstReaderHoldCount。
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程,
// 则将firstReaderHoldCount+1。
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 更新线程的获取“读取锁”的共享计数
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
说明:fullTryAcquireShared()会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。
5. doAcquireShared()
doAcquireShared()定义在AQS函数中,源码如下:
private void doAcquireShared(int arg) {
// addWaiter(Node.SHARED)的作用是,创建“当前线程”对应的节点,并将该线程添加到CLH队列中。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取“node”的前一节点
final Node p = node.predecessor();
// 如果“当前线程”是CLH队列的表头,则尝试获取共享锁。
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,
// 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
说明:doAcquireShared()的作用是获取共享锁。
它会首先创建线程对应的CLH队列的节点,然后将该节点添加到CLH队列中。CLH队列是管理获取锁的等待线程的队列。
如果“当前线程”是CLH队列的表头,则尝试获取共享锁;否则,则需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。
doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!
shouldParkAfterFailedAcquire(), parkAndCheckInterrupt()等函数已经在“Java多线程系列--“JUC锁”03之 公平锁(一) ”中详细介绍过,这里就不再重复说明了。
释放共享锁
释放共享锁的思想,是先通过tryReleaseShared()尝试释放共享锁。尝试成功的话,则通过doReleaseShared()唤醒“其他等待获取共享锁的线程”,并返回true;否则的话,返回flase。
1. unlock()
public void unlock() {
sync.releaseShared(1);
}
说明:该函数实际上调用releaseShared(1)释放共享锁。
2. releaseShared()
releaseShared()在AQS中实现,源码如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
说明:releaseShared()的目的是让当前线程释放它所持有的共享锁。
它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
3. tryReleaseShared()
tryReleaseShared()定义在ReentrantReadWriteLock中,源码如下:
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程,即释放共享锁的线程。
Thread current = Thread.currentThread();
// 如果想要释放锁的线程(current)是第1个获取锁(firstReader)的线程,
// 并且“第1个获取锁的线程获取锁的次数”=1,则设置firstReader为null;
// 否则,将“第1个获取锁的线程的获取次数”-1。
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
// 获取rh对象,并更新“当前线程获取锁的信息”。
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 获取锁的状态
int c = getState();
// 将锁的获取次数-1。
int nextc = c - SHARED_UNIT;
// 通过CAS更新锁的状态。
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
说明:tryReleaseShared()的作用是尝试释放共享锁。
4. doReleaseShared()
doReleaseShared()定义在AQS中,源码如下:
private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;
// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒“头节点的下一个节点所对应的线程”。
unparkSuccessor(h);
}
// 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点发生变化,则继续循环。否则,退出循环。
if (h == head) // loop if head changed
break;
}
}
说明:doReleaseShared()会释放“共享锁”。它会从前往后的遍历CLH队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的锁。
公平共享锁和非公平共享锁
和互斥锁ReentrantLock一样,ReadLock也分为公平锁和非公平锁。
公平锁和非公平锁的区别,体现在判断是否需要阻塞的函数readerShouldBlock()是不同的。
公平锁的readerShouldBlock()的源码如下:
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
在公平共享锁中,如果在当前线程的前面有其他线程在等待获取共享锁,则返回true;否则,返回false。
非公平锁的readerShouldBlock()的源码如下:
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
在非公平共享锁中,它会无视当前线程的前面是否有其他线程在等待获取共享锁。只要该非公平共享锁对应的线程不为null,则返回true。
ReentrantReadWriteLock示例
1 import java.util.concurrent.locks.ReadWriteLock;
2 import java.util.concurrent.locks.ReentrantReadWriteLock;
3
4 public class ReadWriteLockTest1 {
5
6 public static void main(String[] args) {
7 // 创建账户
8 MyCount myCount = new MyCount("4238920615242830", 10000);
9 // 创建用户,并指定账户
10 User user = new User("Tommy", myCount);
11
12 // 分别启动3个“读取账户金钱”的线程 和 3个“设置账户金钱”的线程
13 for (int i=0; i<3; i++) {
14 user.getCash();
15 user.setCash((i+1)*1000);
16 }
17 }
18 }
19
20 class User {
21 private String name; //用户名
22 private MyCount myCount; //所要操作的账户
23 private ReadWriteLock myLock; //执行操作所需的锁对象
24
25 User(String name, MyCount myCount) {
26 this.name = name;
27 this.myCount = myCount;
28 this.myLock = new ReentrantReadWriteLock();
29 }
30
31 public void getCash() {
32 new Thread() {
33 public void run() {
34 myLock.readLock().lock();
35 try {
36 System.out.println(Thread.currentThread().getName() +" getCash start");
37 myCount.getCash();
38 Thread.sleep(1);
39 System.out.println(Thread.currentThread().getName() +" getCash end");
40 } catch (InterruptedException e) {
41 } finally {
42 myLock.readLock().unlock();
43 }
44 }
45 }.start();
46 }
47
48 public void setCash(final int cash) {
49 new Thread() {
50 public void run() {
51 myLock.writeLock().lock();
52 try {
53 System.out.println(Thread.currentThread().getName() +" setCash start");
54 myCount.setCash(cash);
55 Thread.sleep(1);
56 System.out.println(Thread.currentThread().getName() +" setCash end");
57 } catch (InterruptedException e) {
58 } finally {
59 myLock.writeLock().unlock();
60 }
61 }
62 }.start();
63 }
64 }
65
66 class MyCount {
67 private String id; //账号
68 private int cash; //账户余额
69
70 MyCount(String id, int cash) {
71 this.id = id;
72 this.cash = cash;
73 }
74
75 public String getId() {
76 return id;
77 }
78
79 public void setId(String id) {
80 this.id = id;
81 }
82
83 public int getCash() {
84 System.out.println(Thread.currentThread().getName() +" getCash cash="+ cash);
85 return cash;
86 }
87
88 public void setCash(int cash) {
89 System.out.println(Thread.currentThread().getName() +" setCash cash="+ cash);
90 this.cash = cash;
91 }
92 }
运行结果:
Thread-0 getCash start
Thread-2 getCash start
Thread-0 getCash cash=10000
Thread-2 getCash cash=10000
Thread-0 getCash end
Thread-2 getCash end
Thread-1 setCash start
Thread-1 setCash cash=1000
Thread-1 setCash end
Thread-3 setCash start
Thread-3 setCash cash=2000
Thread-3 setCash end
Thread-4 getCash start
Thread-4 getCash cash=2000
Thread-4 getCash end
Thread-5 setCash start
Thread-5 setCash cash=3000
Thread-5 setCash end
结果说明:
(01) 观察Thread0和Thread-2的运行结果,我们发现,Thread-0启动并获取到“读取锁”,在它还没运行完毕的时候,Thread-2也启动了并且也成功获取到“读取锁”。
因此,“读取锁”支持被多个线程同时获取。
(02) 观察Thread-1,Thread-3,Thread-5这三个“写入锁”的线程。只要“写入锁”被某线程获取,则该线程运行完毕了,才释放该锁。
因此,“写入锁”不支持被多个线程同时获取。
Java多线程系列--“JUC锁”09之 CountDownLatch原理和示例
概要
前面对"独占锁"和"共享锁"有了个大致的了解;本章,我们对CountDownLatch进行学习。和ReadWriteLock.ReadLock一样,CountDownLatch的本质也是一个"共享锁"。本章的内容包括:
CountDownLatch简介
CountDownLatch数据结构
CountDownLatch源码分析(基于JDK1.7.0_40)
CountDownLatch示例
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3533887.html
CountDownLatch简介
CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
CountDownLatch和CyclicBarrier的区别
(01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
关于CyclicBarrier的原理,后面一章再来学习。
CountDownLatch函数列表
CountDownLatch(int count)
构造一个用给定计数初始化的 CountDownLatch。
// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
void await()
// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
boolean await(long timeout, TimeUnit unit)
// 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
void countDown()
// 返回当前计数。
long getCount()
// 返回标识此锁存器及其状态的字符串。
String toString()
CountDownLatch数据结构
CountDownLatch的UML类图如下:
CountDownLatch的数据结构很简单,它是通过"共享锁"实现的。它包含了sync对象,sync是Sync类型。Sync是实例类,它继承于AQS。
CountDownLatch源码分析(基于JDK1.7.0_40)
CountDownLatch完整源码(基于JDK1.7.0_40)
View CodeCountDownLatch是通过“共享锁”实现的。下面,我们分析CountDownLatch中3个核心函数: CountDownLatch(int count), await(), countDown()。
1. CountDownLatch(int count)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
说明:该函数是创建一个Sync对象,而Sync是继承于AQS类。Sync构造函数如下:
Sync(int count) {
setState(count);
}
setState()在AQS中实现,源码如下:
protected final void setState(long newState) {
state = newState;
}
说明:在AQS中,state是一个private volatile long类型的对象。对于CountDownLatch而言,state表示的”锁计数器“。CountDownLatch中的getCount()最终是调用AQS中的getState(),返回的state对象,即”锁计数器“。
2. await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
说明:该函数实际上是调用的AQS的acquireSharedInterruptibly(1);
AQS中的acquireSharedInterruptibly()的源码如下:
public final void acquireSharedInterruptibly(long arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
说明:acquireSharedInterruptibly()的作用是获取共享锁。
如果当前线程是中断状态,则抛出异常InterruptedException。否则,调用tryAcquireShared(arg)尝试获取共享锁;尝试成功则返回,否则就调用doAcquireSharedInterruptibly()。doAcquireSharedInterruptibly()会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。
tryAcquireShared()在CountDownLatch.java中被重写,它的源码如下:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
说明:tryAcquireShared()的作用是尝试获取共享锁。
如果"锁计数器=0",即锁是可获取状态,则返回1;否则,锁是不可获取状态,则返回-1。
private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 创建"当前线程"的Node节点,且Node中记录的锁是"共享锁"类型;并将该节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点。
// 如果上一节点是CLH队列的表头,则"尝试获取共享锁"。
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// (上一节点不是CLH队列的表头) 当前线程一直等待,直到获取到共享锁。
// 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
说明:
(01) addWaiter(Node.SHARED)的作用是,创建”当前线程“的Node节点,且Node中记录的锁的类型是”共享锁“(Node.SHARED);并将该节点添加到CLH队列末尾。关于Node和CLH在"Java多线程系列--“JUC锁”03之 公平锁(一)"已经详细介绍过,这里就不再重复说明了。
(02) node.predecessor()的作用是,获取上一个节点。如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
(03) shouldParkAfterFailedAcquire()的作用和它的名称一样,如果在尝试获取锁失败之后,线程应该等待,则返回true;否则,返回false。
(04) 当shouldParkAfterFailedAcquire()返回ture时,则调用parkAndCheckInterrupt(),当前线程会进入等待状态,直到获取到共享锁才继续运行。
doAcquireSharedInterruptibly()中的shouldParkAfterFailedAcquire(), parkAndCheckInterrupt等函数在"Java多线程系列--“JUC锁”03之 公平锁(一)"中介绍过,这里也就不再详细说明了。
3. countDown()
public void countDown() {
sync.releaseShared(1);
}
说明:该函数实际上调用releaseShared(1)释放共享锁。
releaseShared()在AQS中实现,源码如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
说明:releaseShared()的目的是让当前线程释放它所持有的共享锁。
它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
tryReleaseShared()在CountDownLatch.java中被重写,源码如下:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
// 获取“锁计数器”的状态
int c = getState();
if (c == 0)
return false;
// “锁计数器”-1
int nextc = c-1;
// 通过CAS函数进行赋值。
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
说明:tryReleaseShared()的作用是释放共享锁,将“锁计数器”的值-1。
总结:CountDownLatch是通过“共享锁”实现的。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态,表示该“共享锁”最多能被count给线程同时获取。当某线程调用该CountDownLatch对象的await()方法时,该线程会等待“共享锁”可用时,才能获取“共享锁”进而继续运行。而“共享锁”可用的条件,就是“锁计数器”的值为0!而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1;通过这种方式,必须有count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待线程才能继续运行!
以上,就是CountDownLatch的实现原理。
CountDownLatch的使用示例
下面通过CountDownLatch实现:"主线程"等待"5个子线程"全部都完成"指定的工作(休眠1000ms)"之后,再继续运行。
1 import java.util.concurrent.CountDownLatch;
2 import java.util.concurrent.CyclicBarrier;
3
4 public class CountDownLatchTest1 {
5
6 private static int LATCH_SIZE = 5;
7 private static CountDownLatch doneSignal;
8 public static void main(String[] args) {
9
10 try {
11 doneSignal = new CountDownLatch(LATCH_SIZE);
12
13 // 新建5个任务
14 for(int i=0; i<LATCH_SIZE; i++)
15 new InnerThread().start();
16
17 System.out.println("main await begin.");
18 // "主线程"等待线程池中5个任务的完成
19 doneSignal.await();
20
21 System.out.println("main await finished.");
22 } catch (InterruptedException e) {
23 e.printStackTrace();
24 }
25 }
26
27 static class InnerThread extends Thread{
28 public void run() {
29 try {
30 Thread.sleep(1000);
31 System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
32 // 将CountDownLatch的数值减1
33 doneSignal.countDown();
34 } catch (InterruptedException e) {
35 e.printStackTrace();
36 }
37 }
38 }
39 }
运行结果:
main await begin.
Thread-0 sleep 1000ms.
Thread-2 sleep 1000ms.
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
Thread-3 sleep 1000ms.
main await finished.
结果说明:主线程通过doneSignal.await()等待其它线程将doneSignal递减至0。其它的5个InnerThread线程,每一个都通过doneSignal.countDown()将doneSignal的值减1;当doneSignal为0时,main被唤醒后继续执行。
Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例
概要
本章介绍JUC包中的CyclicBarrier锁。内容包括:
CyclicBarrier简介
CyclicBarrier数据结构
CyclicBarrier源码分析(基于JDK1.7.0_40)
CyclicBarrier示例
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3533995.html
CyclicBarrier简介
CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
注意比较CountDownLatch和CyclicBarrier:
(01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
CyclicBarrier函数列表
CyclicBarrier(int parties)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
int await()
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit)
在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
int getNumberWaiting()
返回当前在屏障处等待的参与者数目。
int getParties()
返回要求启动此 barrier 的参与者数目。
boolean isBroken()
查询此屏障是否处于损坏状态。
void reset()
将屏障重置为其初始状态。
CyclicBarrier数据结构
CyclicBarrier的UML类图如下:
CyclicBarrier是包含了"ReentrantLock对象lock"和"Condition对象trip",它是通过独占锁实现的。下面通过源码去分析到底是如何实现的。
CyclicBarrier源码分析(基于JDK1.7.0_40)
CyclicBarrier完整源码(基于JDK1.7.0_40)
View CodeCyclicBarrier是通过ReentrantLock(独占锁)和Condition来实现的。下面,我们分析CyclicBarrier中3个核心函数: 构造函数, await()作出分析。
1. 构造函数
CyclicBarrier的构造函数共2个:CyclicBarrier 和 CyclicBarrier(int parties, Runnable barrierAction)。第1个构造函数是调用第2个构造函数来实现的,下面第2个构造函数的源码。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// parties表示“必须同时到达barrier的线程个数”。
this.parties = parties;
// count表示“处在等待状态的线程个数”。
this.count = parties;
// barrierCommand表示“parties个线程到达barrier时,会执行的动作”。
this.barrierCommand = barrierAction;
}
2. 等待函数
CyclicBarrier.java中await()方法如下:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
说明:await()是通过dowait()实现的。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 获取“独占锁(lock)”
lock.lock();
try {
// 保存“当前的generation”
final Generation g = generation;
// 若“当前generation已损坏”,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();
// 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 将“count计数器”-1
int index = --count;
// 如果index=0,则意味着“有parties个线程到达barrier”。
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果barrierCommand不为null,则执行该动作。
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒所有等待线程,并更新generation。
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,
// 当前线程才继续执行。
for (;;) {
try {
// 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果等待过程中,线程被中断,则执行下面的函数。
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
// 如果“当前generation已经损坏”,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();
// 如果“generation已经换代”,则返回index。
if (g != generation)
return index;
// 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放“独占锁(lock)”
lock.unlock();
}
}
说明:dowait()的作用就是让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。
(01) generation是CyclicBarrier的一个成员遍历,它的定义如下:
private Generation generation = new Generation();
private static class Generation {
boolean broken = false;
}
在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代。
当有parties个线程到达barrier,generation就会被更新换代。
(02) 如果当前线程被中断,即Thread.interrupted()为true;则通过breakBarrier()终止CyclicBarrier。breakBarrier()的源码如下:
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
breakBarrier()会设置当前中断标记broken为true,意味着“将该Generation中断”;同时,设置count=parties,即重新初始化count;最后,通过signalAll()唤醒CyclicBarrier上所有的等待线程。
(03) 将“count计数器”-1,即--count;然后判断是不是“有parties个线程到达barrier”,即index是不是为0。
当index=0时,如果barrierCommand不为null,则执行该barrierCommand,barrierCommand就是我们创建CyclicBarrier时,传入的Runnable对象。然后,调用nextGeneration()进行换代工作,nextGeneration()的源码如下:
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
首先,它会调用signalAll()唤醒CyclicBarrier上所有的等待线程;接着,重新初始化count;最后,更新generation的值。
(04) 在for(;;)循环中。timed是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()进行等待;否则,调用awaitNanos()进行超时等待。
CyclicBarrier的使用示例
示例1
新建5个线程,这5个线程达到一定的条件时,它们才继续往后运行。
1 import java.util.concurrent.CyclicBarrier;
2 import java.util.concurrent.BrokenBarrierException;
3
4 public class CyclicBarrierTest1 {
5
6 private static int SIZE = 5;
7 private static CyclicBarrier cb;
8 public static void main(String[] args) {
9
10 cb = new CyclicBarrier(SIZE);
11
12 // 新建5个任务
13 for(int i=0; i<SIZE; i++)
14 new InnerThread().start();
15 }
16
17 static class InnerThread extends Thread{
18 public void run() {
19 try {
20 System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
21
22 // 将cb的参与者数量加1
23 cb.await();
24
25 // cb的参与者数量等于5时,才继续往后执行
26 System.out.println(Thread.currentThread().getName() + " continued.");
27 } catch (BrokenBarrierException e) {
28 e.printStackTrace();
29 } catch (InterruptedException e) {
30 e.printStackTrace();
31 }
32 }
33 }
34 }
运行结果:
Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.
结果说明:主线程中新建了5个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,这些线程才继续运行!
示例2
新建5个线程,当这5个线程达到一定的条件时,执行某项任务。
1 import java.util.concurrent.CyclicBarrier;
2 import java.util.concurrent.BrokenBarrierException;
3
4 public class CyclicBarrierTest2 {
5
6 private static int SIZE = 5;
7 private static CyclicBarrier cb;
8 public static void main(String[] args) {
9
10 cb = new CyclicBarrier(SIZE, new Runnable () {
11 public void run() {
12 System.out.println("CyclicBarrier's parties is: "+ cb.getParties());
13 }
14 });
15
16 // 新建5个任务
17 for(int i=0; i<SIZE; i++)
18 new InnerThread().start();
19 }
20
21 static class InnerThread extends Thread{
22 public void run() {
23 try {
24 System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
25
26 // 将cb的参与者数量加1
27 cb.await();
28
29 // cb的参与者数量等于5时,才继续往后执行
30 System.out.println(Thread.currentThread().getName() + " continued.");
31 } catch (BrokenBarrierException e) {
32 e.printStackTrace();
33 } catch (InterruptedException e) {
34 e.printStackTrace();
35 }
36 }
37 }
38 }
运行结果:
Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
CyclicBarrier's parties is: 5
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.
结果说明:主线程中新建了5个线程,所有的这些线程都调用cb.await()等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,执行新建cb时注册的Runnable任务。
概要
本章,我们对JUC包中的信号量Semaphore进行学习。内容包括:
Semaphore简介
Semaphore数据结构
Semaphore源码分析(基于JDK1.7.0_40)
Semaphore示例
转载请注明出处:http://www.cnblogs.com/skywang12345/p/3534050.html
Semaphore简介
Semaphore是一个计数信号量,它的本质是一个"共享锁"。
信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。
Semaphore的函数列表
// 创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits)
// 创建具有给定的许可数和给定的公平设置的 Semaphore。
Semaphore(int permits, boolean fair)
// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquire(int permits)
// 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 获取并返回立即可用的所有许可。
int drainPermits()
// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待获取的线程的估计数目。
int getQueueLength()
// 查询是否有线程正在等待获取。
boolean hasQueuedThreads()
// 如果此信号量的公平设置为 true,则返回 true。
boolean isFair()
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction)
// 释放一个许可,将其返回给信号量。
void release()
// 释放给定数目的许可,将其返回到信号量。
void release(int permits)
// 返回标识此信号量的字符串,以及信号量的状态。
String toString()
// 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire()
// 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
boolean tryAcquire(long timeout, TimeUnit unit)
Semaphore数据结构
Semaphore的UML类图如下:
从图中可以看出:
(01) 和"ReentrantLock"一样,Semaphore也包含了sync对象,sync是Sync类型;而且,Sync是一个继承于AQS的抽象类。
(02) Sync包括两个子类:"公平信号量"FairSync 和 "非公平信号量"NonfairSync。sync是"FairSync的实例",或者"NonfairSync的实例";默认情况下,sync是NonfairSync(即,默认是非公平信号量)。
Semaphore源码分析(基于JDK1.7.0_40)
Semaphore完整源码(基于JDK1.7.0_40)
View CodeSemaphore是通过共享锁实现的。根据共享锁的获取原则,Semaphore分为"公平信号量"和"非公平信号量"。
"公平信号量"和"非公平信号量"的区别
"公平信号量"和"非公平信号量"的释放信号量的机制是一样的!不同的是它们获取信号量的机制:线程在尝试获取信号量许可时,对于公平信号量而言,如果当前线程不在CLH队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在CLH队列的头部,它都会直接获取信号量。该差异具体的体现在,它们的tryAcquireShared()函数的实现不同。
"公平信号量"类
View Code"非公平信号量"类
View Code下面,我们逐步的对它们的源码进行分析。
1. 信号量构造函数
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
从中,我们可以信号量分为“公平信号量(FairSync)”和“非公平信号量(NonfairSync)”。Semaphore(int permits)函数会默认创建“非公平信号量”。
2. 公平信号量获取和释放
2.1 公平信号量的获取
Semaphore中的公平信号量是FairSync。它的获取API如下:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
信号量中的acquire()获取函数,实际上是调用的AQS中的acquireSharedInterruptibly()。
acquireSharedInterruptibly()的源码如下:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果线程是中断状态,则抛出异常。
if (Thread.interrupted())
throw new InterruptedException();
// 否则,尝试获取“共享锁”;获取成功则直接返回,获取失败,则通过doAcquireSharedInterruptibly()获取。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
Semaphore中”公平锁“对应的tryAcquireShared()实现如下:
protected int tryAcquireShared(int acquires) {
for (;;) {
// 判断“当前线程”是不是CLH队列中的第一个线程线程,
// 若是的话,则返回-1。
if (hasQueuedPredecessors())
return -1;
// 设置“可以获得的信号量的许可数”
int available = getState();
// 设置“获得acquires个信号量许可之后,剩余的信号量许可数”
int remaining = available - acquires;
// 如果“剩余的信号量许可数>=0”,则设置“可以获得的信号量许可数”为remaining。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
说明:tryAcquireShared()的作用是尝试获取acquires个信号量许可数。
对于Semaphore而言,state表示的是“当前可获得的信号量许可数”。
下面看看AQS中doAcquireSharedInterruptibly()的实现:
private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 创建”当前线程“的Node节点,且Node中记录的锁是”共享锁“类型;并将该节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点。
// 如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 当前线程一直等待,直到获取到共享锁。
// 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
说明:doAcquireSharedInterruptibly()会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。
(01) addWaiter(Node.SHARED)的作用是,创建”当前线程“的Node节点,且Node中记录的锁的类型是”共享锁“(Node.SHARED);并将该节点添加到CLH队列末尾。关于Node和CLH在"Java多线程系列--“JUC锁”03之 公平锁(一)"已经详细介绍过,这里就不再重复说明了。
(02) node.predecessor()的作用是,获取上一个节点。如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
(03) shouldParkAfterFailedAcquire()的作用和它的名称一样,如果在尝试获取锁失败之后,线程应该等待,则返回true;否则,返回false。
(04) 当shouldParkAfterFailedAcquire()返回ture时,则调用parkAndCheckInterrupt(),当前线程会进入等待状态,直到获取到共享锁才继续运行。
doAcquireSharedInterruptibly()中的shouldParkAfterFailedAcquire(), parkAndCheckInterrupt等函数在"Java多线程系列--“JUC锁”03之 公平锁(一)"中介绍过,这里也就不再详细说明了。
2.2 公平信号量的释放
Semaphore中公平信号量(FairSync)的释放API如下:
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
信号量的releases()释放函数,实际上是调用的AQS中的releaseShared()。
releaseShared()在AQS中实现,源码如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
说明:releaseShared()的目的是让当前线程释放它所持有的共享锁。
它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
Semaphore重写了tryReleaseShared(),它的源码如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取“可以获得的信号量的许可数”
int current = getState();
// 获取“释放releases个信号量许可之后,剩余的信号量许可数”
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 设置“可以获得的信号量的许可数”为next。
if (compareAndSetState(current, next))
return true;
}
}
如果tryReleaseShared()尝试释放共享锁失败,则会调用doReleaseShared()去释放共享锁。doReleaseShared()的源码如下:
private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;
// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒“头节点的下一个节点所对应的线程”。
unparkSuccessor(h);
}
// 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点发生变化,则继续循环。否则,退出循环。
if (h == head) // loop if head changed
break;
}
}
说明:doReleaseShared()会释放“共享锁”。它会从前往后的遍历CLH队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的信号量。
3 非公平信号量获取和释放
Semaphore中的非公平信号量是NonFairSync。在Semaphore中,“非公平信号量许可的释放(release)”与“公平信号量许可的释放(release)”是一样的。
不同的是它们获取“信号量许可”的机制不同,下面是非公平信号量获取信号量许可的代码。
非公平信号量的tryAcquireShared()实现如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
nonfairTryAcquireShared()的实现如下:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 设置“可以获得的信号量的许可数”
int available = getState();
// 设置“获得acquires个信号量许可之后,剩余的信号量许可数”
int remaining = available - acquires;
// 如果“剩余的信号量许可数>=0”,则设置“可以获得的信号量许可数”为remaining。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
说明:非公平信号量的tryAcquireShared()调用AQS中的nonfairTryAcquireShared()。而在nonfairTryAcquireShared()的for循环中,它都会直接判断“当前剩余的信号量许可数”是否足够;足够的话,则直接“设置可以获得的信号量许可数”,进而再获取信号量。
而公平信号量的tryAcquireShared()中,在获取信号量之前会通过if (hasQueuedPredecessors())来判断“当前线程是不是在CLH队列的头部”,是的话,则返回-1。
Semaphore示例
1 import java.util.concurrent.ExecutorService;
2 import java.util.concurrent.Executors;
3 import java.util.concurrent.Semaphore;
4
5 public class SemaphoreTest1 {
6 private static final int SEM_MAX = 10;
7 public static void main(String[] args) {
8 Semaphore sem = new Semaphore(SEM_MAX);
9 //创建线程池
10 ExecutorService threadPool = Executors.newFixedThreadPool(3);
11 //在线程池中执行任务
12 threadPool.execute(new MyThread(sem, 5));
13 threadPool.execute(new MyThread(sem, 4));
14 threadPool.execute(new MyThread(sem, 7));
15 //关闭池
16 threadPool.shutdown();
17 }
18 }
19
20 class MyThread extends Thread {
21 private volatile Semaphore sem; // 信号量
22 private int count; // 申请信号量的大小
23
24 MyThread(Semaphore sem, int count) {
25 this.sem = sem;
26 this.count = count;
27 }
28
29 public void run() {
30 try {
31 // 从信号量中获取count个许可
32 sem.acquire(count);
33
34 Thread.sleep(2000);
35 System.out.println(Thread.currentThread().getName() + " acquire count="+count);
36 } catch (InterruptedException e) {
37 e.printStackTrace();
38 } finally {
39 // 释放给定数目的许可,将其返回到信号量。
40 sem.release(count);
41 System.out.println(Thread.currentThread().getName() + " release " + count + "");
42 }
43 }
44 }
(某一次)运行结果:
pool-1-thread-1 acquire count=5
pool-1-thread-2 acquire count=4
pool-1-thread-1 release 5
pool-1-thread-2 release 4
pool-1-thread-3 acquire count=7
pool-1-thread-3 release 7
结果说明:信号量sem的许可总数是10个;共3个线程,分别需要获取的信号量许可数是5,4,7。前面两个线程获取到信号量的许可后,sem中剩余的可用的许可数是1;因此,最后一个线程必须等前两个线程释放了它们所持有的信号量许可之后,才能获取到7个信号量许可。