Java集合源码学习(16)_BlockingQueue接口的实现ArrayBlockingQueue

时间:2021-08-16 15:00:54

ArrayBlockingQueue继承了AbstractQueue,实现了BlockingQueue接口;

1:内部使用数组来存储队列元素

2:元素的排序是按照FIFO的顺序,队列的第一个元素是入队列时间最久的那个元素;

3:是有界队列,初始化时设置队列大小,之后不可再次设置;

4:不允许null值

5:在初始化时可设置等待该队列的被阻塞的线程唤醒的策略,fairness设置为true,按照请求该队列的顺序唤醒;默认是false也就是随机的唤醒

1:成员变量

/** The queued items */
private final E[] items;//队列元素的存放数组
/** items index for next take, poll or remove */
private int takeIndex;//出队列的元素index
/** items index for next put, offer, or add. */
private int putIndex;//如队列的元素index
/** Number of items in the queue */
private int count;

/** Main lock guarding all access */
private final ReentrantLock lock;//操作都要首先获取锁
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

2:重要方法

1:offer()方法

入队列,满了返回false,不阻塞
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
private void insert(E x) {//实际插入元素的方法,使用该方法前应该获得锁items[putIndex] = x;putIndex = inc(putIndex);++count;notEmpty.signal();}
final int inc(int i) {//循环增加index的值return (++i == items.length) ? 0 : i;}

2:put()方法

一直阻塞
public void put(E e) throws InterruptedException {
if (e == null)
throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); //因为在await之后,放弃了锁,被中断之后需要重新获取锁,要抛出异常了,需要唤醒其他在等待notFull的线程
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}

3:offer(time)方法

阻塞固定时间
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != items.length) {
insert(e);
return true;
}
if (nanos <= 0)
return false;
try {
nanos = notFull.awaitNanos(nanos);//返回还需要等待的时间
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}

4:poll()方法

不阻塞,出队列
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)
return null;
E x = extract();
return x;
} finally {
lock.unlock();
}
}
private E extract() {final E[] items = this.items;E x = items[takeIndex];items[takeIndex] = null;takeIndex = inc(takeIndex);--count;notFull.signal();return x;}

5:take()方法

阻塞式获取方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}

6:poll(time)方法

阻塞指定时间段
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != 0) {
E x = extract();
return x;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}

7:peek()方法

public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : items[takeIndex];
} finally {
lock.unlock();
}
}