生产者消费者模型

时间:2023-02-23 15:15:09

什么是生产者消费者模型

我们可以把这个模型想象成工厂里的两条流水线,我们管他们叫生产者流水线和消费者流水线,生产者流水线生产出来的产品给消费者流水线使用,其中生产者流水线先把生产出来的产品放在仓库,然后消费者流水线再去仓库拿。这个仓库就叫做阻塞队列。
那么,这个仓库的实现有什么要求呢?

  • 第一,仓库满的时候,不能继续往里放了,生产者流水线要停止生产。
  • 第二,仓库空的时候,拿不出来了,消费者流水线要停止取商品。
  • 第三,生产者流水线和消费者流水线都可以使用仓库。也就是说仓库是共享变量,要注意线程安全。

下面我们先来设计一下这个仓库(阻塞队列):
关于阻塞队列的设计,有几点需要我们思考:

  • ①为什么要给阻塞队列容量的限制?如果生产者生成的效率比消费者消费的效率快,那么生产者产生的“产品”会不断的在队列中累积起来,最终耗尽内存。如果使用有界队列,那么当队列满时,生产者会阻塞并且不能继续工作,而消费者可以赶上工作进度。
  • ②如何保证阻塞队列的线程安全?由于生产者和消费者都是对阻塞队列进行修改操作,所以我们既需要保证操作的可见性又需要保证原子性,实现的方法有很多种:一种是将阻塞队列设计成线程安全的类,另一种是将生产者和消费者对阻塞队列的操作设计成原子操作。
  • ③阻塞队列是如何实现阻塞的?wait/notify 或者 await/signal都可以实现阻塞与唤醒操作。

生产者消费者模型的代码实现

将阻塞队列设计成线程安全的类

import java.util.LinkedList;
import java.util.Queue;

public class BlockQueueplus {
    Queue<Integer> queue = new LinkedList();
    int capacity ; //阻塞队列的容量

    public BlockQueueplus(int capacity) {
        this.capacity = capacity;

    }

    /**
     * 将数据放入阻塞队列中
     * @param i 放入的元素
     * @throws InterruptedException
     */
    public synchronized void put(Integer i) throws InterruptedException {
        while(capacity <= queue.size()){
            wait();
        }
        queue.offer(i);
        System.out.println(Thread.currentThread().getName() + "生产了value, value的当前值是" + i );
        notify();
    }

    /**
     * 从阻塞队列中取出数据
     * @return
     * @throws InterruptedException
     */
    public synchronized int take() throws InterruptedException {
        if(size() == 0){
            wait();
        }
        Integer result = queue.poll();
        System.out.println(Thread.currentThread().getName() + "消费了value, value的当前值是" + result );
        notify();
        return result;
    }

    public int size(){
        return queue.size();
    }

    public Boolean isEmpty(){
        return queue.isEmpty();
    }

    public Boolean isFull(){
        return this.size()==this.capacity;
    }

}

synchronized关键字用在方法上的作用

有的同学可能不懂synchronized关键字用在方法上有什么作用,我来讲解一下:
在我的BlockQueueplus类中,put()和take()都添加了synchronized关键字,当进入synchronized修饰的方法时,锁住的是当前实例类,所以当调用put()方式时,put()方法拿到了当前实例的锁,take()想运行也需要拿到锁,就要等待put()方法运行完释放锁。所以就实现了put操作和take操作的互斥。

wait和notify的使用

首先呢,Java 中每个对象都有一把称之为 monitor 监视器的锁,调用synchronized方法时,会获取monitor锁。当调用wait方法时,会释放monitor锁。在我的BlockQueueplus类中,当队列满时,put方法会调用wait方法,进入阻塞,此时take方法就可以获取锁,运行取操作。take操作在执行完取操作之后,会调用notify()方法,通知一个正在wait阻塞中的线程让它继续运行。

生产者线程与消费者线程

用Work把阻塞队列封装一下,只提供插入和取两种方法。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Work{
    private static BlockQueueplus blockQueueplus = new BlockQueueplus(100);
    public void set(int i)
    {
            try {
                blockQueueplus.put(i);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
    }

    public void get() 
    {
        try {
            Integer i = blockQueueplus.take();
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

测试类

开启两个线程,生产者不停的往阻塞队列中插入数据,消费者不停的从阻塞队列中取数据

class WorkTest {
    public static void main(String[] args) {
        Work work = new Work();
        Runnable producerRunnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < Integer.MAX_VALUE; i++)
                    work.set(i);
            }
        };
        Runnable customerRunnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < Integer.MAX_VALUE; i++)
                    work.get();
            }
        };
        Thread ProducerThread = new Thread(producerRunnable);
        ProducerThread.setName("Producer");
        Thread ConsumerThread = new Thread(customerRunnable);
        ConsumerThread.setName("Consumer");
        ProducerThread.start();
        ConsumerThread.start();
    }
}

运行结果:
生产者消费者模型

从运行结果中可以发现,生产者线程与消费者线程交接的时候,他们生产的数和消费的数的差正好为99,也证明了阻塞队列设计的成功。

还可以改进的地方

生产者线程可能是多个,消费者线程也可以是多个,如果继续使用暴力的wait和notify,就有可能会出现生产者A唤醒生产者B的错误,我们可以尝试使用await和signal来优雅的唤醒需要唤醒的线程。