Java基础——BlockingQueue源码分析之ArrayBlockingQueue

时间:2022-09-30 17:54:41

BlockingQueue是什么

  • BlockingQueue是一个阻塞队列的接口
  • BlockingQueue是线程安全的
  • BlockingQueue具有先进先出的特点
  • 当队列满的时候进行入队操作会阻塞,当队列空的时候进行出队操作会阻塞

BlockingQueue提供的接口

BlockingQueue提供的接口有四种不同的方法,具体如下表所示

操作 Throws Exception Special Value Blocks Times Out
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
删除 remove(o) poll() take() poll(timeout, timeunit)
查询 element() peek() - -

这四种不同的方法对应的特点分别是

  1. ThrowsException:如果操作不能马上进行,则抛出异常
  2. SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
  3. Blocks:如果操作不能马上进行,操作会被阻塞
  4. TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值

ArrayBlockingQueue是什么

  • ArrayBlockingQueue是一个基于数组且有界的阻塞队列,即容量固定
  • ArrayBlockingQueue是线程安全的
  • ArrayBlockingQueuee具有先进先出的特点
  • 当队列满的时候进行入队(put)操作会阻塞,当队列空的时候进行出队(take)操作会阻塞
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable

ArrayBlockingQueue成员变量

final Object[] items;
int takeIndex;
int putIndex;
int count;

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

由于ArrayBlockingQueue是基于数组且右界的阻塞队列,因此需要个数组进行存储,需要两个指针索引记录队列的头地址和尾地址。其中有个锁是为了保证线程安全的,而两个Condition则是用来阻塞队列的

ArrayBlockingQueue构造方法

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

构造函数的参数

  1. capacity:指定容量的大小
  2. fair:指定是否使用公平锁,所谓的公平锁就是先等待的线程先获得锁

ArrayBlockingQueue的存储

public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//获取锁
    try {
        while (count == items.length)//如果已满,等待
            notFull.await();
        enqueue(e);//如果没满,入队
    } finally {
        lock.unlock();//释放锁
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length) putIndex = 0;
    count++;
    notEmpty.signal();//通知有人入队,notEmpty表示队列不会为空
}

存储的逻辑还是挺简单的

  1. 通过对put方法加锁保证线程的安全
  2. 判断队列的情况,如果满了则等待,没满则入队
  3. 入队后,通知队列的notEmpty,表示队列不会为空

ArrayBlockingQueue的获取

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//获取锁
    try {
        while (count == 0)//如果没元素,等待
            notEmpty.await();
        return dequeue();//如果有元素,出队
    } finally {
        lock.unlock();//释放锁
    }
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();//通知有人出队,notFull表示队列不会溢满
    return x;
}

获取的逻辑基本和存储的逻辑反过来

  1. 通过对take方法加锁保证线程的安全
  2. 判断队列的情况,如果没元素则等待,如果有元素则出队
  3. 出队后,通知队列的notFull,表示队列不会溢满

这里穿插下ReentrantLock的lock和lockInterruptibly的区别

  1. 当线程有可能被其他线程中断时,lock方法会忽略中断请求,继续获取锁直到成功
  2. 当线程有可能被其他线程中断时,lockInterruptibly方法则直接抛出中断异常来立即响应中断

总结

  1. ArrayBlockingQueue队列是基于数组和Condition类来实现的
  2. ArrayBlockingQueue的存储和获取采用生产消费模式
  3. ArrayBlockingQueue的线程安全是通过ReentrantLock来保证的
  4. ArrayBlockingQueue的队列中不允许元素为null