BlockingQueue是什么
- BlockingQueue是一个阻塞队列的接口
- BlockingQueue是线程安全的
- BlockingQueue具有先进先出的特点
- 当队列满的时候进行入队操作会阻塞,当队列空的时候进行出队操作会阻塞
BlockingQueue提供的接口
BlockingQueue提供的接口有四种不同的方法,具体如下表所示
操作 | Throws Exception | Special Value | Blocks | Times Out |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
删除 | remove(o) | poll() | take() | poll(timeout, timeunit) |
查询 | element() | peek() | - | - |
这四种不同的方法对应的特点分别是
- ThrowsException:如果操作不能马上进行,则抛出异常
- SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
- Blocks:如果操作不能马上进行,操作会被阻塞
- TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值
ArrayBlockingQueue是什么
- ArrayBlockingQueue是一个基于数组且有界的阻塞队列,即容量固定
- ArrayBlockingQueue是线程安全的
- ArrayBlockingQueuee具有先进先出的特点
- 当队列满的时候进行入队(put)操作会阻塞,当队列空的时候进行出队(take)操作会阻塞
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
ArrayBlockingQueue成员变量
final Object[] items;
int takeIndex;
int putIndex;
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
由于ArrayBlockingQueue是基于数组且右界的阻塞队列,因此需要个数组进行存储,需要两个指针索引记录队列的头地址和尾地址。其中有个锁是为了保证线程安全的,而两个Condition则是用来阻塞队列的
ArrayBlockingQueue构造方法
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
构造函数的参数
- capacity:指定容量的大小
- fair:指定是否使用公平锁,所谓的公平锁就是先等待的线程先获得锁
ArrayBlockingQueue的存储
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//获取锁
try {
while (count == items.length)//如果已满,等待
notFull.await();
enqueue(e);//如果没满,入队
} finally {
lock.unlock();//释放锁
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();//通知有人入队,notEmpty表示队列不会为空
}
存储的逻辑还是挺简单的
- 通过对put方法加锁保证线程的安全
- 判断队列的情况,如果满了则等待,没满则入队
- 入队后,通知队列的notEmpty,表示队列不会为空
ArrayBlockingQueue的获取
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//获取锁
try {
while (count == 0)//如果没元素,等待
notEmpty.await();
return dequeue();//如果有元素,出队
} finally {
lock.unlock();//释放锁
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//通知有人出队,notFull表示队列不会溢满
return x;
}
获取的逻辑基本和存储的逻辑反过来
- 通过对take方法加锁保证线程的安全
- 判断队列的情况,如果没元素则等待,如果有元素则出队
- 出队后,通知队列的notFull,表示队列不会溢满
这里穿插下ReentrantLock的lock和lockInterruptibly的区别
- 当线程有可能被其他线程中断时,lock方法会忽略中断请求,继续获取锁直到成功
- 当线程有可能被其他线程中断时,lockInterruptibly方法则直接抛出中断异常来立即响应中断
总结
- ArrayBlockingQueue队列是基于数组和Condition类来实现的
- ArrayBlockingQueue的存储和获取采用生产消费模式
- ArrayBlockingQueue的线程安全是通过ReentrantLock来保证的
- ArrayBlockingQueue的队列中不允许元素为null