# 管道实现生产者消费者模型 # # 应该特别注意管道端点的正确管理问题,如果是生产者或消费者中都没有使用管道的端点就应该将它关闭 # 这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这个步骤 # 程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道 # 后才能生成EoFEroor异常,因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。 # Pipe 存在数据不安全性 # 存在管道的一端被多个消费者进程取同一个数据的问题,出现时会报错。队列是同一时间只能有一个进程取数据。 # 解决方案,加上锁,,管道的操作加上锁后,其实就是队列了,管道比队列底层,管道走的是socket。队列是数据安全性的 # 队列相当于带锁的管道 import time import random from multiprocessing import Process, Pipe, Lock def consumer(con, pro, name, lock): ''' # 消费者一方 :param con: :param pro: :param name: :return: ''' pro.close() # 消费者不需要生产者一方的管道端点,关闭掉 while True: try: lock.acquire() # 管道操作前锁,防止多个进程争抢一个数据 food = con.recv() # 消费者用消费者的管道端点,从管道中获取数据,当管道的所有端点被关闭掉时(我认为是管道的一端被全部关闭时),会抛出异常错误EoFError lock.release() print('%s 吃了 %s ' % (name, food)) time.sleep(random.randint(1, 3)) except EOFError: print('管道抛出了异常EOFError,管道所有端点被关闭掉,说明生产者生产完了') con.close() lock.release() break def producer(con, pro, name, food): ''' 生产者一方 :param con: # 消费者一方的管道端点 :param pro: # 生产者一方的管道端点 :param name: # 谁生产 :param food: # 生产的东西 :return: ''' con.close() # 生产者不需要消费者一方的管道端点,关闭掉 for i in range(4): time.sleep(random.randint(1, 3)) f = '%s 生产了 第%s个%s' % (name, i, food) print(f) pro.send(f) # 生产者操作生产管道端点向管道中传输数据 pro.close() # 生产者生产完了后,将生产者管道端点关闭掉 if __name__ == '__main__': con, pro = Pipe() # 创建一个管道,一端给消费者用,一端给生产者用 lock = Lock() p = Process(target=producer, args=(con, pro, 'why', '泔水')) p.start() p2 = Process(target=consumer, args=(con, pro, 'fqq', lock)) p2.start() p3 = Process(target=consumer, args=(con, pro, 'fqq2', lock)) p3.start() con.close() pro.close()