1. 什么是阻塞队列
所谓的阻塞队列,也是一个队列,只不过带有阻塞性质:
-
- 如果队列为空,尝试出队列,就会阻塞等待。等待到队列不空为止
-
- 如果队列为满,尝试入队列,就会阻塞等待。等待到队列不满为止
阻塞队列线程安全!
2. Java标准库中提供的阻塞队列的使用
public class Demo15 {
public static void main(String[] args) throws InterruptedException {
BlockingDeque<String> queue = new LinkedBlockingDeque<>();
// 阻塞队列的核心方法,主要有两个
// 1,put 入队列
queue.put("hello1");
// 2. take 出队列
String result = null;
result = queue.take();
System.out.println(result);
result = queue.take();
System.out.println(result);
}
}
在上述代码中,创建了一个阻塞队列,put了一次,take了两次,此时take第二次的时候就会造成阻塞~可以看出程序并没有停止!
上述代码中是在单线程下,所以看不出来什么时候会解除阻塞,如果要接触阻塞,就需要另外一个线程在里面填充元素! 下面来通过一个案例—>生产者消费者模型
3. 生产者消费者模型
首先介绍下,什么是生产者消费者模型:
可以举一个生活中的一个例子,比如你去烧烤摊买烧烤! 老板就是生产者,你就是消费者,当老板烤好串串的时候,就会放到桌上,这个时候你和其他人就可以去拿串串吃。
此时生产者就是老板烤串,消费者就是顾客拿串吃串!生产者和消费者之间交互数据,就用要一个交易场所,就是桌子!而桌子就是一个阻塞队列!
生产者消费者模型,解决的问题有很多,下面介绍最主要的两个方面
3.1 解耦合
可以让上下模块之间,进行更好的“解耦合”
考虑以下场景:
3.2 削峰填谷
3.3 基于阻塞队列写一个生产者消费模型
public class Demo16 {
public static void main(String[] args) {
BlockingDeque<Integer> queue = new LinkedBlockingDeque();
// 消费者
Thread t1 = new Thread(()->{
while (true){
try {
int value = queue.take();
System.out.println("消费元素:" + value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
// 生产者
Thread t2 = new Thread(()->{
int value = 0;
while (true){
System.out.println("生产元素" + value);
try {
queue.put(value);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
value++;
}
});
t2.start();
// 上述代码,让生产者,每隔 1s 生产一个元素
// 让消费者直接消费,不受限制
}
}
3.4 自己实现一个阻塞队列
实现一个阻塞队列分为三步
- 先实现一个普通队列
- 加上线程安全
- 加上阻塞功能
class MyBlockQueue {
private int[] items = new int[1000];
volatile private int front;
volatile private int rear;
volatile private int size;
// 入队列
synchronized public void put(int elem) throws InterruptedException {
if (size == items.length) {
// 队列满了
//return;
this.wait();
}
// 把新元素放到 tail 所在位置上
items[rear] = elem;
rear++;
// 万一tail达到末尾,就需要让tail从头再来
if (rear == items.length) {
rear = 0;
}
size++;
this.notify();
}
// 出队列
synchronized public Integer take() throws InterruptedException {
if (size == 0) {
this.wait();
}
int value = items[front];
front++;
if (front == items.length) {
front = 0;
}
size--;
this.notify();
return value;
}
}
public class Demo17 {
public static void main(String[] args) {
MyBlockQueue queue = new MyBlockQueue();
// 生产者
Thread t1 = new Thread(()->{
int value = 0;
while (true){
try {
System.out.println("生产元素:" + value);
queue.put(value);
value++;
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
// 消费者
Thread t2 = new Thread(()->{
while (true){
try {
int value = 0;
value = queue.take();
System.out.println("消费元素:" + value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
但是上述代码,存在一定的风险,因为wait方法是可能会被其他方法给中断的(interrupt方法),此时如果wait 等待条件没完成,就提前被唤醒了,就不符合预期了(如果队列为空,此时就会被唤醒,执行出队操作)所以更稳妥的方法,就是在唤醒之后,再次判断一下条件,将 if 改成 while 循环
class MyBlockQueue {
private int[] items = new int[1000];
volatile private int front;
volatile private int rear;
volatile private int size;
// 入队列
synchronized public void put(int elem) throws InterruptedException {
while (size == items.length) {
// 队列满了
//return;
this.wait();
}
// 把新元素放到 tail 所在位置上
items[rear] = elem;
rear++;
// 万一tail达到末尾,就需要让tail从头再来
if (rear == items.length) {
rear = 0;
}
size++;
this.notify();
}
// 出队列
synchronized public Integer take() throws InterruptedException {
while (size == 0) {
this.wait();
}
int value = items[front];
front++;
if (front == items.length) {
front = 0;
}
size--;
this.notify();
return value;
}
}
public class Demo17 {
public static void main(String[] args) {
MyBlockQueue queue = new MyBlockQueue();
// 生产者
Thread t1 = new Thread(()->{
int value = 0;
while (true){
try {
System.out.println("生产元素:" + value);
queue.put(value);
value++;
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
// 消费者
Thread t2 = new Thread(()->{
while (true){
try {
int value = 0;
value = queue.take();
System.out.println("消费元素:" + value);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
到这里就介绍介绍了,如有错误,欢迎指正~~