《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue

时间:2021-01-07 09:14:27

对于BlockingQueue的具体实现,主要关注的有两点:线程安全的实现和阻塞操作的实现。所以分析ArrayBlockingQueue也是基于这两点。

对于线程安全来说,所有的添加元素的方法和拿走元素的方法都会涉及到,我们通过分析offer方法和poll()方法就能看出线程安全是如何实现的。

首先来看offer方法

    public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}

通过代码可以看出是通过采用Lock的方式来获取锁,然后再进行插入操作,最后再释放锁。

因此对于poll方法来说实现的方法肯定也是大同小异

    public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}

说过了线程安全的实现,接下来说说阻塞是如何实现的。如果各位知道Object的wait/notify的话就很好理解了。这里涉及到一个接口叫java.util.concurrent.locks.Condition。

Condition拥有类似的操作:await/signal。Condition和一个Lock相关,由Lock的newCondition来创建。只有当前线程获取了这把锁,才能调用Condition的await方法来等待通知,否则会抛出异常。

下面来看看put方法就会明白如何使用一个Condition了

notFull =  lock.newCondition();
    public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}

实现阻塞的关键就是就是这个notFull的Condition,当队列已满,await方法会阻塞当前线程,并且释放Lock,等待其他线程调用notFull的signal来唤醒这个阻塞的线程。那么这个操作必然会在拿走元素的操作中出现,这样一旦有元素被拿走,阻塞的线程就会被唤醒。

这里有个问题,发出signal的线程肯定拥有这把锁的,因此await方法所在的线程肯定是拿不到这把锁的,await方法不能立刻返回,需要尝试获取锁直到拥有了锁才可以从await方法中返回。

这就是阻塞的实现原理,也是所谓的线程同步。

同样对于take方法会有一个notEmpty的Condition。

    public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}

需要注意的是这里返回队列长度的时候也是需要锁的

    public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}

ArrayBlockingQueue的实现相对简单,只需要一把锁就可以搞定,下一篇关于LinkedBlockingQueue则会复杂不少,需要用到两把锁。