这是一个典型的「有界缓冲区」,一个固定大小的数组持有从生产者(producers)产生的和被消费者(consumers)消费的数据。此队列一旦创建,队列大小就固定了,不能再改变。尝试将数据加入到满队列中,将会被阻塞;尝试从空队列中取数据也同样会被阻塞。
本类提供生产者和消费者线程访问顺序的一个可选的公平策略。默认情况下,为了性能,这个访问顺序不被保证。不过,在初始化构造本类的时候,将 fairness 设置为 true,则可以保证线程访问的公平性,这样设置的坏处是降低吞吐量,好处是减少了可变性和线程的饥饿现象。
一. 代码
(1)核心变量
/** Main lock guarding all access */阻塞队列实现的关键参数,一个可重入锁,和两个条件,使用经典的双状态算法(two-condition algorithm)
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/** The queued items */一些基本参数,基础的数组对象,队头的位置,队尾的位置,队列中已有的数据量
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
(2)构造器
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
队列构造方法,设置固定队列大小,是否需要公平访问队列,公平锁和非公平锁由 ReentrantLock (公平锁是正常走锁排队申请流程,非公平锁先尝试获取 AQS stat 状态锁,然后才走正常锁排队申请)提供
public ArrayBlockingQueue(int capacity) {默认构造方法是非公平的
this(capacity, false);
}
(3) add 方法:新增数据
实际上是调用继承的抽象类 AbstractQueue 的 add 方法
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
上述的 offer(e) 是接口 Queue 未实现的方法,具体实现在 ArrayBlockingQueue
public boolean offer(E e) {如果队列还没满,则加入队尾并返回 true; 可以看出来 offer 方法如果插入不了不是进入阻塞状态,是直接返回一个 false 状态
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
将数据插入队尾,移动数组下标( inc(putIndex) 保证循环移动),队列总数 count 加 1,notEmpty.signal 唤醒等待拿数据的线程(在 AQS 的等待队列中的线程)
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
(4)put 方法:新增数据 (如果满了就阻塞)
public void put(E e) throws InterruptedException {可以看到如果队列数据量 count == items.length 数组大小,则线程阻塞 await()
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
(5)poll:取数据,不是阻塞方法
public E poll() {如果数据为空则返回 null,不为空则返回数据,并且唤醒 notFull 状态挂起的线程
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
(5)take:取数据,如果为空则阻塞
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
(6) drainTo 方法:字面意思就是排干,就是把数据批量导入到一个集合类中,比一个一个 poll 效率高,因为加锁次数少
public int drainTo(Collection<? super E> c) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int n = 0;
int max = count;
while (n < max) {
c.add(this.<E>cast(items[i]));
items[i] = null;
i = inc(i);
++n;
}
if (n > 0) {
count = 0;
putIndex = 0;
takeIndex = 0;
notFull.signalAll();
}
return n;
} finally {
lock.unlock();
}
}
三. 总结 看懂 ArrayBlockingQueue 需要先看懂 AbstractQueuedSynchronizer 和 ReentrantLock,阻塞就是靠 ReentrantLock 来实现的,而 ReentrantLock 是靠 AbstractQueuedSynchronizer 来实现加锁和释放锁。主要的算法就是上文提到的 two-condition algorithm,这个算法应该在学生时代《操作系统》课程上见过很多次了