若有不正之处请多多谅解,并欢迎批评指正。
请尊重作者劳动成果,转载请标明原文链接:
http://www.cnblogs.com/go2sea/p/5615531.html
CyclicBarrier是java.util.concurrent包中提供的同步工具。通过这个工具我们可以实现n个线程相互等待。我们可以通过参数指定达到公共屏障点之后的行为。
先上源码:
1 package java.util.concurrent;
2 import java.util.concurrent.locks.*;
3
4 public class CyclicBarrier {
5
6 private static class Generation {
7 boolean broken = false;
8 }
9
10 private final ReentrantLock lock = new ReentrantLock();
11 private final Condition trip = lock.newCondition();
12 private final int parties;
13 private final Runnable barrierCommand;
14 private Generation generation = new Generation();
15 private int count;
16
17 private void nextGeneration() {
18 // signal completion of last generation
19 trip.signalAll();
20 // set up next generation
21 count = parties;
22 generation = new Generation();
23 }
24
25
26 private void breakBarrier() {
27 generation.broken = true;
28 count = parties;
29 trip.signalAll();
30 }
31
32 private int dowait(boolean timed, long nanos)
33 throws InterruptedException, BrokenBarrierException, TimeoutException {
34 final ReentrantLock lock = this.lock;
35 lock.lock();
36 try {
37 final Generation g = generation;
38
39 //小概率事件:该线程在等待锁的过程中,barrier被破坏
40 if (g.broken)
41 throw new BrokenBarrierException();
42
43 //小概率事件:该线程在等待锁的过程中被中断
44 if (Thread.interrupted()) {
45 breakBarrier();
46 throw new InterruptedException();
47 }
48
49 int index = --count;
50 //当有parties个线程到达barrier
51 if (index == 0) { // tripped
52 boolean ranAction = false;
53 try {
54 final Runnable command = barrierCommand;
55 //如果设置了barrierCommand,令最后到达的barrier的线程执行它
56 if (command != null)
57 command.run();
58 ranAction = true;
59 nextGeneration();
60 return 0;
61 } finally {
62 //注意:当执行barrierCommand出现异常时,ranAction派上用场
63 if (!ranAction)
64 breakBarrier();
65 }
66 }
67
68 // loop until tripped, broken, interrupted, or timed out
69 for (;;) {
70 try {
71 if (!timed)
72 trip.await();
73 else if (nanos > 0L)
74 //注意:nanos值标识了是否超时,后续用这个nanos值判断是否breakBarrier
75 nanos = trip.awaitNanos(nanos);
76 } catch (InterruptedException ie) {
77 if (g == generation && ! g.broken) {
78 breakBarrier();
79 throw ie;
80 } else {
81 //小概率事件:该线程被中断,进入锁等待队列
82 //在等待过程中,另一个线程更新或破坏了generation
83 //当该线程获取锁之后,应重置interrupt标志而不是抛出异常
84 //原因在于:它中断的太晚了,generation已更新或破坏,它抛出InterruptedException的时机已经过去,
85 //两种情况:
86 //①g被破坏。已经有一个线程抛出了InterruptedException(也只能由第一个抛),与它同时等待的都抛BrokenBarrierException(后续检查broken标志会抛)。
87 //②g被更新:此时抛异常没意义(后续检查g更新后会return index),这里重置interrupt标志,让线程继续执行,让这个标志由上层处理
88 Thread.currentThread().interrupt();
89 }
90 }
91
92 //barrier被破坏,抛出异常
93 if (g.broken)
94 throw new BrokenBarrierException();
95
96 //barrier正常进入下一循环,上一代await的线程继续执行
97 if (g != generation)
98 return index;
99
100 //只要有一个超时,就breakBarrier,后续线程抛的就是barrier损坏异常
101 if (timed && nanos <= 0L) {
102 breakBarrier();
103 throw new TimeoutException();
104 }
105 }
106 } finally {
107 lock.unlock();
108 }
109 }
110
111
112 public CyclicBarrier(int parties, Runnable barrierAction) {
113 if (parties <= 0) throw new IllegalArgumentException();
114 this.parties = parties;
115 this.count = parties;
116 this.barrierCommand = barrierAction;
117 }
118
119 public CyclicBarrier(int parties) {
120 this(parties, null);
121 }
122
123
124 public int getParties() {
125 return parties;
126 }
127
128
129 public int await() throws InterruptedException, BrokenBarrierException {
130 try {
131 return dowait(false, 0L);
132 } catch (TimeoutException toe) {
133 throw new Error(toe); // cannot happen;
134 }
135 }
136
137
138 public int await(long timeout, TimeUnit unit)
139 throws InterruptedException,
140 BrokenBarrierException,
141 TimeoutException {
142 return dowait(true, unit.toNanos(timeout));
143 }
144
145
146 public boolean isBroken() {
147 final ReentrantLock lock = this.lock;
148 lock.lock();
149 try {
150 return generation.broken;
151 } finally {
152 lock.unlock();
153 }
154 }
155
156 public void reset() {
157 final ReentrantLock lock = this.lock;
158 lock.lock();
159 try {
160 breakBarrier(); // break the current generation
161 nextGeneration(); // start a new generation
162 } finally {
163 lock.unlock();
164 }
165 }
166
167 public int getNumberWaiting() {
168 final ReentrantLock lock = this.lock;
169 lock.lock();
170 try {
171 return parties - count;
172 } finally {
173 lock.unlock();
174 }
175 }
176 }
我们先来看一下CyclicBarrier的成员变量:
1 private final ReentrantLock lock = new ReentrantLock();
2 private final Condition trip = lock.newCondition();
3 private final int parties;
4 private final Runnable barrierCommand;
5 private Generation generation = new Generation();
6 private int count;
CyclicBarrier是通过独占锁lock和Condition对象trip来实现的,成员parties表示必须有parties个线程到达barrier,成员barrierCommand表示当parties个线程到达之后要执行的代码,成员count表示离触发barrierCommand还差count个线程(还有count个线程未到达barrier),成员generation表示当前的“代数”,“cyclic”表示可循环使用,generation是对一次循环的标识。注意:Generation是CyclicBarrier的一个私有内部类,他只有一个成员变量来标识当前的barrier是否已“损坏”:
1 private static class Generation {
2 boolean broken = false;
3 }
构造函数
1 public CyclicBarrier(int parties, Runnable barrierAction) {
2 if (parties <= 0) throw new IllegalArgumentException();
3 this.parties = parties;
4 this.count = parties;
5 this.barrierCommand = barrierAction;
6 }
7
8 public CyclicBarrier(int parties) {
9 this(parties, null);
10 }
CyclicBarrier提供了两种构造函数,没有指定barrierCommand的构造函数是调用第二个构造函数实现的。第二个构造函数有两个参数:parties和barrierAction,分别用来初始化成员parties和barrierCommand。注意,parties必须大于0,否则会抛出IllegalArgumentException。
await()方法
1 public int await() throws InterruptedException, BrokenBarrierException {
2 try {
3 return dowait(false, 0L);
4 } catch (TimeoutException toe) {
5 throw new Error(toe); // cannot happen;
6 }
7 }
await方法是由调用dowait方法实现的,两个参数分别代表是否定时等待和等待的时长。
doawait()方法
1 private int dowait(boolean timed, long nanos)
2 throws InterruptedException, BrokenBarrierException, TimeoutException {
3 final ReentrantLock lock = this.lock;
4 lock.lock();
5 try {
6 final Generation g = generation;
7
8 //小概率事件:该线程在等待锁的过程中,barrier被破坏
9 if (g.broken)
10 throw new BrokenBarrierException();
11
12 //小概率事件:该线程在等待锁的过程中被中断
13 if (Thread.interrupted()) {
14 breakBarrier();
15 throw new InterruptedException();
16 }
17
18 int index = --count;
19 //当有parties个线程到达barrier
20 if (index == 0) { // tripped
21 boolean ranAction = false;
22 try {
23 final Runnable command = barrierCommand;
24 //如果设置了barrierCommand,令最后到达的barrier的线程执行它
25 if (command != null)
26 command.run();
27 ranAction = true;
28 nextGeneration();
29 return 0;
30 } finally {
31 //注意:当执行barrierCommand出现异常时,ranAction派上用场
32 if (!ranAction)
33 breakBarrier();
34 }
35 }
36
37 // loop until tripped, broken, interrupted, or timed out
38 for (;;) {
39 try {
40 if (!timed)
41 trip.await();
42 else if (nanos > 0L)
43 //注意:nanos值标识了是否超时,后续用这个nanos值判断是否breakBarrier
44 nanos = trip.awaitNanos(nanos);
45 } catch (InterruptedException ie) {
46 if (g == generation && ! g.broken) {
47 breakBarrier();
48 throw ie;
49 } else {
50 //小概率事件:该线程被中断,进入锁等待队列
51 //在等待过程中,另一个线程更新或破坏了generation
52 //当该线程获取锁之后,应重置interrupt标志而不是抛出异常
53 //原因在于:它中断的太晚了,generation已更新或破坏,它抛出InterruptedException的时机已经过去,
54 //两种情况:
55 //①g被破坏:已有一个线程抛出InterruptedException(只能由第一个抛),与它同时等待的都抛BrokenBarrierException(后续检查broken标志会抛)。
56 //②g被更新:此时抛异常没意义(后续检查g更新后会return index),这里重置interrupt标志,让线程继续执行,让这个标志由上层处理
57 Thread.currentThread().interrupt();
58 }
59 }
60
61 //barrier被破坏,抛出异常
62 if (g.broken)
63 throw new BrokenBarrierException();
64
65 //barrier正常进入下一循环,上一代await的线程继续执行
66 if (g != generation)
67 return index;
68
69 //只要有一个超时,就breakBarrier,后续线程抛的就是barrier损坏异常
70 if (timed && nanos <= 0L) {
71 breakBarrier();
72 throw new TimeoutException();
73 }
74 }
75 } finally {
76 lock.unlock();
77 }
78 }
dowait方法是CyclicBarrier的精华。应该重点来理解。
方法开头首先申请锁,然后做了两个判断:g.broken和Thread.interrupted(),这两个判断是分别处理两种小概率的事件:①该线程在等待锁的过程中,barrier被破坏②该线程在等待锁的过程中被中断。这两个事件应抛出相应的异常。接下来dowait方法修改了令count减1,如果此时count减为0,说明已经有parties个线程到达barrier,这时由最后到达barrier的线程去执行barrierCommand。注意,这里设置了一个布尔值ranAction,作用是来标识barrierCommand是否被正确执行完毕,如果执行失败,finally中会执行breakBarrier操作。如果count尚未减为0,则在Condition对象trip上执行await操作,注意:这里有一个InterruptedException的catch子句。当前线程在await中被中断时,会抛出InterruptedException,这时候如果g==generation&&!g.broken的话,我们执行breakBarrier操作,同时抛出这个异常;如果g!=generation或者g.broken==true的话,我们的操作是重置interrupt标志而不是抛出这个异常。这么做的原因我们分两种情况讨论:
①g被破坏,这也是一个小概率事件,当前线程被中断后进入锁等待队列,此时另一个线程由于某种原因(超时或者被中断)在他之前获取了锁并执行了breakBarrier方法,那么当前线程持有锁之后就不应再抛InterruptedException,逻辑上应该处理barrier被破坏事件,事实上在后续g.broken的检查中,他会抛出一个BrokenBarrierException。而当前的InterruptedException被我们捕获却没有做出处理,所以执行interrupt方法重置中断标志,交由上层程序处理。
②g被更新:说明当前线程在即将完成等待之际被中断,此时抛异常没意义(后续检查g更新后会return index),这里重置interrupt标志,让线程继续执行,让这个标志由上层处理。
后续对g.broken和g!=generation的判断,分表代表了被唤醒线程(非最后一个到达barrier的线程,也不是被中断或第一个超时的线程)的两种退出方法的方式:第一种是以barrier被破坏告终(然后抛异常),第二个是barrier等到parties个线程,寿终正寝(返回该线程的到达次序index)。
最后一个if是第一个超时线程执行breakBarrier操作并跑出异常。最后finally子句要释放锁。
至此,整个doawait方法流程就分析完毕了,我们可以发现,在barrier上等待的线程,如果以抛异常结束的话,只有第一个线程会抛InterruptedException或TimeoutException并执行breakBarrier操作,其他等待线程只能抛BrokenBarrierException,逻辑上这也是合理的:一个barrier只能因超时或中断被破坏一次。
1 private void nextGeneration() {
2 trip.signalAll();
3 count = parties;
4 generation = new Generation();
5 }
6
7 private void breakBarrier() {
8 generation.broken = true;
9 count = parties;
10 trip.signalAll();
11 }
doawait方法中用到的nextGeneration方法将所有等待线程唤醒,更新generation对象,复位count,进入下一轮任务。breakBarrier方法将generation状态值为broken,复位count(这个复位看上去没有用,但实际上,在broken之后reset之前,如果调用getNumberWaiting方法查看等待线程数的话,复位count是合理的),并唤醒所有等待线程。在调用reset更新generation之前,barrier将处于不可用状态。
reset()方法
1 public void reset() {
2 final ReentrantLock lock = this.lock;
3 lock.lock();
4 try {
5 breakBarrier(); // break the current generation
6 nextGeneration(); // start a new generation
7 } finally {
8 lock.unlock();
9 }
10 }
reset方法先break当执行breakBarrier操作(如果有线程在barrier上等待,调用reset会导致BrokenBarrierException),再更新generation对象。