Python 35 进程间的通信(IPC机制)、生产者消费者模型

时间:2025-03-04 14:04:50

一:进程间的通信(IPC):先进先出  管道:队列=管道+锁

from multiprocessing import Queue
q=Queue(4)
q.put(['first',],block=True,timeout=3)
q.put({'x':2},block=True,timeout=3)
q.put(3,block=True,timeout=3)
q.put(4,block=True,timeout=3)
print(q.get(block=True,timeout=3))
print(q.get(block=True,timeout=3))
print(q.get(block=True,timeout=3))
print(q.get(block=True,timeout=3))
from multiprocessing import Queue #Queued队列模块

q=Queue()

print(q.get())
print(q.get())
print(q.get())
print(q.get()) q.put(['first'],block=True,timeout=3)
q.put({'x':2},block=True,timeout=3)
q.put(3,block=True,timeout=3) print(q.get(block=True,timeout=3))
print(q.get(block=True,timeout=3))
print(q.get(block=True,timeout=3))
print(q.get(block=True,timeout=3)) print(q.get_nowait()) #q.get(block=false)
print(q.get_nowait()) #q.get(block=false)
print(q.get_nowait()) #q.get(block=false)
print(q.get_nowait()) #q.get(block=false)

二:生产者消费者模型

1. 什么是生产者消费者模型
    生产者:代指生产数据的任务
    消费者:代指处理数据的任务
    该模型的工作方式:生产生产数据传递消费者处理
    实现方式: 生产者---->队列<------消费者 2. 为什么要用
    当程序中出现明细的两类任务,一类负责生产数据,一类负责处理数据,
    就可以引入生产者消费者模型来实现生产者与消费者的解耦合,平衡生产能力与消费能力,从提升效率 3. 如何用
import time, random
from multiprocessing import Process, Queue def produer(name, food, q):
for i in range(3): # 生产的数量
res = '%s%s' % (food, i)
# 造数据
time.sleep(random.randint(1, 3)) # 模拟生产数据的时间
q.put(res)
print('厨师【%s】做了<%s>' % (name, res)) def consumer(name, q):
while True:
res = q.get()
if res is None: break # 生产者生产完后break退出
time.sleep(random.randint(1, 3)) # 模拟处理数据的时间
print('吃货【%s】吃了<%s>' % (name, res)) if __name__ == '__main__':
q = Queue()
# 生产者们
p1 = Process(target=produer, args=('小混世魔王', '包子', q))
p2 = Process(target=produer, args=('中混世魔王', '馒头', q))
p3 = Process(target=produer, args=('大混世魔王', '鸡蛋', q)) # 消费者们
c1 = Process(target=consumer, args=('扒皮', q))
c2 = Process(target=consumer, args=('钢牙', q)) p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
print('主')

生产者消费者模型

存在问题

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

===>解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

import time, random
from multiprocessing import Process, Queue def produer(name, food, q):
for i in range(3): # 生产的数量
res = '%s%s' % (food, i)
# 造数据
time.sleep(random.randint(1, 3)) # 模拟生产数据的时间
q.put(res)
print('厨师【%s】做了<%s>' % (name, res)) def consumer(name, q):
while True:
res = q.get()
if res is None: break # 生产者生产完后break退出
time.sleep(random.randint(1, 3)) # 模拟处理数据的时间
print('吃货【%s】吃了<%s>' % (name, res)) if __name__ == '__main__':
q = Queue()
# 生产者们
p1 = Process(target=produer, args=('小混世魔王', '包子', q))
p2 = Process(target=produer, args=('中混世魔王', '馒头', q))
p3 = Process(target=produer, args=('大混世魔王', '鸡蛋', q)) # 消费者们
c1 = Process(target=consumer, args=('扒皮', q))
c2 = Process(target=consumer, args=('钢牙', q)) p1.start()
p2.start()
p3.start()
c1.start()
c2.start() p1.join()
p2.join()
p3.join() # 保证所有生产者生产完东西
q.put(None) # 所有生产者都生产完,在队列的最后加上None信号
q.put(None) # 给第二个消费者的信号
print('主')

解决方案一:

===>队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的

import time, random
from multiprocessing import Process, JoinableQueue def produer(name, food, q):
for i in range(3): # 生产的数量
res = '%s%s' % (food, i)
# 造数据
time.sleep(random.randint(1, 3)) # 模拟生产数据的时间
q.put(res)
print('厨师【%s】做了<%s>' % (name, res)) def consumer(name, q):
while True:
res = q.get()
time.sleep(random.randint(1, 3)) # 模拟处理数据的时间
print('吃货【%s】吃了<%s>' % (name, res))
q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了 if __name__ == '__main__':
q = JoinableQueue()
# 生产者们
p1 = Process(target=produer, args=('小混世魔王', '包子', q))
p2 = Process(target=produer, args=('中混世魔王', '馒头', q))
p3 = Process(target=produer, args=('大混世魔王', '鸡蛋', q)) # 消费者们
c1 = Process(target=consumer, args=('扒皮', q))
c2 = Process(target=consumer, args=('钢牙', q)) p1.start()
p2.start()
p3.start()
c1.start()
c2.start() p1.join()
p2.join()
p3.join() # 队列取空
print('主')

解决方案二: