CyclicBarrier的工作原理及其实例

时间:2021-01-26 22:56:24

CyclicBarrier是多线程中一个重要的类,主要用于线程组内部之间的线程的相互等待问题。

1.CyclicBarrier的工作原理

CyclicBarrier大致是可循环利用的屏障,顾名思义,这个名字也将这个类的特点给明确地表示出来了。首先,便是可重复利用,说明该类创建的对象可以复用;其次,屏障则体现了该类的原理:每个线程执行时,都会碰到一个屏障,直到所有线程执行结束,然后屏障便会打开,使所有线程继续往下执行。

这里介绍CyclicBarrier的两个构造函数:CyclicBarrier(int parties)和CyclicBarrier(int parties, Runnable barrierAction) :前者只需要声明需要拦截的线程数即可,而后者还需要定义一个等待所有线程到达屏障优先执行的Runnable对象。

实现原理:在CyclicBarrier的内部定义了一个Lock对象,每当一个线程调用await方法时,将拦截的线程数减1,然后判断剩余拦截数是否为初始值parties,如果不是,进入Lock对象的条件队列等待。如果是,执行barrierAction对象的Runnable方法,然后将锁的条件队列中的所有线程放入锁等待队列中,这些线程会依次的获取锁、释放锁。

举例说明:如果一个寝室四个人约好了去球场打球,由于四个人准备工作不同,所以约好在楼下集合,并且四个人集合好之后一起出发去球场。

  1.  
    package concurrent;
  2.  
    import java.util.concurrent.CyclicBarrier;
  3.  
    import java.util.concurrent.*;
  4.  
    import java.util.concurrent.LinkedBlockingQueue;
  5.  
    import java.util.concurrent.ThreadPoolExecutor;
  6.  
    import java.util.concurrent.TimeUnit;
  7.  
    import java.util.*;
  8.  
    public class CyclicBarrierDemo {
  9.  
    private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(4,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
  10.  
    //当拦截线程数达到4时,便优先执行barrierAction,然后再执行被拦截的线程。
  11.  
    private static final CyclicBarrier cb=new CyclicBarrier(4,new Runnable() {
  12.  
    public void run()
  13.  
    {
  14.  
    System.out.println("寝室四兄弟一起出发去球场");
  15.  
    }
  16.  
    });
  17.  
    private static class GoThread extends Thread{
  18.  
    private final String name;
  19.  
    public GoThread(String name)
  20.  
    {
  21.  
    this.name=name;
  22.  
    }
  23.  
    public void run()
  24.  
    {
  25.  
    System.out.println(name+"开始从宿舍出发");
  26.  
    try {
  27.  
    Thread.sleep(1000);
  28.  
    cb.await();//拦截线程
  29.  
    System.out.println(name+"从楼底下出发");
  30.  
    Thread.sleep(1000);
  31.  
    System.out.println(name+"到达操场");
  32.  
     
  33.  
    }
  34.  
    catch(InterruptedException e)
  35.  
    {
  36.  
    e.printStackTrace();
  37.  
    }
  38.  
    catch(BrokenBarrierException e)
  39.  
    {
  40.  
    e.printStackTrace();
  41.  
    }
  42.  
    }
  43.  
    }
  44.  
    public static void main(String[] args) {
  45.  
    // TODO Auto-generated method stub
  46.  
    String[] str= {"李明","王强","刘凯","赵杰"};
  47.  
    for(int i=0;i<4;i++)
  48.  
    {
  49.  
    threadPool.execute(new GoThread(str[i]));
  50.  
    }
  51.  
    try
  52.  
    {
  53.  
    Thread.sleep(4000);
  54.  
    System.out.println("四个人一起到达球场,现在开始打球");
  55.  
    }
  56.  
    catch(InterruptedException e)
  57.  
    {
  58.  
    e.printStackTrace();
  59.  
    }
  60.  
     
  61.  
     
  62.  
    }
  63.  
     
  64.  
    }

运行程序,得到如下结果:

  1.  
    李明开始从宿舍出发
  2.  
    赵杰开始从宿舍出发
  3.  
    王强开始从宿舍出发
  4.  
    刘凯开始从宿舍出发
  5.  
    寝室四兄弟一起出发去球场
  6.  
    赵杰从楼底下出发
  7.  
    李明从楼底下出发
  8.  
    刘凯从楼底下出发
  9.  
    王强从楼底下出发
  10.  
    赵杰到达操场
  11.  
    王强到达操场
  12.  
    李明到达操场
  13.  
    刘凯到达操场
  14.  
    四个人一起到达球场,现在开始打球

以上便是CyclicBarrier使用实例,通过await()方法对线程的拦截,拦截数加1,当拦截数为初始的parties,首先执行了barrierAction,然后对拦截的线程队列依次进行获取锁释放锁。接下来,在这个例子上讲解CyclicBarrier对象的复用特性。

  1.  
    package concurrent;
  2.  
    import java.util.concurrent.CyclicBarrier;
  3.  
    import java.util.concurrent.*;
  4.  
    import java.util.concurrent.LinkedBlockingQueue;
  5.  
    import java.util.concurrent.ThreadPoolExecutor;
  6.  
    import java.util.concurrent.TimeUnit;
  7.  
    import java.util.*;
  8.  
    public class CyclicBarrierDemo {
  9.  
    private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(4,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
  10.  
    //当拦截线程数达到4时,便优先执行barrierAction,然后再执行被拦截的线程。
  11.  
    private static final CyclicBarrier cb=new CyclicBarrier(4,new Runnable() {
  12.  
    public void run()
  13.  
    {
  14.  
    System.out.println("寝室四兄弟一起出发去球场");
  15.  
    }
  16.  
    });
  17.  
    private static class GoThread extends Thread{
  18.  
    private final String name;
  19.  
    public GoThread(String name)
  20.  
    {
  21.  
    this.name=name;
  22.  
    }
  23.  
    public void run()
  24.  
    {
  25.  
    System.out.println(name+"开始从宿舍出发");
  26.  
    try {
  27.  
    Thread.sleep(1000);
  28.  
    cb.await();//拦截线程
  29.  
    System.out.println(name+"从楼底下出发");
  30.  
    Thread.sleep(1000);
  31.  
    System.out.println(name+"到达操场");
  32.  
     
  33.  
    }
  34.  
    catch(InterruptedException e)
  35.  
    {
  36.  
    e.printStackTrace();
  37.  
    }
  38.  
    catch(BrokenBarrierException e)
  39.  
    {
  40.  
    e.printStackTrace();
  41.  
    }
  42.  
    }
  43.  
    }
  44.  
    public static void main(String[] args) {
  45.  
    // TODO Auto-generated method stub
  46.  
    String[] str= {"李明","王强","刘凯","赵杰"};
  47.  
    String[] str1= {"王二","洪光","雷兵","赵三"};
  48.  
    for(int i=0;i<4;i++)
  49.  
    {
  50.  
    threadPool.execute(new GoThread(str[i]));
  51.  
    }
  52.  
    try
  53.  
    {
  54.  
    Thread.sleep(4000);
  55.  
    System.out.println("四个人一起到达球场,现在开始打球");
  56.  
    System.out.println("现在对CyclicBarrier进行复用.....");
  57.  
    System.out.println("又来了一拨人,看看愿不愿意一起打:");
  58.  
    }
  59.  
    catch(InterruptedException e)
  60.  
    {
  61.  
    e.printStackTrace();
  62.  
    }
  63.  
    //进行复用:
  64.  
    for(int i=0;i<4;i++)
  65.  
    {
  66.  
    threadPool.execute(new GoThread(str1[i]));
  67.  
    }
  68.  
    try
  69.  
    {
  70.  
    Thread.sleep(4000);
  71.  
    System.out.println("四个人一起到达球场,表示愿意一起打球,现在八个人开始打球");
  72.  
    //System.out.println("现在对CyclicBarrier进行复用");
  73.  
    }
  74.  
    catch(InterruptedException e)
  75.  
    {
  76.  
    e.printStackTrace();
  77.  
    }
  78.  
     
  79.  
     
  80.  
     
  81.  
    }
  82.  
     
  83.  
    }

运行如下程序,得到:

  1.  
    王强开始从宿舍出发
  2.  
    赵杰开始从宿舍出发
  3.  
    李明开始从宿舍出发
  4.  
    刘凯开始从宿舍出发
  5.  
    寝室四兄弟一起出发去球场
  6.  
    王强从楼底下出发
  7.  
    李明从楼底下出发
  8.  
    刘凯从楼底下出发
  9.  
    赵杰从楼底下出发
  10.  
    王强到达操场
  11.  
    李明到达操场
  12.  
    赵杰到达操场
  13.  
    刘凯到达操场
  14.  
    四个人一起到达球场,现在开始打球
  15.  
    现在对CyclicBarrier进行复用.....
  16.  
    又来了一拨人,看看愿不愿意一起打:
  17.  
    王二开始从宿舍出发
  18.  
    雷兵开始从宿舍出发
  19.  
    洪光开始从宿舍出发
  20.  
    赵三开始从宿舍出发
  21.  
    寝室四兄弟一起出发去球场
  22.  
    洪光从楼底下出发
  23.  
    王二从楼底下出发
  24.  
    雷兵从楼底下出发
  25.  
    赵三从楼底下出发
  26.  
    雷兵到达操场
  27.  
    赵三到达操场
  28.  
    洪光到达操场
  29.  
    王二到达操场
  30.  
    四个人一起到达球场,表示愿意一起打球,现在八个人开始打球

由上面实例可了解CyclicBarrier的工作原理以及复用的特性。

2.通过CountDownLatch实现CyclicBarrier

CountDownLatch通过将await()方法和countDown()方法在不同线程组分别调用,从而实现线程组间的线程等待,即一个线程组等待另一个线程组执行结束再执行。而CyclicBarrier类则是通过调用await()方法实现线程组内的线程等待,即达到需要拦截的线程数,被拦截的线程才会依次获取锁,释放锁。那么将CountDownLatch的用法进行转换,即在同一个线程组内调用await()方法和countDown()方法,则可实现CyclicBarrier类的功能。但是注意的是必须先调用countDown()方法,才能调用await()方法,因为一旦调用await()方法,该线程后面的内容便不再执行,那么count值无法改变。具体代码如下:

  1.  
    package concurrent;
  2.  
     
  3.  
    import java.util.Vector;
  4.  
    import java.util.concurrent.BrokenBarrierException;
  5.  
    import java.util.concurrent.CountDownLatch;
  6.  
    import java.util.concurrent.LinkedBlockingQueue;
  7.  
    import java.util.concurrent.ThreadPoolExecutor;
  8.  
    import java.util.concurrent.TimeUnit;
  9.  
    public class CyclicBarrierWithCount {
  10.  
    private final static CountDownLatch cdl=new CountDownLatch(3);
  11.  
    private final static ThreadPoolExecutor threadPool= new ThreadPoolExecutor(10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());//使用线程池
  12.  
     
  13.  
    private static class GoThread extends Thread{
  14.  
    private final String name;
  15.  
     
  16.  
    public GoThread(String name)
  17.  
    {
  18.  
    this.name=name;
  19.  
     
  20.  
    }
  21.  
    public void run()
  22.  
    {
  23.  
    System.out.println(name+"开始从宿舍出发");
  24.  
    cdl.countDown();
  25.  
    try
  26.  
    {
  27.  
    Thread.sleep(1000);
  28.  
    cdl.await();//拦截线程
  29.  
    System.out.println(name+"从楼底下出发");
  30.  
    Thread.sleep(1000);
  31.  
    System.out.println(name+"到达操场");
  32.  
    }
  33.  
    catch(InterruptedException e)
  34.  
    {
  35.  
    e.printStackTrace();
  36.  
    }
  37.  
     
  38.  
     
  39.  
     
  40.  
    }
  41.  
    }
  42.  
     
  43.  
     
  44.  
    public static void main(String[] args) {
  45.  
    // TODO Auto-generated method stub
  46.  
     
  47.  
     
  48.  
    String[] str= {"李明","王强","刘凯","赵杰"};
  49.  
    String[] str1= {"王二","洪光","雷兵","赵三"};
  50.  
    for(int i=0;i<4;i++)
  51.  
    {
  52.  
    threadPool.execute(new GoThread(str[i]));
  53.  
    }
  54.  
    try
  55.  
    {
  56.  
    Thread.sleep(4000);
  57.  
    System.out.println("四个人一起到达球场,现在开始打球");
  58.  
    System.out.println("现在对CyclicBarrier进行复用.....");
  59.  
    System.out.println("又来了一拨人,看看愿不愿意一起打:");
  60.  
    }
  61.  
    catch(InterruptedException e)
  62.  
    {
  63.  
    e.printStackTrace();
  64.  
    }
  65.  
    for(int i=0;i<4;i++)
  66.  
    {
  67.  
    threadPool.execute(new GoThread(str1[i]));
  68.  
    }
  69.  
    try
  70.  
    {
  71.  
    Thread.sleep(4000);
  72.  
    System.out.println("四个人一起到达球场,表示愿意一起打球,现在八个人开始打球");
  73.  
    //System.out.println("现在对CyclicBarrier进行复用");
  74.  
    }
  75.  
    catch(InterruptedException e)
  76.  
    {
  77.  
    e.printStackTrace();
  78.  
    }
  79.  
    }
  80.  
     
  81.  
    }

执行上述代码,结果如下:

  1.  
    李明开始从宿舍出发
  2.  
    赵杰开始从宿舍出发
  3.  
    王强开始从宿舍出发
  4.  
    刘凯开始从宿舍出发
  5.  
    王强从楼底下出发
  6.  
    刘凯从楼底下出发
  7.  
    李明从楼底下出发
  8.  
    赵杰从楼底下出发
  9.  
    李明到达操场
  10.  
    赵杰到达操场
  11.  
    王强到达操场
  12.  
    刘凯到达操场
  13.  
    四个人一起到达球场,现在开始打球
  14.  
    现在对CyclicBarrier进行复用.....
  15.  
    又来了一拨人,看看愿不愿意一起打:
  16.  
    王二开始从宿舍出发
  17.  
    洪光开始从宿舍出发
  18.  
    雷兵开始从宿舍出发
  19.  
    赵三开始从宿舍出发
  20.  
    王二从楼底下出发
  21.  
    洪光从楼底下出发
  22.  
    雷兵从楼底下出发
  23.  
    赵三从楼底下出发
  24.  
    洪光到达操场
  25.  
    王二到达操场
  26.  
    雷兵到达操场
  27.  
    赵三到达操场
  28.  
    四个人一起到达球场,表示愿意一起打球,现在八个人开始打球

由上面可知,CountDownLatch一定情况下可以实现CyclicBarrier类的功能。

3.CountDownLatch和CyclicBarrier的比较

1.CountDownLatch是线程组之间的等待,即一个(或多个)线程等待N个线程完成某件事情之后再执行;而CyclicBarrier则是线程组内的等待,即每个线程相互等待,即N个线程都被拦截之后,然后依次执行。

2.CountDownLatch是减计数方式,而CyclicBarrier是加计数方式。

3.CountDownLatch计数为0无法重置,而CyclicBarrier计数达到初始值,则可以重置。

4.CountDownLatch不可以复用,而CyclicBarrier可以复用。