java并发编程实战第三章 同步辅助类CyclicBarrier解析
本实例演示了该类的一个主要用途:在集合点的同步。
4.在集合点的同步(这里的任务是分两个阶段完成的,而且这两个阶段是受到CyclicBarrier控制的)
这里的演示范例是指多个线程将在同一个集合点完成工作,并进入下一个工作。与第三个案例相似,但是更加强大。本案例使用了一个分治编程技术。
使用CyclicBarrier(我称之为线程等待计数器)
说明:
1.CyclicBarrier类使用一个整型数进行初始化,这个数是需要在某个点上同步的线程数。
2.当一个线程到达指定的点后,将调用await()方法等待其他的线程,当线程调用await()方法后,
CyclicBarrier类将并使之休眠等待其他所有线程到达、
3.当最后一个线程调用await()方法后,该CyclicBarrier对象将唤醒所有在等待的线程,然后这些线程将继续执行。
与CountDownLatch的不同之处
1.CyclicBarrier对象可以被重置会初试状态,并把它的内部计数器重置成初试值。
2.CyclicBarrier对象的重置是通过CyclicBarrier类的reset()方法完成的。
当重置发生后,在await方法中等待的线程将收到一个BrokenBarrierException异常,本例中,是将这个异常打印出来,在更复杂的程序中,它可以用来执行其他操作
比如重新执行或者将操作复原回被中断的状态。
损坏的CyclicBarrier对象
该对象有一种特殊的状态就是损坏状态(Broken)。当很多线程在await方法上等待的时候,如果其中一个线程被中断,这个线程将抛出InterruptedException异常,其他等待
的线程将抛出BrokenBarrierException异常,于是该对象就成了损坏的状态了
代码实例:
这里使用分治编程技术将在数字矩阵中寻找一个数字,这个矩阵将被分成几个子集,然后每个线程将在一个子集中查找。一旦所有线程都完成查找,最终的任务将统一这些结果。
1.MatrixMock(矩阵类)
/**
*
* @author fcs
* @date 2015-4-21
* 描述:在集合点的同步
* 说明:使用CyclicBarrier类(线程等待计数器)
* 使用多个线程在一个大的矩阵中查找指定数字出现的次数
*/
public class MatrixMock {
private int data[][];
/**
*
* @param size //矩阵的行数
* @param length //每行的长度
* @param number //要寻找的数字
*/
public MatrixMock(int size,int length,int number){
int counter = 0;
data = new int [size][length];
Random random = new Random();
for(int i = 0;i < size;i++){
for(int j =0 ;j< length;j++){
data[i][j] = random.nextInt(10);
if(data[i][j] == number){
counter++;
}
}
}
System.out.printf("Mock : There are %d ocurrences of number in generated data.\n",counter,number);
}
/**
*
* 作者:fcs
* 描述:返回矩阵中指定行的内容,如果没有就返回null
* 参数:row 行号
* 时间:2015-4-21
*/
public int [] getRow(int row){
if((row>=0) &&(row < data.length)){
return data[row];
}
return null;
}
}
2.创建结果类
/**
*
* @author fcs
* @date 2015-4-21
* 描述:保存矩阵中每行查找到指定数字的次数
* 说明:
*/
public class Result {
private int data[];
//初始化结果数组
public Result(int size) {
data = new int [size];
}
public int[] getData() {
return data;
}
//设置指定数组索引的值,存放矩阵中指定行查找的数字的个数
public void setData(int position,int value) {
data[position] = value;
}
}
3.查找类–实现线程
/**
*
* @author fcs
* @date 2015-5-2
* 描述:CyclicBarrier类会自动维护线程的状态
* 说明:
*/
public class Searcher implements Runnable{
private int firstRow;
private int lastRow;
private MatrixMock mock;
private Result result;
private int number;
private final CyclicBarrier barrier;
public Searcher(int firstRow, int lastRow, MatrixMock mock, Result result,
int number, CyclicBarrier barrier) {
this.firstRow = firstRow;
this.lastRow = lastRow;
this.mock = mock;
this.result = result;
this.number = number;
this.barrier = barrier;
}
/**
* 查找数字,使用内部变量counter来存放每行找到的次数
*/
@Override
public void run() {
int counter ;
System.out.printf("%s: processing lines from %d to %d.\n",Thread.currentThread().getName(),firstRow,lastRow);
for(int i = firstRow;i< lastRow;i++){
int row [] = mock.getRow(i);
counter =0 ;
for(int j = 0;j<row.length;j++){
if(row[j] == number){
counter++;
}
}
result.setData(i, counter);
}
System.out.printf("%s Lines processed\n.",Thread.currentThread().getName());
try {
/*每个线程到达集合点后就会调用await()方法通知CyclicBarrier对象,该对象会让这个线程休眠直到其他所有线程都到达集合点*/
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
4.计算在矩阵中查找到的总次数。创建Grouper类
public class Grouper implements Runnable {
private Result result;
public Grouper(Result result) {
super();
this.result = result;
}
/**
* 计算在结果类数组中查找的次数
*/
@Override
public void run() {
int finalResult = 0;
System.out.println("Grouper: Processing results...\n");
int data [] = result.getData();
for(int number:data){
finalResult +=number;
}
System.out.printf("Grouper:Total result : %d.\n",finalResult);
}
}
5.测试类
public class Main {
public static void main(String[] args) {
final int ROWS = 10000; //行数
final int NUMBERS = 1000; //随机数范围
final int SEARCH = 5; //要搜索的数字
final int PARTICIPANT = 5; //搜索线程的数量
final int LINES_PARTICIPANT = 2000; //每个线程搜索的区间
MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
Result result = new Result(ROWS);
Grouper grouper = new Grouper(result);
//创建CyclicBarrier类,这个对象将等待五个线程运行结束,
//然后执行创建的Grouper线程对象
CyclicBarrier barrier = new CyclicBarrier(PARTICIPANT,grouper);
Searcher searcher [] = new Searcher[PARTICIPANT];
for(int i =0;i < PARTICIPANT;i++){
searcher [i] = new Searcher(i*LINES_PARTICIPANT, (i*LINES_PARTICIPANT)+LINES_PARTICIPANT, mock, result, 5, barrier);
Thread thread = new Thread(searcher[i]);
thread.start();
}
System.out.println("Main: The main thread has finished.\n");
}
}
演示效果:
对CyclicBarrier的一些方法进行了解
3.CyclicBarrier:同步辅助类,允许多个线程在某个集合点出相互等待。
所有线程成都将在同一个集合点同步。
主要方法:
1.await():线程调用该方法会通知CyclicBarrier对象,CyclicBarrier对象会将这个线程置入休眠直到其他线程都到达集合点。
线程调用该方法用来等待其他线程到达集合点。
2.await(long time,TimeUnit unit):这个方法被调用后,线程将一直休眠直到被中断,或者CyclicBarrier的内部计数器到达0,或者指定的时间已经过期。
3.reset():重置CyclicBarrier对象使其回到初始状态,并把内部计数器重置成初试化时的值。
4.getParties():获得参与同步线程的数目
5.isBroken(): 获得CyclicBarrier是否处于损坏状态,如果是就返回true,否则返回false
6.getNumberWaiting():返回阻塞或者休眠的线程数目,该方法主要用在debug模式和断言中。
java 1.6API:
对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
内存一致性效果:线程中调用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 紧跟在从另一个线程中对应 await() 成功返回的操作。