# 管道实现生产者消费者模型# # 应该特别注意管道端点的正确管理问题,如果是生产者或消费者中都没有使用管道的端点就应该将它关闭# 这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这个步骤# 程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道# 后才能生成EoFEroor异常,因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。# Pipe 存在数据不安全性# 存在管道的一端被多个消费者进程取同一个数据的问题,出现时会报错。队列是同一时间只能有一个进程取数据。# 解决方案,加上锁,,管道的操作加上锁后,其实就是队列了,管道比队列底层,管道走的是socket。队列是数据安全性的# 队列相当于带锁的管道import time import random from multiprocessing import Process, Pipe, Lockdef consumer(con, pro, name, lock):'''# 消费者一方:param con::param pro::param name::return:'''pro.close() # 消费者不需要生产者一方的管道端点,关闭掉while True:try:lock.acquire() # 管道操作前锁,防止多个进程争抢一个数据food = con.recv() # 消费者用消费者的管道端点,从管道中获取数据,当管道的所有端点被关闭掉时(我认为是管道的一端被全部关闭时),会抛出异常错误EoFError lock.release()print('%s 吃了 %s ' % (name, food))time.sleep(random.randint(1, 3))except EOFError:print('管道抛出了异常EOFError,管道所有端点被关闭掉,说明生产者生产完了')con.close()lock.release()breakdef producer(con, pro, name, food):'''生产者一方:param con: # 消费者一方的管道端点:param pro: # 生产者一方的管道端点:param name: # 谁生产:param food: # 生产的东西:return:'''con.close() # 生产者不需要消费者一方的管道端点,关闭掉for i in range(4):time.sleep(random.randint(1, 3))f = '%s 生产了 第%s个%s' % (name, i, food)print(f)pro.send(f) # 生产者操作生产管道端点向管道中传输数据pro.close() # 生产者生产完了后,将生产者管道端点关闭掉if __name__ == '__main__':con, pro = Pipe() # 创建一个管道,一端给消费者用,一端给生产者用lock = Lock()p = Process(target=producer, args=(con, pro, 'why', '泔水'))p.start()p2 = Process(target=consumer, args=(con, pro, 'fqq', lock))p2.start()p3 = Process(target=consumer, args=(con, pro, 'fqq2', lock))p3.start()con.close()pro.close()