
1.由于进程之间内存隔离,那么要修改共享数据时可以利用IPC机制
我们利用队列去处理相应数据
#管道 #队列=管道+锁 from multiprocessing import Queue # q=Queue(3) # q.put(['first',]) # q.put({'x':2}) # q.put(3) # q.put(4)#当队列满了,放不进去了,会阻塞住 # print(q.get()) # print(q.get()) # print(q.get()) # print(q.get())#当取不到值了。又阻塞住了,等待拿数据 q=Queue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 了解: # q=Queue(3) # q.put(['first',],block=True,timeout=3) # q.put({'x':2},block=True,timeout=3) # q.put(3,block=True,timeout=3) # q.put(4,block=True,timeout=3) # q.put_nowait(1) #q.put(1,block=False) # q.put_nowait(2) # q.put_nowait(3) # q.put_nowait(4) # print(q.get(block=True,timeout=3)) # print(q.get(block=True,timeout=3)) # print(q.get(block=True,timeout=3)) # print(q.get(block=True,timeout=3)) # print(q.get_nowait()) #q.get(block=false) # print(q.get_nowait()) #q.get(block=false) # print(q.get_nowait()) #q.get(block=false) # print(q.get_nowait()) #q.get(block=false)
2.生产者消费者模型
当程序中出现明细的两类任务,一类负责生产数据,一类负责处理数据,就可以引入生产者消费者模型来实现生产者与消费者的解耦合,平衡生产能力与消费能力,从而提升效率
mport time,random from multiprocessing import Process,JoinableQueue def producer(name,food,q): for i in range(3): res='%s%s' %(food,i) time.sleep(random.randint(1,3)) #模拟生产数据的时间 q.put(res) print('厨师[%s]生产了<%s>' %(name,res)) def consumer(name,q): while True: res=q.get() time.sleep(random.randint(1,3)) #模拟处理数据的时间 print('吃货[%s]吃了<%s>' %(name,res)) q.task_done() if __name__ == '__main__': q=JoinableQueue() # 生产者们 p1=Process(target=producer,args=('小Egon','泔水',q)) p2=Process(target=producer,args=('中Egon','屎包子',q)) p3=Process(target=producer,args=('大Egon','腰子汤',q)) # 消费者们 c1=Process(target=consumer,args=('刘清正',q)) c2=Process(target=consumer,args=('吴三江',q)) c1.daemon=True c2.daemon=True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.join() # 主进程等q结束,即q内数据被取干净了 print('主')
这里注意
JoinableQueue:这就像一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的
方法介绍:
JoinableQueue的实例q除了与Queue对象相同的方法之外还具有:
q.task_done():消费者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发异常。
q.join():生产者调用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止。
import time,random from multiprocessing import Process,JoinableQueue def producer(name,food,q): for i in range(3): res='%s%s' %(food,i) time.sleep(random.randint(1,3)) #模拟生产数据的时间 q.put(res) print('厨师[%s]生产了<%s>' %(name,res)) def consumer(name,q): while True: res=q.get() time.sleep(random.randint(1,3)) #模拟处理数据的时间 print('吃货[%s]吃了<%s>' %(name,res)) q.task_done() if __name__ == '__main__': q=JoinableQueue() # 生产者们 p1=Process(target=producer,args=('小Egon','泔水',q)) p2=Process(target=producer,args=('中Egon','屎包子',q)) p3=Process(target=producer,args=('大Egon','腰子汤',q)) # 消费者们 c1=Process(target=consumer,args=('刘清正',q)) c2=Process(target=consumer,args=('吴三江',q)) c1.daemon=True c2.daemon=True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.join() # 主进程等q结束,即q内数据被取干净了 print('主')