Java 并发编程 --- LinkedBlockingQueue与ArrayBlockingQueue (七)

时间:2021-09-14 20:50:12

阻塞队列与普通的队列(LinkedList/ArrayList)相比,支持在向队列中添加元素时,队列的长度已满阻塞当前添加线程,直到队列未满或者等待超时;从队列中获取元素时,队列中元素为空 ,会将获取元素的线程阻塞,直到队列中存在元素 或者等待超时。

在JUC包中常用的阻塞队列包含ArrayBlockingQueue/LinkedBlockingQueue/LinkedBlockingDeque等,从结构来看都继承了AbstractQueue实现了BlockingQueue接口(LinkedBlockingDeque是双向阻塞队列,实现的是BlockingDeque接口),在BlockingQueue接口中定义了几个供子类实现的接口,可以分为3部分,puts操作、takes操作、其他操作。

puts操作
add(E e) : 添加成功返回true,失败抛IllegalStateException异常
offer(E e) : 成功返回 true,如果此队列已满,则返回 false(如果添加了时间参数,且队列已满也会阻塞)
put(E e) :将元素插入此队列的尾部,如果该队列已满,则一直阻塞

takes操作
remove(Object o) :移除指定元素,成功返回true,失败返回false
poll() : 获取并移除此队列的头元素,若队列为空,则返回 null(如果添加了时间参数,且队列中没有数据也会阻塞)
take():获取并移除此队列头元素,若没有元素则一直阻塞。
peek() :获取但不移除此队列的头;若队列为空,则返回 null。

other操作
contains(Object o):队列中是否包含指定元素
drainTo(Collection<? super E> c):队列转化为集合

关于阻塞队列,我们主要看LinkedBlockingQueue与ArrayBlockingQueue.

ArrayBlockingQueue

ArrayBlockingQueue是基于数组的、有界的、遵循FIFO原则的阻塞队列,队列初始化时必须指定队列的长度。
这是一个经典的“有界缓冲区”,其中固定大小的数组包含由生产者插入并由消费者提取的元素。创建后,无法更改容量。此类支持用于排序等待生产者和消费者线程的可选公平策略。默认情况下,不保证此顺序。但是,将fairness设置为true构造的队列以FIFO顺序授予线程访问权限。公平性通常会降低吞吐量,但会降低可变性并避免饥饿。

结构

Java 并发编程 --- LinkedBlockingQueue与ArrayBlockingQueue (七)

相关变量

final Object[] items; //一个数组,用来存放队列中的变量(队列的基础)
int count; //队列中元素的数量
int takeIndex; //下一次take、poll、remove、peek操作的下标值
int putIndex; //下次add、offer、put操作的下标值

构造函数

ArrayBlockingQueue提供了三个构造函数,在只传递初始化大小值时,默认使用的锁是非公平锁,对比三个不同的构造函数而言,真正初始化队列的构造方法是ArrayBlockingQueue(int capacity, boolean fair)方法,传入集合的构造方法会在调用该方法后将集合遍历存入队列中

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    //使用同一个锁对象,此处是与LinkedBlockingQueue(使用两个不同的锁来控制添加,取出操作)不同的地方
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                //向数组中添加数据
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        //设置下次添加操作对应的数组下标值
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

offer/add操作

add本质上调用的是offer操作,通过返回值true/false可以判断队列中添加元素是否成功,队列已满返回false

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
     //队列已满,直接返回false
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

//入队列操作
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //唤醒take/poll(有时间参数)方法获取数据的线程
    notEmpty.signal();
}

put操作

没有返回值,队列已满则等待,知道被唤醒

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            //队列已满线程挂起等待
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

poll操作

队列为空返回null,对于有时间参数的poll操作,在队列为空时,会被挂起等待

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    //归零操作
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //唤醒puts线程
    notFull.signal();
    return x;
}

take操作

队列为空等待,直到队列中存在元素被唤醒

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

peek操作

