1. 原理概述
- 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。
- 阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
- 如图,负责生产的线程不断的制造新对象并插入到阻塞队列中,直到达到这个队列的上限值。队列达到上限值之后生产线程将会被阻塞,直到消费的线程对这个队列进行消费。同理,负责消费的线程不断的从队列中消费对象,直到这个队列为空,当队列为空时,消费线程将会被阻塞,除非队列中有新的对象被插入。
2. 核心方法
(1) 插入数据
-
offer(anObject)
表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程) -
offer(E o, long timeout, TimeUnit unit)
可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。 -
put(anObject)
把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞,直到BlockingQueue里面有空间再继续。
Offer和put的区别是,前者如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false,并不阻塞;后者如果操作不能马上进行,操作会被阻塞。
(2) 获取数据
-
poll()
取走BlockingQueue里排在首位的对象,若能则立即取出, 取不到时返回null(不会被阻塞)。 -
poll(long timeout, TimeUnit unit)
从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。 -
take()
取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻塞进入等待状态直到 BlockingQueue有新的数据被加入。 -
drainTo()
一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
3. 阻塞队列种类
ArrayBlockingQueue
ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。ArrayBlockingQueue是以先进先出的方式存储数据(FIFO),最新插入的对象是尾部,最新移出的对象是头部。LinkedBlockingQueue
LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。
ArrayBlockingQueue与ArrayList相比,LinkedBlockingQueue与LinkedList相比,都十分相似,但是前者比后者具有更好的性能。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。DelayQueue
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:
DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。PriorityBlockingQueue
PriorityBlockingQueue是一个没有边界的、基于优先级的阻塞队列,它的排序规则和 java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中允许插入null对象。
所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。SynchronousQueue
SynchronousQueue是无缓冲的队列,当一个线程往队列里插入一个元素后会被阻塞,除非这个元素被另一个线程消费。
4. 实例
- 生产者与消费者模型
// 生产者
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue<String> queue;
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is making product...");
String product = "made by " + Thread.currentThread().getName();
try {
queue.put(product);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 消费者
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
String product = queue.take();
System.out.println(Thread.currentThread().getName() + " is consuming product " + product + "...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 客户端
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Client {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);
for (int i = 0; i < 2; i++) {
new Thread(new Consumer(queue), "Producer" + i).start();
}
for (int i = 0; i < 5; i++) {
// 只有两个 Product,因此只能消费两个,其它三个消费者被阻塞
new Thread(new Producer(queue), "Consumer" + i).start();
}
for (int i = 2; i < 5; i++) {
new Thread(new Consumer(queue), "Producer" + i).start();
}
}
}
// 运行结果
Consumer0 is making product...
Producer0 is consuming product made by Consumer0...
Consumer1 is making product...
Producer1 is consuming product made by Consumer1...
Consumer2 is making product...
Consumer3 is making product...
Consumer4 is making product...
Producer2 is consuming product made by Consumer2...
Producer3 is consuming product made by Consumer3...
Producer4 is consuming product made by Consumer4...