Java并发(基础知识)—— 阻塞队列和生产者消费者模式

时间:2021-06-22 11:21:56

1、阻塞队列                                                                                       

BlockingQueue是线程安全的Queue版本,从它的名字就可以看出,它是一个支持阻塞的Queue实现:当向空BlockingQueue请求数据时,它会阻塞至BlockingQueue非空;当向一个已满BlockingQueue插入数据时,线程会阻塞至BlockingQueue可插入。

BlockingQueue 的方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

  抛出异常 特殊值 阻塞 超时
插入 add(e)

offer(e)

put(e) offer(e, time, unit)
移除 remove() poll() take() take(time, unit)
检查 element() peek()    

BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。

BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。

JDK中提供了以下几种阻塞队列实现,分别是:

  •  ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  •  LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  •  PriorityBlockingQueue :一个支持优先级排序的*阻塞队列。
  •  DelayQueue:一个只有延时期满才能取出数据的*阻塞队列。
  •  SynchronousQueue:一个不存储元素的阻塞队列。

2、生产者消费者模式                                                                            

阻塞队列支持生产者-消费者模式,该模式将"找出需要完成的工作"与"执行工作"这两个过程分离开来,并把工作项放入一个"待完成"列表中以便在随后处理,而不是找出后立即处理。生产者-消费者模式能简化开发过程,因为他消除了生产者类和消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用数据的过程解耦开来以简化工作负载管理,因为这两个过程在处理数据的速率上有所不同。

在基于阻塞队列构建的生产者-消费者设计中,当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据。生产者不需要知道消费者的标识或数量,或者它们是不是唯一的生产者,而只需将数据放入队列即可。同样,消费者也不需要知道生产者是谁,或者工作来自何处。BlockingQueue简化了生产者-消费者设计的实现过程,它支持任意数量的生产者和消费者。一种常见的生产者-消费者设计模式就是线程池与工作队列的组合,在Executor任务执行框架中就体现了这种模式。

阻塞队列简化了生产者-消费者模式的编码,因为take操作会一直阻塞直到有可用的数据,如果生产者不能尽快地产生工作项使消费者保持忙碌,那么消费者就只能一直等待,直到有工作可做。同样,put操作也能简化编码,如果使用阻塞队列,那么当队列充满时,生产者将阻塞并且不能继续工作,这样消费者就有时间能够赶上生产者的速度了。

3、桌面搜索示例                                                                                 

有一种类型的程序适合被分解为生产者和消费者,例如代理程序,它将扫描本地驱动器上的文件并简历索引以便随后进行搜索,类似于某些桌面搜索程序。在以下的代码中,CrawlerThread中给出了一个生产者任务,即在某个文件层次结构中搜索符合索引标准的文件,并将它们的名称放入工作队列。在IndexerThread中给出了一个消费者任务,即从队列中取出文件名称并对它们建立索引。

示例来自《Java并发编程实战》。

class CrawlerThread extends Thread {
private final BlockingQueue<File> fileQueue;
private final File root; public CrawlerThread(BlockingQueue<File> fileQueue, File root) {
super();
this.fileQueue = fileQueue;
this.root = root;
} public void run() {
try {
crawl(root);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles();
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory())
crawl(entry);
else if (!fileQueue.contains(entry))
fileQueue.put(entry);
}
}
}
} class IndexerThread extends Thread {
private final BlockingQueue<File> queue; public IndexerThread(BlockingQueue<File> queue) {
super();
this.queue = queue;
} public void run() {
try {
while (true) {
indexFile(queue.take());
}
} catch (InterruptedException consumed) {
Thread.currentThread().interrupt();
}
} public void indexFile(File file) {
/* ... */
};
}

  生产者-消费者模式提供了一种适合线程的方法将桌面搜索问题分解为更简单的组件,将文件遍历与建立索引等功能分解为独立的操作,比将所有功能都放到一个操作中实现有着更高的代码可读性和可重用性:每个操作只需完成一个任务,并且阻塞队列将负责所有的控制流,因此每个功能的代码都更加简单和清晰。

生产者-消费者模式同样能带来许多性能优势。生产者和消费者能够并发的执行,如果一个是I/O密集型,一个是CPU密集型,那么并发执行的吞吐率高于串行执行的吞吐率。如果生产者和消费者的并行度不同,那么将它们耦合在一起会把整体并行度降低为二者中更小的并行度。

以下的代码用来启动多个搜索线程跟索引线程:

	public static void main(String[] args) {
final int N_CONCUMERS = 10;
File[] roots = ...; BlockingQueue<File> queue = new LinkedBlockingQueue<File>(1000);
for(File root : roots)
new CrawlerThread(queue, root).start(); for(int i = 0; i < N_CONCUMERS; i++)
new IndexerThread(queue).start();
}