生产消费者模式(并发模式)

时间:2022-07-27 21:59:01

       在一些开发场景中,经常会遇到这样的场景,一边是产生数据段,一边是消费数据端。比如在日志系统中,为了分析消费者行为,会在网站中埋点,记录登录者的信息,比如浏览了多长时间,买个什么产品,收藏了哪些产品等一系列用户信息。这些消费者的信息经过埋点调用接口,就会产生大量的数据,可以简单的看做生产者。而消费者可以看做后续对这些数据的处理。而生产消费者模式就是为了解决以上类似场景的设计模式。刚一开始了解此模式的时候,我也带有疑问,为什么不用消费者直接调用生产者,获取信息进行处理,随着深入了解,才知道若直接调用会有以下不足点:

    1:消费者和生产者代码耦合度太高,比如随着业务变化,消费者端代码可以会变化,你就要重写消费者代码,但消费者代码一部分耦合在生产者中,又要更改生产者代码。

    2:效率不高。比如消费者调用生产者接口获取信息进行处理,因为调用时同步的,消费者要一直等待生产者产生信息返回,若生产者产生信息很慢,但消费者就要一直等待生产者产生结果。故效率不高

 为了解决以上的问题,就是把生产者和消费者的解耦,把生产者的生产的信息放在缓冲区中,消费者从缓冲区中读取数据。这样生产者和消费者就解耦开。生产者不需考虑消费者的效率,而是把信息保存在缓冲区中,留消费者消费。这个场景直接让我想到了分布式消息队列kafka,kafka就是消费者消费数据,生产者生产数据并持久化在本地。

缓冲区要使用阻塞队列,为了简单,你可以使用jdk1.5后提供的现成的阻塞队列

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。

LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。

PriorityBlockingQueue :一个支持优先级排序的*阻塞队列

SynchronousQueue:一个不存储元素的阻塞队列。

当然你也可以自己实现,下面就是简单用多线程实现的生产消费者模式,代码案列

//实现一个阻塞队列缓冲区
public class BlockQt {
    private Object empty = new Object();//队列为空的对象锁
    private Object full = new Object();//队列为满的对象锁
    private Queue<Object> linkedList = new LinkedList<Object>();//队列
    private int maxLength = 10;//队列最大长度
    private int num =50;//递减生成数据
    //获取队列中的数据
    public Object take() throws InterruptedException{
         synchronized (empty) {//获取empty对象锁
            if(linkedList.size()==0){//若队列为空
                empty.wait();//获取empty对象锁的线程(消费者)阻塞
            }
            synchronized (full) {//若队列不为空,获取full对象锁
                if(linkedList.size()==maxLength){//当队列满的时候,唤醒等待full对象锁的线程生产者,因为下面有消费者消费了,队列已经不为满的了
                    full.notifyAll();//唤醒生产者
                }
                Object obj = linkedList.poll();//消费数据
                System.out.println("线程名称"+Thread.currentThread().getName()+"获取数据为:"+obj);
                return obj;
            }
        }
     }
    public void off(Object object) throws InterruptedException{
        synchronized(empty){//获取empty对象锁,
            if(linkedList.size()==0){//若linkedList为空,唤醒消费者线程,以为下面已经生产数据了
                empty.notifyAll();
            }
            synchronized (full) {//获取full对象锁
                if(linkedList.size()==maxLength){//若队列满了,生产者等待
                    full.wait();
                }
                linkedList.add(object);//生产数据
            }
        }
    }
    public int getNum() {
        return num;
    }
    public void setNum(int num) {
        this.num = num;
    }
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        BlockQ queue = new BlockQ();
        executor.execute(new Producer1(queue));
        executor.execute(new Producer1(queue));
        executor.execute(new Consumer1(queue));
        executor.execute(new Consumer1(queue));
    }
  }

//消费者
class Consumer1 implements Runnable{
     BlockQ queue;
    Consumer1(BlockQ que){
        this.queue = que;
    }
    @Override
    public void run() {
        try {
            while(true){
                Object o = queue.take();
                System.out.println(Thread.currentThread().getName()+"获取数据"+o);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
    }
    
}

//生产者
class Producer1 implements Runnable{
    BlockQ queue;
    Producer1(BlockQ que){
        this.queue = que;
    }
    @Override
    public void run() {
        while(true){
            //System.out.println(queue.getNum());
            if(queue.getNum()<0){
                break;
            }
            queue.offer(queue.getNum());
            queue.setNum(queue.getNum()-1);
            System.out.println(Thread.currentThread().getName()+"插入数据"+queue.getNum());
        }
    }
    
}