获取队列中第一个不为空的元素(每次takes操作,或者puts操作都会设置下次takes/puts操作的下标)

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //通过下标值获取元素
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}

remove操作

删除内部元素操作是一种本质上缓慢且具有破坏性的操作,需要将删除元素后的元素统一迁移一个单位,并且在操作过程中会获得锁,对性能有影响,因此不应轻易执行remove操作

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            //遍历队列
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    //如果需要移除的元素下标值为下一次取数的下标值,执行类似取数的操作
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
     //如果takes下标到达队列最大长度,归零
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove
        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        //如果不相等,将需删除元素的后续元素统一迁移一位
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
          //移动完成,设置puts操作下标
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    //唤醒put操作等待的线程
    notFull.signal();
}

综上,ArrayBlockingQueue队列的逻辑并不复杂,但需要注意一下几点

1.ArrayBlockingQueue是以数组来实现队列功能的,在执行puts或者takes操作时一旦下一个操作的下标值大于队列的长度,类中用来记录存取下标值会归零,已达到循环使用队列的目的

2.ArrayBlockingQueue是通过一个锁控制takes以及puts操作,说明在同一时间内只能执行takes操作或者puts操作中的一种,对于阻塞队列来说,保证了线程安全,
但是会影响队列的消费和生产效率,并发性会下降
3.ArrayBlockingQueue在执行remove操作时,会将整个数组进行移动(最坏情况下),同时还会获得锁,对性能的影响比较大

LinkedBlockingQueue

LinkedBlockingQueue是基于链表的、有界的、遵循FIFO原则的阻塞队列,队列默认的最大长度为Integer.MAX_VALUE

结构

Java 并发编程 --- LinkedBlockingQueue与ArrayBlockingQueue (七)

重要属性

private final int capacity;//队列的大小,可以自定义,默认为Integer.MAX_VALUE
private final AtomicInteger count = new AtomicInteger(); //当前队列中元素的数量
//take、poll操作需要持有的锁,LinkedBlockingQueue支持并发操作,对于从队列中获取数据需要加锁(会阻塞,ConcurrentLinkedQueue/ConcurrentLinkedDeque
是使用CAS操作来控制的,不会出现阻塞的问题)
private final ReentrantLock takeLock = new ReentrantLock();
//put、offer操作需要持有的锁,同上 private final ReentrantLock putLock = new ReentrantLock();
//notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 private final Condition notEmpty = takeLock.newCondition();
//notFull条件对象,当队列数据已满时用于挂起执行添加的线程 private final Condition notFull = putLock.newCondition();

Node类

相对于其他(ConcurrentLinkedQueue/ConcurrentLinkedDeque)类来说,LinkedBlockingQueue的Node类要简单好多,由于是基于单链表实现的,只有一个next属性(保存后继节点),一个item属性(存放值),一个构造函数

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

构造函数

LinkedBlockingQueue提供了三个构造函数,在不传参数的情况下,默认队列的大小为Integer.MAX_VALUE,对比三个不同的构造函数而言,真正初始化 队列的构造方法是LinkedBlockingQueue(int capacity)方法,传入集合的构造方法会在调用该方法后将集合遍历存入队列中

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    //设置队列大小,new一个null节点,head、tail节点指向改节点
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    //获取put、offer操作需要的锁,可重入
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //将队列的last节点指向该节点
            enqueue(new Node<E>(e));
            ++n;
        }
        //队列元素计数
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

offer操作

