【八】一文带你迅速掌握阻塞队列

时间:2021-02-20 01:14:15

1. 什么是阻塞队列

所谓的阻塞队列,也是一个队列,只不过带有阻塞性质:

    1. 如果队列为空,尝试出队列,就会阻塞等待。等待到队列不空为止
    1. 如果队列为满,尝试入队列,就会阻塞等待。等待到队列不满为止

阻塞队列线程安全!

2. Java标准库中提供的阻塞队列的使用

public class Demo15 {
    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<String> queue = new LinkedBlockingDeque<>();
        // 阻塞队列的核心方法,主要有两个
        // 1,put 入队列
        queue.put("hello1");
        // 2. take 出队列
        String result = null;
        result = queue.take();
        System.out.println(result);
        result = queue.take();
        System.out.println(result);
    }
}

【八】一文带你迅速掌握阻塞队列
在上述代码中,创建了一个阻塞队列,put了一次,take了两次,此时take第二次的时候就会造成阻塞~可以看出程序并没有停止!


上述代码中是在单线程下,所以看不出来什么时候会解除阻塞,如果要接触阻塞,就需要另外一个线程在里面填充元素! 下面来通过一个案例—>生产者消费者模型

3. 生产者消费者模型

首先介绍下,什么是生产者消费者模型:

可以举一个生活中的一个例子,比如你去烧烤摊买烧烤! 老板就是生产者,你就是消费者,当老板烤好串串的时候,就会放到桌上,这个时候你和其他人就可以去拿串串吃。
此时生产者就是老板烤串,消费者就是顾客拿串吃串!生产者和消费者之间交互数据,就用要一个交易场所,就是桌子!而桌子就是一个阻塞队列!

生产者消费者模型,解决的问题有很多,下面介绍最主要的两个方面

3.1 解耦合

可以让上下模块之间,进行更好的“解耦合”
考虑以下场景:
【八】一文带你迅速掌握阻塞队列

3.2 削峰填谷

【八】一文带你迅速掌握阻塞队列

3.3 基于阻塞队列写一个生产者消费模型

public class Demo16 {
    public static void main(String[] args) {

        BlockingDeque<Integer> queue = new LinkedBlockingDeque();
        // 消费者
        Thread t1 = new Thread(()->{
            while (true){
                try {
                    int value = queue.take();
                    System.out.println("消费元素:" + value);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();

        // 生产者
        Thread t2 = new Thread(()->{
            int value = 0;
            while (true){
                System.out.println("生产元素" + value);
                try {
                    queue.put(value);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                value++;
            }
        });
        t2.start();
        
        // 上述代码,让生产者,每隔 1s 生产一个元素
        // 让消费者直接消费,不受限制
    }
}

【八】一文带你迅速掌握阻塞队列

3.4 自己实现一个阻塞队列

实现一个阻塞队列分为三步

  • 先实现一个普通队列
  • 加上线程安全
  • 加上阻塞功能
class MyBlockQueue {

    private int[] items = new int[1000];
    volatile private int front;
    volatile private int rear;
    volatile private int size;

    // 入队列
    synchronized public void put(int elem) throws InterruptedException {
        if (size == items.length) {
            // 队列满了
            //return;
            this.wait();
        }
        // 把新元素放到 tail 所在位置上
        items[rear] = elem;
        rear++;
        // 万一tail达到末尾,就需要让tail从头再来
        if (rear == items.length) {
            rear = 0;
        }
        size++;
        this.notify();
    }


    // 出队列
    synchronized public Integer take() throws InterruptedException {
        if (size == 0) {
            this.wait();
        }
        int value = items[front];
        front++;
        if (front == items.length) {
            front = 0;
        }
        size--;
        this.notify();
        return value;
    }
}

public class Demo17 {
    public static void main(String[] args) {
        MyBlockQueue queue = new MyBlockQueue();
        // 生产者
        Thread t1 = new Thread(()->{
            int value = 0;
            while (true){
                try {
                    System.out.println("生产元素:" + value);
                    queue.put(value);
                    value++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        // 消费者
        Thread t2  = new Thread(()->{
            while (true){
                try {
                    int value = 0;
                    value = queue.take();
                    System.out.println("消费元素:" + value);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t1.start();
        t2.start();

    }
}

【八】一文带你迅速掌握阻塞队列

但是上述代码,存在一定的风险,因为wait方法是可能会被其他方法给中断的(interrupt方法),此时如果wait 等待条件没完成,就提前被唤醒了,就不符合预期了(如果队列为空,此时就会被唤醒,执行出队操作)所以更稳妥的方法,就是在唤醒之后,再次判断一下条件,将 if 改成 while 循环

class MyBlockQueue {

    private int[] items = new int[1000];
    volatile private int front;
    volatile private int rear;
    volatile private int size;

    // 入队列
    synchronized public void put(int elem) throws InterruptedException {
        while (size == items.length) {
            // 队列满了
            //return;
            this.wait();
        }
        // 把新元素放到 tail 所在位置上
        items[rear] = elem;
        rear++;
        // 万一tail达到末尾,就需要让tail从头再来
        if (rear == items.length) {
            rear = 0;
        }
        size++;
        this.notify();
    }


    // 出队列
    synchronized public Integer take() throws InterruptedException {
        while (size == 0) {
            this.wait();
        }
        int value = items[front];
        front++;
        if (front == items.length) {
            front = 0;
        }
        size--;
        this.notify();
        return value;
    }
}

public class Demo17 {
    public static void main(String[] args) {
        MyBlockQueue queue = new MyBlockQueue();
        // 生产者
        Thread t1 = new Thread(()->{
            int value = 0;
            while (true){
                try {
                    System.out.println("生产元素:" + value);
                    queue.put(value);
                    value++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        // 消费者
        Thread t2  = new Thread(()->{
            while (true){
                try {
                    int value = 0;
                    value = queue.take();
                    System.out.println("消费元素:" + value);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t1.start();
        t2.start();

    }
}

到这里就介绍介绍了,如有错误,欢迎指正~~