ArrayBlockingQueue简介
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable
- 一个由数组支持的有界 阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的 头部 是在队列中存在时间最长的元素。队列的 尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。
ArrayBlockingQueue是Java并发框架中阻塞队列的最基本的实现,分析这个类就可以知道并发框架中是如何实现阻塞的。
ArrayBlockingQueue实现阻塞队列的关键在与对锁(Lock)和等待条件(Condition)的使用,这两个实现的基本功能类似于wait()和notify(),是wait()和notify()的高级用法。
ArrayBlockingQueue构造函数
有如下3个构造函数,没有默认构造函数。
1 public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
传入的参数即为创建的对象数组的大小,第二个参数是否为对称锁,false为非对称锁。同时初始化锁ReentrantLock。两个锁ReentrantLock上的Conditon,一个为notEmpty,一个为notFull.
2 public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
this(capacity, fair);
if (capacity < c.size())
throw new IllegalArgumentException();
for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
add(it.next());
}
主要分析ArrayBlockingQueue的4个核心方法,offer()、 poll() 和 put() 、take()。
1 offer()插入元素有两个重载的方法,有参数和无参数的
,先说无参数的
1.1 public boolean offer(E e):
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
分析:传入参数e如果为null,则抛出空指针异常,否则拿到ReentrantLock锁,然后判断此队列已满,满则不等待直接返回 false。否则将指元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回true。
1.2 public boolean offer(E e, long timeout, TimeUnit unit) 分析:
该方法用于插入元素到数组的尾部,首先判断传入的参数e是否为null.如果为null则抛出空指针异常。然后判断数组是否已满,满则进入等待,直到出现以下三种情况时才继续:被唤醒,到达指定的时间或者当前线程被中断(interrupt).
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != items.length) {
insert(e);
return true;
}
if (nanos <= 0)
return false;
try {
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
分析:先判断传入的参数e是否为null.如果为null则抛出空指针异常。然后将传入的指定的时间转换为纳秒,再然后ReentrantLock加锁,进入循环中,再循环中判断如下:
(1) 如果数组未满,则将对象插入数组,并返回true,
(2) 如果数组已满,且已超过指定的时间,则返回false,
(3) 如未超过指定的时间,则调用notFull condition的awaitNanos()方法进行等待。
(4)在调用了awaitNanos()方法后,如果被唤醒或到达指定的时间,则继续判断数组是否已满,如果当前线程被中断(interrupt),则直接抛出InterruptedException.
2 poll(),poll获取元素 有两个重载的方法,有参数和无参数
2.1先讲无参数的。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)
return null;
E x = extract();
return x;
} finally {
lock.unlock();
}
}
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
分析:这个方法是不阻塞的,当队列未空的时候,直接返回null值,所以实现中只是一个锁的简单使用,防止并发问题。
2.2 有参数的 public E poll(long timeout, TimeUnit unit) throws InterruptedException
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != 0) {
E x = extract();
return x;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
获取队列的头部元素,在指定时间之内阻塞等待,如果超出阻塞时间队列仍然空,则返回null值
这个方法的逻辑和其他方法的不同之处就在Condition的一个时间计数器方法awaitNanos(…),这里先将时间大小根据时间单位换算成纳秒的数值,当队列容量为0是,使用Condition.awaitNanos(…),进行技术,超时后返回空。
3 put()插入元素
12345678910111213141516 | /** * 从队列的尾部插入元素,如果队列已满,将阻塞等待到队列有空间的时候进行插入操作。 * */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); // 获得当前线程的锁 try { while (count == items.length) // 循环等待 notFull.await(); insert(e); } finally { lock.unlock(); //释放锁 } } |
这个方法在当前线程没有中断的情况下,获取锁,接着对数组容量进行判断,如果容量已满,则循环等待带容量腾出来为止,最后释放当前线程锁。这样的业务逻辑就产生了这样的场景,线程一进入到该方法,成功插入队列,释放锁,假设刚好容量满;线程二进入该方法,循环等待;线程三从容器中获取元素;线程二判断容量未满,插入,释放锁。如果有多个线程在等待的时候,会出现什么情况呢,从代码的逻辑来看,当多个线程都在阻塞等待的时候,要看谁首先抢到锁,也就是消费方法是抢占式的。
4 take()获取元素
1234567891011121314 | /** * 返回队列头部的元素,如果队列为空,阻塞等待其他线程往当前容器放入元素为止。 */ public E take() throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); // 获取当前线程的锁 try { while (count == 0 ) // 阻塞等待 notEmpty.await(); return extract(); } finally { lock.unlock(); // 释放锁 } } |
获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
总之,ArrayBlokingQueue使用的Java的现实锁(Lock)配合Condition进行阻塞,使用Condition进行时间技术。而在并发框架中其他的阻塞和时间技术,也同样是用这两个对象API来实现。
LinkedBlockingQueue简介
除了ArrrayBlockingQueue之外,BlockingQueue的实现上常用的还有LinkedBlockingQueue, LinkedBlockingQueue实现的不同为采用对象的next构成链表的方式来存储对象。
由于读只操作队头,而写只操作队尾,这里巧妙地采用了两把锁,对于put和offer采用一把锁,对于take和poll则采用另外一把锁,避免了读写时互相竞争锁的现象,因此LinkedBlockingQueue在高并发读写操作都多的情况下性能会较ArrayBlockingQueue好很多,在遍历以及删除元素则要两把锁都锁住。
BlockingQueue实现中JDK5 Update12以及JDK6 Update2之前的版本有内存泄露,因此使用BlockingQueue,升级到修复该bug后的版本。
参考
JDK1.6 ArrayBlokingQueue源码
Java ArrayBlockingQueue源码解析: http://www.xiaoyaochong.net/wordpress/index.php/2013/03/02/java-arrayblockingqueue%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
BlockingQueue及其各个实现的分析整理:http://www.molotang.com/articles/563.html