Java并发(三)——同步计数器

时间:2022-11-30 16:18:43

1 CountDownLatch同步计数器

1.1 类的概述

  倒计数门闩。在完成一组正在其它线程中执行的操作之前,它允许一个或多个线程一直等待。在计数器到达0之前,await方法会一直阻塞;之后,会释放所有等待线程,await的所有后续调用都将返回。

1.2 主要方法

  1. CountDownLatch(int count)
    构造一个用给定计数初始化的同步计数器

  2. void await()
    使当前线程在计数器倒计数至0前一直等待,除非被中断。

  3. boolean await(long timeout, TimeUnit unit)
    使当前线程在计数器倒计时至0前一直等待,除非被中断或超时。

  4. void countDown()
    计数器值减一;如果到达0,则释放所有等待的线程

  5. long getCount()
    返回当前计数

1.3 使用场景

(1)开5个线程去下载,当5个线程都执行完才算下载成功
(2)多个线程上传文件,只有当每个文件都上传成功才算上传成功。

1.4 应用示例

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        Worker worker1 = new Worker("worker1", latch);
        Worker worker2 = new Worker("worker2", latch);
        Worker worker3 = new Worker("worker3", latch);

        worker1.start();
        worker2.start();
        worker3.start();

        latch.await();  // 等待计数器到0

        System.out.println("Main thread end");

    }

    // 内部类,线程类的实现
    static class Worker extends Thread {
        private String workName;
        private CountDownLatch latch;

        public Worker(String name, CountDownLatch latch) {
            this.workName = name;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                System.out.println("worker:"+this.workName+" is begin");
                Thread.sleep(1000L);
                System.out.println("worker:"+this.workName+" is end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                latch.countDown();
            }
        }
    }
}

Java并发(三)——同步计数器

2 CyclicBarrier同步计数器

2.1 类的概述

  循环屏障允许一组线程互相等待,直到到达某个公共屏障点,然后所有的这组线程再同步往后执行。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的barrier。

2.2 主要方法

  1. CyclicBarrier(int parties)
    创建一个新的循环屏障,它将在给定数量的线程处于等待状态时启动,但不会在启动barrier时执行预定义的操作。

  2. CyClicBarrier(int parties, Runnable barrierAction)
    创建一个新的循环屏障,它将在给定数量的线程处于等待状态时启动,并在启动barrier时执行给定的屏障操作barrierAction,该操作由最后一个进入barrier的线程执行。

  3. int await()
    在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。

  4. int await(long timeout, TimeUnit unit)
    在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。

  5. int getNumberWaiting()
    返回当前在屏障处等待的线程数目

  6. int getParties()
    返回要求启动此barrier的线程数目

  7. void reset()
    将循环屏障重置为初始状态。

2.3 使用示例

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, new TotalTask());

        Worker worker1 = new Worker("worker1", barrier);
        Worker worker2 = new Worker("worker2", barrier);
        Worker worker3 = new Worker("worker3", barrier);

        worker1.start();
        worker2.start();
        worker3.start();

        System.out.println("main thread end");
    }

    // 启动barrier时执行该任务,即当最后一个线程进入barrier时执行该任务
    static class TotalTask extends Thread {
        @Override
        public void run() {
            System.out.println("所有线程到达barrier");
        }
    }

    // 任务线程
    static class Worker extends Thread {
        private String name;
        private CyclicBarrier barrier;

        public Worker(String name, CyclicBarrier barrier) {
            this.name = name;
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(name+"任务开始");
                Thread.sleep(1000L);
                System.out.println(name+"任务完成");

                barrier.await();    // 线程到达屏障
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

Java并发(三)——同步计数器

2.4 CountDownLatch与CyclicBarrier的区别

CountDownLatch:一个线程等待另外N个线程完成某个事情之后才能执行,重点是一个线程在等待
CyclicBarrier:N个线程互相等待,任何一个线程完成之前,所有线程都必须等待。

3 Semaphore同步计数器

3.1 类的概述

  是一个计数信号量,维护一个许可集合。在许可可用前会阻塞每一个acquire(),等待获取许可;release()释放当前占用的许可,允许其它阻塞的线程获得。

3.2 方法概述

  1. Semaphore(int permits)
    创建具有给定许可数目、非公平的Semphore对象

  2. Semaphore(int permits, boolean fair)
    创建具有给定许可数目、公平的Semaphore对象。所谓公平性就是先来先服务FIFO

  3. void acquire()
    从此信号量获取一个许可,在获取到许可之前线程将被阻塞

  4. int availablePermits()
    返回此信号量中的可用许可数目

  5. void release()
    释放当前许可

3.3 使用示例

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);// 许可数目为3

        // 创建并启动12个线程
        for (int i = 0; i < 12; i++) {
            Worker worker = new Worker("worker"+i, semaphore);
            worker.start();
        }

        System.out.println("main thread end");
    }

    // 任务线程,共用同一个信号量
    static class Worker extends Thread {
        private String name;
        private Semaphore semaphore;

        public Worker(String name, Semaphore semaphore) {
            this.name = name;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000L);

                semaphore.acquire();// 等待获取一个许可
                System.out.println(name+"获取到一个许可,开始处理任务");
                Thread.sleep(1000L);

                semaphore.release();// 释放一个许可
                System.out.println(name+"释放许可,任务结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Java并发(三)——同步计数器