管道实现生产者消费者模型

时间:2021-09-01 17:41:52
# 管道实现生产者消费者模型
    # # 应该特别注意管道端点的正确管理问题,如果是生产者或消费者中都没有使用管道的端点就应该将它关闭
    # 这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这个步骤
    # 程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道
    # 后才能生成EoFEroor异常,因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

# Pipe 存在数据不安全性
    # 存在管道的一端被多个消费者进程取同一个数据的问题,出现时会报错。队列是同一时间只能有一个进程取数据。
    # 解决方案,加上锁,,管道的操作加上锁后,其实就是队列了,管道比队列底层,管道走的是socket。队列是数据安全性的

# 队列相当于带锁的管道

import time
import random
from multiprocessing import Process, Pipe, Lock


def consumer(con, pro, name, lock):
    '''
    # 消费者一方
    :param con:
    :param pro:
    :param name:
    :return:
    '''

    pro.close() # 消费者不需要生产者一方的管道端点,关闭掉
    while True:
        try:
            lock.acquire()  # 管道操作前锁,防止多个进程争抢一个数据
            food = con.recv()   # 消费者用消费者的管道端点,从管道中获取数据,当管道的所有端点被关闭掉时(我认为是管道的一端被全部关闭时),会抛出异常错误EoFError
            lock.release()
            print('%s 吃了 %s ' % (name, food))
            time.sleep(random.randint(1, 3))
        except EOFError:
            print('管道抛出了异常EOFError,管道所有端点被关闭掉,说明生产者生产完了')
            con.close()
            lock.release()
            break


def producer(con, pro, name, food):
    '''
    生产者一方
    :param con: # 消费者一方的管道端点
    :param pro: # 生产者一方的管道端点
    :param name:    # 谁生产
    :param food:    # 生产的东西
    :return:
    '''
    con.close() # 生产者不需要消费者一方的管道端点,关闭掉
    for i in range(4):
        time.sleep(random.randint(1, 3))
        f = '%s 生产了 第%s个%s' % (name, i, food)
        print(f)
        pro.send(f) # 生产者操作生产管道端点向管道中传输数据
    pro.close() # 生产者生产完了后,将生产者管道端点关闭掉

if __name__ == '__main__':
    con, pro = Pipe()   # 创建一个管道,一端给消费者用,一端给生产者用
    lock = Lock()
    p = Process(target=producer, args=(con, pro, 'why', '泔水'))
    p.start()

    p2 = Process(target=consumer, args=(con, pro, 'fqq', lock))
    p2.start()

    p3 = Process(target=consumer, args=(con, pro, 'fqq2', lock))
    p3.start()

    con.close()
    pro.close()