Java并发编程札记-(五)JUC容器-05ArrayBlockingQueue与LinkedBlockingQueue

时间:2021-05-21 20:49:47

今天来学习ArrayBlockingQueue与LinkedBlockingQueue。

ArrayBlockingQueue是一个基于数组的有界阻塞队列。“有界”表示数组容量是固定的。这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

属性

/** 队列中的数据 */
final Object[] items;

/** 下个要删除的项的索引(take, poll, peek ,remove方法使用) */
int takeIndex;

/** 下个插入的位置(put, offer, add方法使用) */
int putIndex;

/** 队列中元素的数量 */
int count;

以上是与队列相关的属性。下面是与并发控制相关的属性。

/** 锁 */
final ReentrantLock lock;

/** 不空的condition */
private final Condition notEmpty;

/** 不满的condition */
private final Condition notFull;

核心方法

offer(E e)

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    //获取独占锁
    lock.lock();
    try {
        // 如果队列已满,则返回false。
        if (count == items.length)
            return false;
        else {
            // 如果队列未满,则插入e,并返回true。
            enqueue(e);
            return true;
        }
    } finally {
        //释放锁
        lock.unlock();
    }
}

enqueue(E x)方法源码如下:

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    //将x添加到队列中
    items[putIndex] = x;
    //如果队列已满,则将[下一个被添加元素的索引]置为0
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //唤醒notEmpty上的等待线程
    notEmpty.signal();
}

将元素添加到队列之前,必须先获得独占锁。加锁后,若发现队列已满,返回false。(为什么这里没有调用notFull.await()方法?)将元素插入到队列后,调用notEmpty.signal()唤醒notEmpty上的等待线程。
take()

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //获取锁,若当前线程是中断状态,则抛出InterruptedException异常
    lock.lockInterruptibly();
    try {
        //如果队列为空,则一直等待。
        while (count == 0)
            notEmpty.await();
        //取出元素并返回
        return dequeue();
    } finally {
        //释放锁
        lock.unlock();
    }
}

dequeue()方法如下

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //唤醒在notFull上等待的线程
    notFull.signal();
    return x;
}

将元素添从队列中移除之前,必须先获得独占锁。加锁后,若发现队列为空,调用notEmpty.await(),使线程在notEmpty上等待。如果队列不为空,将元素添从队列中移除,然后调用notFull.signal()唤醒notFull的等待线程。

LinkedBlockingQueue是一个基于单向链表的、可指定大小的阻塞队列。

可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。

节点类

static class Node<E> {
    E item;

    /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */
    Node<E> next;

    Node(E x) { item = x; }
}

属性

/** 容量。初始化LinkedBlockingQueue时需要指定,如果不指定则默认为Integer.MAX_VALUE if none */
private final int capacity;

/** 链表的实际大小 */
private final AtomicInteger count = new AtomicInteger();

/** * 链表的头结点 * 以下表达式一直成立: head.item == null */
transient Node<E> head;

/** * 链表的尾节点 * 以下表达式一直成立: last.next == null */
private transient Node<E> last;

/** 获取操作的锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 获取操作的等待队列condition */
private final Condition notEmpty = takeLock.newCondition();

/** 插入操作的锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** 插入操作的等待队列condition */
private final Condition notFull = putLock.newCondition();

以上属性透漏出一个重要信息:插入和获取操作使用了不同的锁。

offer(E e)

/** * 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量) * 在成功时返回 true,如果此队列已满,则返回 false。 */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    //如果当前没有空间可用,则返回 false。
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //加锁
    putLock.lock();
    try {
        //再次判断队列是否已满
        if (count.get() < capacity) {
            //插入元素到队列
            enqueue(node);
            c = count.getAndIncrement();
            //如果插入元素后,元素仍未满,唤醒在notFull上的等待线程
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        //释放所
        putLock.unlock();
    }
    //如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

enqueue(Node)源码如下

private void enqueue(Node<E> node) {
    last = last.next = node;
}

last = last.next = node;执行后last.next的值为null?
signalNotEmpty()源码如下

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

注意notEmpty是与takeLock相关联的,必须先获取takeLock锁,再调用notEmpty.signal()。
take()

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //获取锁,若当前线程是中断状态,则抛出InterruptedException异常
    takeLock.lockInterruptibly();
    try {
        //若队列为空,则一直等待。
        while (count.get() == 0) {
            notEmpty.await();
        }
        //取出元素
        x = dequeue();
        //取出元素之后,返回原始的节点数量,然后节点数量-1。
        c = count.getAndDecrement();
        //如果取出元素后,队列不为空,则唤醒在notEmpty上的等待线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        //释放锁
        takeLock.unlock();
    }
    //如果在获取节点之前队列为满;则取出节点后,唤醒notFull上的等待线程
    if (c == capacity)
        signalNotFull();
    return x;
}

dequeue()源码如下:

private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

本文就讲到这里,想了解Java并发编程更多内容请参考: