Java并发编程-阻塞队列

时间:2022-03-31 20:23:32

Java concurrent 包中BlockingQueue接口有ArrayBlockingqueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousBlockingQueue等几个实现类。

1、ArrayBlockingqueue

通过名字可以看出,ArrayBlockingqueue是通过数组实现的定长队列,遵循FIFO(先进先出)原则,队列头部元素位于队列中的时间最长,队列尾部的元素位于队列中的时间最短。新到的元素放入队列尾部,取元素时从头部取。

队列一旦初始化后,大小就确定了。队列满了后再向其中添加元素就会阻塞队列。同理,向空的队列取元素也会发生阻塞。

2、LinkedBlockingQueue

该队列自然是通过链表实现的,与ArrayBlockingQueue不同的是,该队列是基于链表节点实现,节点动态生成,队列默认最大长度是整数型变量最大值,即231-1,可以通过构造方法自己指定队列的长度。由于LinkedBlockingQueue生产和消费数据分别采用独立的锁,所以在高并发情况下,其吞吐量要高于ArrayBlockingQueue,但在大多数应用中,其可预知的性能要低。

3、PriorityBlockingQueue

该队列中的元素是按顺序结构存储中,添加到队列中的元素必须实现Comparable接口的compareTo()方法。在向队列中添加元素时,会与原队列中的元素进行比较,直到找到它的位置。该队列不允许null值。

4、SynchronousBlockingQueue

对于SynchronousBlockingQueue来说,入队和出队操作是交替进行的,也就是说一个线程进行insert操作后,必须等待另一个线程进行remove操作才能再次进行insert操作,反之亦然。

下面是一个用LinkedBlockingQueue关于生产者和消费者一个例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; public class BlockingQueue {
private static LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<Integer>(); public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Producer("producer 1"));
executorService.submit(new Producer("producer 2"));
executorService.submit(new Producer("producer 3")); executorService.submit(new Consumer("consumer 1"));
executorService.submit(new Consumer("consumer 2"));
executorService.submit(new Consumer("consumer 3")); } static class Producer implements Runnable{
private String name;
public Producer(String name){
this.name = name;
} @Override
public void run() {
for(Integer i=0;i<10;i++){
System.out.println(this.name + " produce " + i);
linkedBlockingQueue.add(i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
} static class Consumer implements Runnable{
private String name;
public Consumer(String name){
this.name=name;
}
@Override
public void run() {
for(Integer i=0;i<10;i++){
try {
System.out.println(this.name + " consume " + linkedBlockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
} }