栅栏 CyclicBarrier

时间:2021-12-01 12:09:00

java.util.concurrent.CyclicBarrier 类是一种同步机制,它能够对处理一些算法的线程实现同步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。

 package cyclicbarrier;

 import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier; public class Total {
public static ConcurrentHashMap<String, Integer> result = new ConcurrentHashMap<String, Integer>();
public static void main(String[] args) { TotalService totalService = new TotalServiceImpl();
CyclicBarrier barrier = new CyclicBarrier(5,
new TotalTask(totalService)); // 实际系统是查出所有省编码code的列表,然后循环,每个code生成一个线程。
new BillTask(new BillServiceImpl(), barrier, "北京").start();
new BillTask(new BillServiceImpl(), barrier, "上海").start();
new BillTask(new BillServiceImpl(), barrier, "广西").start();
new BillTask(new BillServiceImpl(), barrier, "四川").start();
new BillTask(new BillServiceImpl(), barrier, "黑龙江").start(); }
} /**
* 主任务:汇总任务
*/
class TotalTask implements Runnable {
private TotalService totalService; TotalTask(TotalService totalService) {
this.totalService = totalService;
} public void run() {
// 读取内存中各省的数据汇总,过程略。
int totalCount = totalService.count();
Total.result.put("total", totalCount);
System.out.println("=======================================");
System.out.println("开始全国汇总");
System.out.println("全国汇总结果:" + Total.result.get("total"));
}
} /**
* 子任务:计费任务
*/
class BillTask extends Thread {
// 计费服务
private BillService billService;
private CyclicBarrier barrier;
// 代码,按省代码分类,各省数据库独立。
private String code; BillTask(BillService billService, CyclicBarrier barrier, String code) {
this.billService = billService;
this.barrier = barrier;
this.code = code;
} public void run() {
System.out.println("开始计算--" + code + "省--数据!");
int ret = billService.bill(code);
Total.result.put(code, ret);
// 把bill方法结果存入内存,如ConcurrentHashMap,vector等,代码略
System.out.println("结果:" + ret);
System.out.println(code + "省已经计算完成,并通知汇总Service!");
try {
// 通知barrier已经完成
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
} interface TotalService
{
public int count();
} class TotalServiceImpl implements TotalService
{
@Override
public int count()
{
int totalCount = 0;
for(String key : Total.result.keySet())
{
totalCount += Total.result.get(key);
}
return totalCount;
}
} interface BillService
{
public int bill(String code);
}
class BillServiceImpl implements BillService
{ @Override
public int bill(String code)
{
return 1;
}
}

Total.java

原文  http://blog.chinaunix.net/uid-7374279-id-4658408.html