Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier)

时间:2024-06-13 15:42:19

Java等待异步线程池跑完再执行指定方法的三种方式(condition、CountDownLatch、CyclicBarrier)

@Async如何使用

使用@Async标注在方法上,可以使该方法异步的调用执行。而所有异步方法的实际执行是交给TaskExecutor的。

1.启动类添加@EnableAsync注解
2. 方法上添加@Async,类上添加@Component

三个异步方法

    @Async
    public void doTaskTwo( CyclicBarrier barry) throws Exception {
        System.out.println("开始做任务二");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(3000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务二,耗时:" + (end - start) + "毫秒");
        barry.await();
    }

    @Async
    public void doTaskThree( CyclicBarrier barry) throws Exception {
        System.out.println("开始做任务三");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(5000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");
        barry.await();
    }

    @Override
    @Async
    public void doTask2One(CountDownLatch count) throws Exception {
        System.out.println("开始做任务1");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(3000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务1,耗时:" + (end - start) + "毫秒");
        count.countDown();
    }

CyclicBarrier

@GetMapping("/doTask")
    public void doLogin() throws Exception {

        // 通过它可以实现让一组线程等待至某个状态之后再全部同时执行
        //第一个参数,表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
        //第二个参数,表示用于在线程到达屏障时,优先执行barrierAction这个Runnable对象,方便处理更复杂的业务场景。
        CyclicBarrier barry = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("1111111111111");
            }
        });
        kafkaTopicService.doTaskOne(barry);
        kafkaTopicService.doTaskTwo(barry);
        kafkaTopicService.doTaskThree(barry);


    }

执行结果

开始做任务一
开始做任务二
开始做任务三
完成任务一,耗时:1263毫秒
完成任务二,耗时:2508毫秒
完成任务三,耗时:3753毫秒
1111111111111

注意: 接口响应成功时候 后台逻辑还在走
CyclicBarrier 的使用场景也很丰富。

比如,司令下达命令,要求 10 个士兵一起去完成项任务。

这时就会要求 10 个士兵先集合报到,接着,一起雄赳赳,气昂昂地去执行任务当 10 个士兵把自己手上的任务都执行完了,那么司令才能对外宣布,任务完成

CyclicBarrier 比 CountDownLatch 略微强大一些,它可以接收一个参数作为 barrierAction。

所谓 barrierAction 就是当计数器一次计数完成后,系统会执行的动作。

如下构造函数,其中, parties 表示计数总数,也就是参与的线程总数。

public CyclicBarrier(int parties, Runnable barrierAction) 