通过返回值true/false判断是否成功,offer操作当队列满后并不会阻塞(有时间参数的offer操作也会阻塞),而是直接返回false,put操作是没有返回值的,并且会一直阻塞,等待被唤醒(或者超过时间抛出异常)

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    //是否超过最大值
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //获取锁
    putLock.lock();
    try {
        //再次判断队列是否存放满(可能存在多线程的情况)
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            //如果队列没有放满,唤醒下一个添加线程
            if (c + 1 < capacity)
                //其实这个地方,唤醒的添加线程是执行put方法(或者offer有时间参数的操作)时被阻塞的线程,如果仅仅只是执行offer操作应该不会执行任何操作,
                  没有对应的添加线程添加到条件队列中(个人理解,也是不太理解的地方)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        //当c=0时,表明当前队列中存在一个元素,通知消费线程去消费
        signalNotEmpty();
    return c >= 0;
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //唤醒条件等待队列中的消费者去消费数据
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

put操作

 put方法不会像offer方法那样去检查队列大小是否超过设定值,put操作一个元素入队列时,如果队列已满,当前线程会进入nofull的条件等待队列中等待,直到队列中元素个数小于队列大小时被唤醒,才继续put操作

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        //队列已满存放线程阻塞,等待被唤醒
        while (count.get() == capacity) {
            notFull.await();
        }
        //入队列
        enqueue(node);
        c = count.getAndIncrement();
        //队列没满,唤醒notfull等待队列中的添加线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        //当c=0时,表明当前队列中存在一个元素,通知消费线程去消费
        signalNotEmpty();
}

poll操作

必定会有返回值(异常除外),但包含null(队列中没有数据),与take操作比较可以发现,take操作在队列中没有数据时执行take操作的线程会被挂起,直到队列中有数据(有时间参数的poll操作也会被挂起,等待唤醒或者超时)

public E poll() {
    final AtomicInteger count = this.count;
    //如果队列没有数据,返回null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                //唤醒其他消费线程
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

//出队列操作,因为队列的head节点为null节点,在出队列时,会始终保持head节点为空,next节点为真正意义上的首节点
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    //自指向,该节点已经无用,便于GC
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

take操作

take操作不会向poll操作去检查队列中有没有数据,队列中没有数据时会被挂起,等待被唤醒

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //队列中是否有数据,没有等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            //如果队列中有数据存在,唤醒notempty等待队列中的消费线程
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        //唤醒添加线程
        signalNotFull();
    return x;
}

peek操作

获取队列中头部元素,可能存在其他线程执行的删除、take(poll)操作,所以要加锁,获取数据不一定准确

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //head节点不存放数据,所以取的是next节点
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

remove操作

在执行删除操作时,会将puts以及takes操作都上锁,保证线程安全,然后执行遍历删除操作,在删除后,会去唤醒等待中的添加线程执行 添加操作

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        //遍历单项链表
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                //移除数据
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

void unlink(Node<E> p, Node<E> trail) {
    // assert isFullyLocked();
    // p.next is not changed, to allow iterators that are
    // traversing p to maintain their weak-consistency guarantee.
    p.item = null;
    trail.next = p.next;
    if (last == p)
        last = trail;
    //唤醒添加线程
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

综上,LinkedBlockingQueue的api与ArrayBlockingQueue并无太大差别,在实现思想上,LinkedBlockingQueue使用了锁分离以及链表,其他与ArraBlockingQueue(一个锁统一管理、数组)没太大区别

LinkedBlockingQueue与ArrayBlockingQueue异同

1.LinkedBlockingQueue是基于链表实现的初始化是可以不用指定队列大小(默认是Integer.MAX_VALUE);ArrayBlockingQueue是基于数组实现的初始化时必须指定队列大小

2.LinkedBlockingQueue在puts操作都会生成新的Node对象,takes操作Node对象在某一时间会被GC,可能会影响GC性能;ArrayBlockingQueue是固定的数组长度循环使用,
不会出现对象的产生与回收
3.LinkedBlockingQueue是基于链表的形式,在执行remove操作时,不用移动其他数据;ArrayBlockingQueue是基于链表,在remove时需要移动数据,影响性能 4.LinkedBlockingQueue使用两个锁将puts操作与takes操作分开;ArrayBlockingQueue使用一个锁来控制,在高并发高吞吐的情况下,LinkedBlockingQueue的性能较好