CyclicBarrier意为可循环使用的(Cyclic)屏障(Barrier),属于jdk 5新增加的并发工具,需要导入java.util.concurrent.CylicBarrier才能使用。CyclicBarrier适用于这样的场景:多线程并发执行,已经执行完的线程需要阻塞等待其他线程执行完毕,最后执行主线程的工作。听起来非常类似CountDownLatch,CyclicBarrier与CountDownLatch的区别主要在于CyclicBarrier是可循环利用的,而CountDownLatch只能使用一次。
下面使用CyclicBarrier实现上一篇文章读取文件的例子,从而演示CyclicBarrier的基本用法:
package com.rhwayfun.concurrency.r0406;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* Created by rhwayfun on 16-4-6.
*/
public class CyclicBarrierDemo {
//参数3表示的是屏障拦截的线程数
static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
//日期格式器
static final DateFormat format = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
//第一个读取的线程
Thread thread1 = new Thread(new Runnable() {
public void run() {
long start = System.currentTimeMillis();
for (;;){
if (System.currentTimeMillis() - start > 1000 * 10){
break;
}
}
System.out.println(Thread.currentThread().getName() + " finished task at " + format.format(new Date()));
try {
//调用await方法告诉CyclicBarrier我已经到达了屏障
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
},"Thread-1");
//第二个线程开始读取
Thread thread2 = new Thread(new Runnable() {
public void run() {
long start = System.currentTimeMillis();
for (;;){
if (System.currentTimeMillis() - start > 1000 * 5){
break;
}
}
System.out.println(Thread.currentThread().getName() + " finished task at " + format.format(new Date()));
try {
//表示当前线程已经到达了屏障
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}, "Thread-2");
System.out.println(Thread.currentThread().getName() + " start task at " + format.format(new Date()));
thread1.start();
thread2.start();
//主线程调用await方法表示主线程已经到达了屏障
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " ended task at " + format.format(new Date()));
}
}
运行结果如下:
可以发现与之前使用CountDownLatch的结果是一样的,唯一一点区别是CyclicBarrier的参数的意义不同,之前代码的参数是2,现在是3,因为除了两个子线程还包括主线程,而参数的本义就是屏障拦截的线程数,所以改成3也就情理之中了。另外,如果把3改成4,那么当前两个子线程和主线程都通知CyclicBarrier到达屏障后,由于没有第四个线程到达屏障,所以这三个线程都将阻塞等待,永远不会停止。
除此之外,CyclicBarrier还提供了高级的功能:CyclicBarrier(int parties, Runnable action)
。用于在线程到达屏障后优先执行action。这个构造函数适用于处理更为复杂的业务场景。
现在为了演示这个功能,将之前的需求进行一点修改:需要并发统计每个文件的字符数,所有线程统计完毕后由另外的线程得到总字符数。
演示代码如下:
package com.rhwayfun.concurrency.r0406;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;
/**
* Created by rhwayfun on 16-4-6.
*/
public class CyclicBarrierDemo2 implements Runnable{
/**
* 创建4个屏障
* 表示4个线程并发统计文件的字符数
* this:表示4个屏障用完后执行当前线程
*/
private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this);
/**
* 日期格式器
*/
private DateFormat format = new SimpleDateFormat("HH:mm:ss");
/**
* 适用线程池执行线程
*/
private Executor executor = Executors.newFixedThreadPool(4);
/**
* 保存每个线程执行的结果
*/
private Map<String,Integer> result = new ConcurrentHashMap<String, Integer>();
/**
* 随机数生成器
*/
private Random random = new Random();
/**
* 统计方法
*/
private void count(){
for (int i = 0; i < 4; i++){
executor.execute(new Runnable() {
public void run() {
//计算当前文件的字符数
result.put(Thread.currentThread().getName(),random.nextInt(5));
System.out.println(Thread.currentThread().getName() + " finish task at "+ format.format(new Date()));
//计算完成插入屏障
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
public void run() {
int res = 0;
//汇总每个线程的执行结果
for (Map.Entry<String,Integer> entry : result.entrySet()){
res += entry.getValue();
}
//将结果保存到map中
result.put("result",res);
System.out.println("final result:" + res);
}
public static void main(String[] args){
CyclicBarrierDemo2 c = new CyclicBarrierDemo2();
c.count();
}
}
运行结果如下:
之前提到CyclicBarrier的屏障可以多次使用,比如在处理复杂业务场景的时候,可以让线程重新运行一遍。除此之外,CyclicBarrier还提供了其他的方法。比如getNumberWaiting可以获得当前阻塞的线程数。isBroken用来了解阻塞的线程释放被中断。