package com.shockang.study.java.concurrent.aqs;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static class Soldier implements Runnable {
        private String soldier;
        private final CyclicBarrier cyclic;

        Soldier(CyclicBarrier cyclic, String soldierName) {
            this.cyclic = cyclic;
            this.soldier = soldierName;
        }

        public void run() {
            try {
                //等待所有士兵到齐 第一次等待
                cyclic.await();
                doWork();
                //等待所有士兵完成工作 第二次等待
                cyclic.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        void doWork() {
            try {
                Thread.sleep(Math.abs(new Random().nextInt() % 10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + ":任务完成");
        }
    }

    public static class BarrierRun implements Runnable {
        boolean flag;
        int N;

        public BarrierRun(boolean flag, int N) {
            this.flag = flag;
            this.N = N;
        }

        public void run() {
            if (flag) {
                System.out.println("司令:[士兵" + N + "个,任务完成!]");
            } else {
                System.out.println("司令:[士兵" + N + "个,集合完毕!]");
                flag = true;
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {
        final int N = 10;
        Thread[] allSoldier = new Thread[N];
        boolean flag = false;
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
        //设置屏障点,主要是为了执行这个方法
        System.out.println("集合队伍!");
        for (int i = 0; i < N; ++i) {
            System.out.println("士兵 " + i + " 报道!");
            allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
            allSoldier[i].start();
        }
    }
}

控制台输出

集合队伍!
士兵 0 报道!
士兵 1 报道!
士兵 2 报道!
士兵 3 报道!
士兵 4 报道!
士兵 5 报道!
士兵 6 报道!
士兵 7 报道!
士兵 8 报道!
士兵 9 报道!
司令:[士兵10个,集合完毕!]
士兵 0:任务完成
士兵 3:任务完成
士兵 6:任务完成
士兵 4:任务完成
士兵 9:任务完成
士兵 8:任务完成
士兵 2:任务完成
士兵 5:任务完成
士兵 7:任务完成
士兵 1:任务完成
司令:[士兵10个,任务完成!]

说明
上述代码第 65 行创建了 CyclicBarrier 实例,并将计数器设置为 10 ,要求在计数器达到指标时,执行第 51 行的 run() 方法。

每一个士兵线程都会执行第 18 行定义的 run() 方法。

在第 24 行,每一个士兵线程都会等待,直到所有的士兵都集合完毕。

集合完毕意味着 CyclicBarrier 的一次计数完成,当再一次调用 CyclicBarrier.await() 方法时,会进行下一次计数。

第 22 行模拟了士兵的任务。

当一个士兵任务执行完,他就会要求 CyclicBarrier 开始下次计数,这次计数主要目的是监控是否所有的士兵都己经完成了任务。

一旦任务全部完成,第 42 行定义的 BarrierRun 就会被调用,打印相关信息。

2、CountDownLatch

 @GetMapping("/doTask2")
    public void doTask2() throws Exception {

        CountDownLatch count = new CountDownLatch(3);

        kafkaTopicService.doTask2One(count);
        kafkaTopicService.doTask2Two(count);
        kafkaTopicService.doTask2Three(count);
        count.await();
        System.out.println("11111111111111111");

    }

执行结果:

开始做任务1
开始做任务二
开始做任务3
完成任务1,耗时:179毫秒
完成任务3,耗时:1829毫秒
完成任务二,耗时:2376毫秒
11111111111111111

注意: 接口响应成功时候 后台逻辑已经走完

1、juc中condition接口提供的await、signal、signalAll方法,需配合lock

List<Integer> villageList = new ArrayList<>();
    List<Integer> villageList2 = new ArrayList<>();
    villageList.add(1);
    villageList.add(2);
    villageList.add(3);
    ExecutorService threadPool = Executors.newFixedThreadPool(2);

    Lock lock = new ReentrantLock();
    Condition cond = lock.newCondition();
    for(int flag = 0;flag<villageList.size();flag++){
        Integer i = villageList.get(flag);
        threadPool.execute(() -> {
            try {
                villageList2.add(i);
            } catch (Exception e) {
                e.printStackTrace();
            }
            if(villageList2.size() == villageList.size()){
                lock.lock();
                cond.signal();
                lock.unlock();
            }
        });
    }
    lock.lock();
    try {
        cond.await(5, TimeUnit.SECONDS);
        lock.unlock();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(villageList2.size());
    threadPool.shutdown();
}

CountDownLatch和CyclicBarrier的比较

1.CountDownLatch是减计数方式,countDown()方法只参与计数不会阻塞线程,而CyclicBarrier是加计数方式,await()方法参与计数,会阻塞线程。
2.CountDownLatch计数为0无法重置,而CyclicBarrier计数达到初始值,则可以重置,因此CyclicBarrier可以复用。

附:CountDownLatch有发令枪的效果,可以用来并发测试

public class test2 {

    private final static CountDownLatch CountDownLatch=new CountDownLatch(100);

    public static void main(String[] args) throws Exception{

        for (int i=0;i<100;i++){
            new Thread(() -> {
                try {
                    CountDownLatch.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                getUUID();
            }).start();
            CountDownLatch.countDown(); //如果减到0 统一出发,发枪开炮

        }
    }

    public static void getUUID(){
        UUID uuid= UUID.randomUUID();
        System.out.println(uuid);
    }
}

注意事项

  1. CountDownLatch 对象的计数器只能减不能增,即一旦计数器为 0,就无法再重新设置为其他值,因此在使用时需要根据实际需要设置初始值。

  2. CountDownLatch 的计数器是线程安全的,多个线程可以同时调用 countDown() 方法,而不会产生冲突。

  3. 如果 CountDownLatch 的计数器已经为 0,再次调用 countDown() 方法也不会产生任何效果。

  4. 如果在等待过程中,有线程发生异常或被中断,计数器的值可能不会减少到 0,因此在使用时需要根据实际情况进行异常处理。

当worker1线程由于异常没有执行countDown()方法,最后state结果不为0,导致所有线程停在AQS中自旋(死循环)。所以程序无法结束。(如何解决这个问题呢?请看案例二)

// 等待 3 个线程完成任务
if (!latch.await(5, TimeUnit.SECONDS)) {
    LOGGER.warn("{} time out", worker1.name);
}
 
// 所有线程完成任务后,执行下面的代码
LOGGER.info("all workers have finished their jobs!");
  1. CountDownLatch 可以与其他同步工具(如 Semaphore、CyclicBarrier)结合使用,实现更复杂的多线程同步。