多线程等待所有子线程执行完使用总结(3)——CyclicBarrier使用和源码初步分析

时间:2023-02-16 19:06:34

问题背景

我们在日常开发和学习过程中,经常会使用到多线程的场景,其中我们经常会碰到,我们代码需要等待某个或者多个线程执行完再开始执行,上一篇文章中(参考 https://blog.51cto.com/baorant24/6060871 ),我们介绍了CountDownLatch使用和源码初步分析,本文将介绍一种新的方案,CyclicBarrier类的使用。

问题分析

话不多说,直接上个demo,先看下CyclicBarrier的一般使用方法,代码如下:

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import java.util.concurrent.BrokenBarrierException
import java.util.concurrent.CyclicBarrier

class TestCyclicBarrierActivity : AppCompatActivity() {
    // 线程数
    private val threadSize = 5
    private var cb: CyclicBarrier? = null

    companion object {
        const val TAG = "TestCyclicBarrier"
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_test_cyclic_barrier)

        cb = CyclicBarrier(threadSize) {
            // 裁判
            Log.d(TAG, "参赛者" + cb!!.parties + "个全部准备完毕 --> 各就各位,预备跑");
        }

        for (i in 0 until threadSize) {
            AthleteThread().start()
        }

        Log.d(TAG, "主线程不用等待,继续执行");
    }

    inner class AthleteThread : Thread() {
        override fun run() {
            try {
                // 运动员
                Log.d(TAG, currentThread().name + "号选手准备好了")
                cb?.await()
                println(currentThread().name + "跑,跑,跑")
            } catch (e: InterruptedException) {
                e.printStackTrace()
            } catch (e: BrokenBarrierException) {
                e.printStackTrace()
            }
        }
    }
}

运行结果如下: 多线程等待所有子线程执行完使用总结(3)——CyclicBarrier使用和源码初步分析 运行结果分析: 在所有选手准备好之后开始发令起跑,这也就是我们所说的线程同步。它的特点是主线程不用等待,继续执行。

问题解决

上面通过demo了解了CyclicBarrier的一般使用方法,下面我们对CyclicBarrier的源码进行一个初步分析和了解。 (1)核心成员变量

// 使用ReentrantLock锁初始化锁,便于condition产生每组条件
private final ReentrantLock lock = new ReentrantLock();
// 循环锁的核心条件,依赖于ReentrantLock锁
private final Condition trip = lock.newCondition();
// 初始屏障数
private final int parties;
/ /额外的线程任务如每个线程都要执行共同的任务时使用
private final Runnable barrierCommand;
/**
* 主要是作为辅助标志
* 是否异常中断该组阻塞(BrokenBarrierException)和重置下组条件(调用breakBarrier方法)
*/
private Generation generation = new Generation();
// 该类为辅助内部类
private static class Generation {
  boolean broken = false;
 }

(2)构造函数

/**
* CyclicBarrier设置屏障数,采用默认的barrierAction为null
*/
public CyclicBarrier(int parties) {
    this(parties, null);
}
/**
* CyclicBarrier屏障数、额外步骤内容参数去初始化屏障
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 设置初始化屏障数
    this.parties = parties;
    // 剩余屏障数
    this.count = parties;
    // 额外步骤内容
    this.barrierCommand = barrierAction;
}

(3)await方法(关键代码) java.util.concurrent.CyclicBarrier#await()

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

java.util.concurrent.CyclicBarrier#dowait

/**
 * dowait主要是为CyclicBarrier普通等待和超时等待await服务
 * timed代表区分普通等待(false)和超时等待(true)、nanos代表超时时间
 */
  private int dowait(boolean timed, long nanos)
      throws InterruptedException,BrokenBarrierException,TimeoutException {
      // 内部锁,利用ReentrantLock进行初始化
      final ReentrantLock lock = this.lock;
      // 获取锁
      lock.lock();
      try {
          final Generation g = generation;
          if (g.broken)
              throw new BrokenBarrierException();
          if (Thread.interrupted()) {
              breakBarrier();
              throw new InterruptedException();
          }
          int index = --count;// 当前剩余达到屏障值
          if (index == 0) {  // 是否到达改组的唤醒值
              boolean ranAction = false;
              try {
                  final Runnable command = barrierCommand;
                  if (command != null)
                      command.run();// 执行补充任务
                  ranAction = true;// 防止程序出现异常,导致执行finally中逻辑
                  nextGeneration();// 正常唤醒当前组阻塞线程、重置下组初始值count
                  return 0;
              } finally {
                  if (!ranAction)
                      breakBarrier();// 程序异常,需要唤醒当前组被阻塞线程和初始下组条件
              }
          }
          // for循环自旋,主要是未达到该组的屏障值进行阻塞
          for (;;) {
              try {    
                  if (!timed) //普通阻塞
                      trip.await(); //调用基于ReentrantLock中产生的Condition条件中的await方法进行阻塞
                  else if (nanos > 0L) //超时阻塞
                      nanos = trip.awaitNanos(nanos);
              } catch (InterruptedException ie) {// try逻辑出现中断异常
                  if (g == generation && ! g.broken) {// 异常辅助是否被改变过
                      breakBarrier();
                      throw ie;
                  } else {
                      Thread.currentThread().interrupt();
                  }
              }
              if (g.broken)
                  throw new BrokenBarrierException();
              if (g != generation)
                  return index;//返回上组的剩余屏障数
              if (timed && nanos <= 0L) {//超时阻塞await时且已超时执行
                  breakBarrier();//唤醒当前组阻塞线程和重置下组条件
                  throw new TimeoutException();
              }
          }
      } finally {
          lock.unlock();//释放当前线程锁对象执行权
      }
  }

问题总结

我们在日常开发和学习过程中,经常会使用到多线程的场景,其中经常会碰到,我们代码需要等待某个或者多个线程执行完再开始执行,上一篇文章中,我们介绍了通过CountDownLatch类来实现,本文介绍了一种新的方案,CyclicBarrier类的使用,有兴趣的同学可以进一步深入研究。