python 进程队列实现生产者消费者模型

时间:2020-12-07 17:37:59

一. 队列

  引入队列 : from multiprocess import Queue,JoinableQueue(继承自Queue,所以可以使用Queue的方法)

  创建队列 : Queue( maxsize )  #maxsize为队列允许的最大对列数,省略则不限制大小

  实例化队列的具体使用方法 :

    q = Queue()

    q.get(  ,biock , timeout   ) : 取出队列中的某一项.如果队列为空,此方法将阻塞,知道队列中有东西可取.block用于控制阻塞行为,默认为True,如果设置为False,将不会等待,直接引发Queue.Empty异常. timeout是可选超时时间,用于阻塞,如果在设定的时间间隔内队列内还是没有东西可取,将引发Queue.Empty异常.

    q.get_nowait() : 如果队列为空,不阻塞等待,直接引发Queue.Empty异常

    q.put(  item , block ,timeout ) : 将item放入队列.如果队列已满,此方法将阻塞,直到队列中有空间放下item. block控制阻塞,默认为True,如果设置为False,将引发Queue.Full异常. timeout指定在阻塞模式中等待可用空间的时间长短,超时后将引发Queue.Full异常.

    q.put_nowait() : 如果对列已满,不阻塞等待,直接引发Queue.Full异常.

    q.qsize() : 返回队列目前项目的正确数量. 此函数的结果不可靠,因为再返回结果和在稍后程序中使用结果之间,对列可能添加或删除了某一个项目.

    q.empty() : 判断对列是否为空,如果空返回True. 判断结果不可靠,在返回和使用结果之间,对列就有可能已经加入了新的项目.

    q.full() : 判断对列是否已满,如果已满返回True,结果不可靠,理由同上.

    q.close() : 关闭队列,防止对列加入更多的数据.关闭队列不会给队列使用者返回任何数据结束信号或异常.

  实例化队列具体用法 :

    j = JoinableQueue( maxsize )  #maxsize为队列允许的最大对列数,省略则不限制大小

    j.task_done() : 用于消费者,标识每取出队列中一个数据,就会给join一个标识

    j.join() : 生产者使用此方法进行阻塞,直到队列中所有数据均被处理完成.相当于一个计数器,消费者没消费一个数据,join就会接收task_done发来的一个标识,直到消费者返回所有标识时,join就知道所有数据已经全部被消费掉了.

   

二. 生产者消费者模型(队列)

  主要是为了消除耦合性

  即供需不平衡问题. 此模型将生产者生产的产品放到一个独立的空间中,消费者在空间内取产品消费,这样生产者和消费者就没有了直接的供求关系.

  生产者消费者模型代码一(单个生产者消费者) : 

from multiprocessing import Process,Queue

def xiaofei(q,name):
    while 1:
        x = q.get()#取出队列中的内容
        if x:#判断取出的额内容是不是空,不是空打印
            print('%s消费了%s'%(name,x))
        else:#取出的内容是空,直接退出循环
            break
def shengchan(q,name):
    for i in range(10):
        p = '%s 生产了xx%s号'%(name,i)
        q.put(p)
        print(p)
    q.put(None)#当所有产品都被生产完之后给队列传一个空,用于取出时判断
if __name__ == '__main__':
    q = Queue(6)#实例化队列,并且设置队列最大长度为6
    s = Process(target=shengchan , args=(q,'哈哈'))#开启生产者进程
    s.start()
    x = Process(target=xiaofei , args=(q,''))#开启消费者进程
    x.start()

  生产者消费者模型2(多个生产者消费者,将生产结束标识放在父进程) :

from multiprocessing import Queue,Process

def xiaofei(q,name):
    while 1:
        x = q.get()
        if x:
            print('%s使用了%s'% (name,x))
        else:
            break

def shengchan(q,name):
    for i in range(10):
        p = '%s生产了xx%s号' %(name,i)
        q.put(p)

if __name__ == '__main__':
    q = Queue()
    s = Process(target=shengchan, args=(q,''))
    s1 = Process(target=shengchan, args=(q,''))
    x = Process(target=xiaofei , args=(q,''))
    x1 = Process(target=xiaofei , args=(q,''))
    s.start()
    s1.start()
    x.start()
    x1.start()
    s.join()
    s1.join()
    q.put(None)
    q.put(None)

  生产者消费者模型3(JoinableQueue模块) :

from multiprocessing import JoinableQueue,Process
def xiaofei(j,name):
    while 1:
        j1 = j.get()
        print('%s使用了%s'% (name,j1))
        j.task_done()#消费者没消费一次,便标识一下返回给生产者join,用于计数
def shengchan(j,name):
    for i in range(10):
        p = '%s 生产了娃娃 %s 号'%(name,i)
        j.put(p)
    j.join()#记录下生产者生产的总数.接收task_done返回的标识以计算消费者消费的产品数量,直到消费进程全部将产品消费完

if __name__ == '__main__':
    j = JoinableQueue()
    s = Process(target=shengchan,args=(j,''))
    x = Process(target=xiaofei,args=(j,''))
    x.daemon = True#将消费方法设置为守护进程,用来结束进程
    s.start()
    x.start()
    s.join()#将生产进程设置为何主进程同步,等待生产进程结束后主进程结束