Java高并发--AQS

时间:2021-05-25 21:05:05

Java高并发--AQS

主要是学习慕课网实战视频《Java并发编程入门与高并发面试》的笔记

AQS是AbstractQueuedSynchronizer的简称,直译过来是抽象队列同步器。AQS的底层数据结构是队列,如下所示

Java高并发--AQS

AQS使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架,利用一个int类型表示状态(state)

使用该框架的功能需要让子类继承,并重写相关方法。

  • 子类通过继承并通过重写方法管理其状态acquire和release的方法操纵状态

  • 可以同时实现排他锁和共享锁模式(独占、共享),要么使用独占锁要么使用共享锁,而不会同时使用两者

闭锁- CountdownLatch

常称为"闭锁",是一个倒计数器,如下,设定了倒计数值为3.当前线程调用await()被阻塞,其他线程每调用一次countDown()计数器减1,一直到计数值cnt等于0时,当前线程才可以继续(恢复)执行。可以理解为一个线程等待多个线程执行完毕,这样该线程可以利用其他多个线程的执行结果。

Java高并发--AQS

下面是CountdownLatch的简单使用,当计数器从60减到0时候,当前线程会打印“Go!”

由于当前线程被阻塞,需等到其余60个线程执行完毕,因此当前线程的"Go!"会最后打印。

package com.shy.concurrency.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author Haiyu
 * @date 2019/1/3 11:00
 */
@Slf4j
public class CountdownLatchExample {
    private static CountDownLatch cdl = new CountDownLatch(60);

    public static void main(String[] args) throws InterruptedException {
        final int num = 60;
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = num; i >= 0; i--) {
            final int cur = i;
            executorService.execute(() -> {
                try {
                    run(cur);
                } catch (InterruptedException e) {
                    log.error("exception", e);
                } finally {
                    cdl.countDown();
                }
            });
        }
        cdl.await();
        System.out.println("Go!");
        executorService.shutdown();
    }

    public static void run(int cur) throws InterruptedException {
        Thread.sleep(100);
        System.out.println(cur);
    }
}

使用CountdownLatch还可以实现限时任务,在await()方法中可以传入参数,如下表示当前线程只会等待10毫秒,超过这个时间就不再等待而是恢复执行。

cdl.await(10, TimeUnit.MICROSECONDS);

将上面的程序cdl.await();修改成cdl.await(10, TimeUnit.MICROSECONDS);,将最先打印"Go!"因为它只等待10毫秒,而其他线程每次执行前都会sleep100毫秒。

信号量 - Semaphore

可以控制某个资源可以有多少个线程同时访问,即可以控制并发量。主要通过acquire和release方法,

  • acquire():获得一个许可,若无法获得会一直等待直到有线程释放了许可或者被中断
  • tryAcquire():尝试获得一个许可,获取成功返回true,失败返回false,不会等待立即返回
  • release():释放一个许可

acquire()release()还可以传入参数,指定一次获得/释放的许可数;tryAcquire()还可以指定时间,在指定时间内没有获得到许可就返回false。

循环栅栏 - CyclicBarrier

cyclicBarrier强调线程之间互相等待

Java高并发--AQS

CountdownLatch和CyclicBarrier的区别如下:

  • CountDownLatch强调一个线程等待其他所有线程,通过cdl.await()让当前线程等待在倒计数器上,每有一个线程执行完,cdl.countDown(),将计数减1,减到0时通知当前线程执行。简单的说就是一个线程等待,直到他所等待的其他线程都执行完成,当前线程才可以继续执行。
  • cyclicBarrier强调线程之间互相等待,只要有一个线程还没到来,所有线程会一起等待。可以传入一个Runnable作为计数完成要执行的任务。每有一个线程调用cyc.await()计数减1,减到0时会执行一次该Runnable。简单地说就是线程之间互相等待,等所有线程都准备好,即调用await()方法之后,执行一次Runnable,此时所有线程开始同时执行!
  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。

下面是一个例子,CyclicBarrier还可以传入一个Runnable, 每次计数到的时候都会执行一次。

package com.shy.concurrency.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * @author Haiyu
 * @date 2019/1/3 11:00
 */
@Slf4j
public class CyclicBarrierExample {
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> System.out.println("集合完毕"));

    public static void main(String[] args) throws InterruptedException {
        final int num = 10;
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0;i < num; i++) {

            final int cur = i;
            executorService.execute(() -> {
                try {
                    run(cur);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executorService.shutdown();
    }

    public static void run(int cur) throws InterruptedException, BrokenBarrierException {
        log.info("{} is ready", cur);
        cyclicBarrier.await();
        log.info("{} is working", cur);
    }
}

CyclicBarrier(10, () -> System.out.println("集合完毕"));该句表示10个线程相互等待,所有10个线程都调用了await()方法后,会执行一次传入的Runnable;之后10个线程被唤醒,计数重新开始,这也是Cyclic的意义——可重复利用的计数器。

因此上面程序会先打印10个ready,然后打印一次“集合完毕”;之后会打印10个working。

重入锁 - ReentrantLock

synchronized是JVM的内置锁,而重入锁是Java代码实现的。重入锁是synchronized的扩展,可以完全代替后者。重入锁可以重入,允许同一个线程连续多次获得同一把锁。其次,重入锁独有的功能有:

  • 可以相应中断,synchronized要么获得锁执行,要么保持等待。而重入锁可以响应中断,使得线程在迟迟得不到锁的情况下,可以不再等待。主要由lockInterruptibly()实现,这是一个可以对中断进行响应的锁申请动作,锁中断可以避免死锁。
  • 锁的申请可以有等待时限,用tryLock()可以实现限时等待,如果超时还未获得锁会返回false,也防止了线程迟迟得不到锁时一直等待,可避免死锁。
  • 公平锁,即锁的获得按照线程先来后到的顺序依次获得,不会产生饥饿现象。synchronized的锁默认是不公平的,重入锁可通过传入构造方法的参数实现公平锁。
  • 重入锁可以绑定多个Condition条件,这些condition通过调用await/singal实现线程间通信。可以实现分组唤醒需要唤醒的线程,而不是像synchronized的wait/notify一样要么随机唤醒要给要么唤醒全部。

读写锁 - ReentrantReadWriteLock

ReadWriteLock即读写锁,它有两个方法如下,分别返回一个读锁和写锁,即读写锁分离。

ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();

在读时使用readLock进行加锁,在写时使用writeLock进行加锁。使得读-读不阻塞,读线程完全并行,适合读多写少的场合。读锁会完全阻塞写锁,它使用的依然是悲观的锁策略。如果有大量的读线程,也有可能引起写线程的饥饿。(如果想获得写锁,不允许还有读锁、写锁还保持着,即 在没有任何读锁、写锁的情况下才能获得写锁)