并发编程(三)--生产者和消费者模型

时间:2021-06-29 18:00:13

一、生产者与消费者模型

1、什么是生产者消费者模型

(1)生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

(2)实现方式:
  生产者---->队列<------消费者

2、为什么要使用生产者和消费者模型

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式,平衡了生产者和消费者的处理能力。

3、实现生产者和消费者模型

(1)模型的实现

#程序中有两类角色
     一类负责生产数据(生产者)
     一类负责处理数据(消费者)
        
#引入生产者消费者模型为了解决的问题是:
     平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
        
#如何实现:
       生产者<-->队列<——>消费者
#生产者消费者模型实现类程序的解耦和
并发编程(三)--生产者和消费者模型并发编程(三)--生产者和消费者模型
from multiprocess import ,Process,Queue
import time,random

def producer(food,name,q):
    for i in range(10):
        res='%s %s'%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('%s 生产 %s'(name,res))

def consumer(name,q):
    while True:
        food=q.get()
        time.sleep(random.randint(1,3))
        print('%s 吃掉 %s'%(name,food))


if __name__ == "__main__":
    q=Queue()
    p1=Process(target=producer,args=('苹果','生产者1',q))
    p2=Process(target=producer,args=('橘子','生产者2',q))   
    p3=Process(target=producer,args=('桃子','生产者3',q)) 
    c1=Process(target=consumer,args=('吃货1',q))
    c2=Process(target=consumer,args=('吃货2',q))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
生产者消费者模型

(2)存在问题

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

===>解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

并发编程(三)--生产者和消费者模型并发编程(三)--生产者和消费者模型
import time, random
from multiprocessing import Process, Queue


def prodecer(food, name, q):
    for i in range(3):
        res = '%s %s' % (food, i)
        time.sleep(random.randint(1, 3))
        print('%s 生产 %s' % (name, res))
        q.put(res)


def consumer(name, q):
    while True:
        res = q.get()
        if res is None:
            break              # 一旦消费者,取到这个信号,就退出
        time.sleep(random.randint(1, 3))
        print("%s 吃 %s" % (name, res))


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=prodecer, args=('包子', 'qqq', q))
    p2 = Process(target=prodecer, args=('', 'www', q))
    p3 = Process(target=prodecer, args=('苹果', 'eee', q))
    c1 = Process(target=consumer, args=('J', q))
    c2 = Process(target=consumer, args=('k', q))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()    # 保证所有生产者生产完东西
    q.put(None)      #所有生产者都生产完,在队列的最后面加上None信号
    q.put(None)     #给第二个消费者的信号
    print('')
解决方式一

===>队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的

并发编程(三)--生产者和消费者模型并发编程(三)--生产者和消费者模型
import time, random
from multiprocessing import Process, JoinableQueue


def prodecer(food, name, q):
    for i in range(3):
        res = '%s %s' % (food, i)
        time.sleep(random.randint(1, 3))
        print('%s 生产 %s' % (name, res))
        q.put(res)


def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print("%s 消费 %s" % (name, res))
        q.task_done()            ##向q.join()发送一次信号,证明一个数据已经被取走了



if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=prodecer, args=('包子', 'qqq', q))
    p2 = Process(target=prodecer, args=('shui', 'www', q))
    p3 = Process(target=prodecer, args=('apple', 'eee', q))
    c1 = Process(target=consumer, args=('J', q))
    c2 = Process(target=consumer, args=('k', q))

    c1.daemon = True
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.join()       # 队列取空
    print('')    
解决方式二