day34 并发编程之生产者消费者模型 队列

时间:2020-12-15 16:41:44

1.守护进程(了解)

"""
守护进程
表示 一个进程b 守护另一个进程a 当被守护的进程a结束后 那么b也跟着结束了
就像 皇帝驾崩 妃子殉葬 应用场景
之所以开启子进程 是为了帮主进程完成某个任务 然而 如果主进程认为 自己的事情一旦做完就没有必要使用子进程了
就可以将子进程设置为守护进程 例如
在运行qq的过程 开启了一个进程 用于下载文件 然而文件还没有下完 qq就退出了 下载任务也应该跟随qq 的退出而结束 """
import time
from multiprocessing import Process def task():
print("妃子的一生")
time.sleep(5)
print("妃子凉了") if __name__ == '__main__':
fz = Process(target=task)
fz.daemon = True # 将子进程作为主进程的守护进程 要注意 必须在开启子进程之前 设置!
fz.start() print("皇帝登基了")
time.sleep(2)
print("当了十年皇帝..")
print("皇帝驾崩")

2.互斥锁(重要)

# """
# 当多个进程共享一个数据时,可能会造成数据错乱
# 1.使用join 来让这些进程 串行 但是这将造成 无法并发 并且 进程执行任务的顺序就固定了
# 2.使用锁 将需要共享的数据加锁 其他进程在访问数据时 就必须等待当前进程使用完毕
#
# 锁的本质 就是一个bool类型的数据 在执行代码前 会先判断 这个值
# 注意 在使用锁时 必须保证锁是同一个
#
# 互斥锁
# 互相排斥的锁
#
# """
#
# from multiprocessing import Process,Lock
#
# import random
# import time
#
# def task1(lock):
# lock.acquire() # 是一个阻塞的函数 会等到别的进程释放锁才能继续执行
# lock.acquire()
# print("1my name is:bgon")
# time.sleep(random.randint(1,2))
# print("1my age is:78")
# time.sleep(random.randint(1, 2))
# print("1my sex is:femal")
# lock.release()
#
# def task2(lock):
# lock.acquire()
# print("2my name is:blex")
# time.sleep(random.randint(1, 2))
# print("2my age is:68")
# time.sleep(random.randint(1, 2))
# print("2my sex is:femal")
# lock.release()
#
#
#
#
# def task3(lock):
# pass
# # 锁的实现原理 伪代码
# # l = False
# # def task3(lock):
# # global l
# # if l == False:
# # l = True
# # print("3my name is:常威")
# # time.sleep(random.randint(1, 2))
# # print("3my age is:68")
# # time.sleep(random.randint(1, 2))
# # print("3my sex is:femal")
# # l = False
#
# if __name__ == '__main__':
# lock = Lock()
#
# p1 = Process(target=task1,args=(lock,))
# p1.start()
# # p1.join()
#
# p2 = Process(target=task2,args=(lock,))
# p2.start()
# # p2.join()
#
# p3 = Process(target=task3,args=(lock,))
# p3.start()
# # p3.join()
#
# # 多个任务在共享一个数据时
# # 串行效率低 但是不会出问题
# # 并发效率高 但是数据可能错乱
#
#
# from multiprocessing import Lock,RLock,Process # lock = Lock()
#
# lock.acquire()
# lock.acquire()
# print("haha ")
# lock.release() # RLock 表示可重入锁 特点是 可以多次执行acquire
# Rlock 在执行多次acquire时 和普通Lock没有任何区别
# 如果在多进程中使用Rlock 并且一个进程a 执行了多次acquire
# 其他进程b要想获得这个锁 需要进程a 把锁解开 并且锁了几次就要解几次
# 普通锁如果多次执行acquire将会锁死 # lock = RLock()
# lock.acquire()
# lock.acquire()
#
# print("哈哈")
# lock.release() import time
def task(i,lock):
lock.acquire()
lock.acquire()
print(i)
time.sleep(3)
lock.release()
lock.release()
#第一个过来 睡一秒 第二个过来了 睡一秒 第一个打印1 第二个打印2 if __name__ == '__main__':
lock = RLock()
p1 = Process(target=task,args=(1,lock))
p1.start() p2 = Process(target=task, args=(2,lock))
p2.start()

正常开发时,一把锁足够使用,不要开多把锁

模拟抢票进程

import json
from multiprocessing import Process,Lock
import time
import random """
join和锁的区别
1.join中顺序是固定的 不公平
2.join是完全串行 而 锁可以使部分代码串行 其他代码还是并发 """ # 查看剩余票数
def check_ticket(usr):
time.sleep(random.randint(1,3))
with open("ticket.json","r",encoding="utf-8") as f:
dic = json.load(f)
print("%s查看 剩余票数:%s" % (usr,dic["count"])) def buy_ticket(usr):
with open("ticket.json","r",encoding="utf-8") as f:
dic = json.load(f)
if dic["count"] > 0:
time.sleep(random.randint(1,3))
dic["count"] -= 1
with open("ticket.json", "w", encoding="utf-8") as f2:
json.dump(dic,f2)
print("%s 购票成功!" % usr) def task(usr,lock): check_ticket(usr) lock.acquire()
buy_ticket(usr)
lock.release() if __name__ == '__main__':
lock = Lock() for i in range(10):
p = Process(target=task,args=("用户%s" % i,lock))
p.start()
#p.join() # 只有第一个整个必须完毕 别人才能买 这是不公平的

死锁

"""

    死锁 指的是 锁 无法打开了   导致程序死卡

    首先要明确  一把锁 时不会锁死的

    正常开发时 一把锁足够使用 不要开多把锁

"""

from multiprocessing import Process,Lock
import time
def task1(l1,l2,i):
l1.acquire()
print("盘子被%s抢走了" % i)
time.sleep(1) l2.acquire()
print("筷子被%s抢走了" % i)
print("吃饭..")
l1.release()
l2.release() pass def task2(l1,l2,i): l2.acquire()
print("筷子被%s抢走了" % i) l1.acquire()
print("盘子被%s抢走了" % i) print("吃饭..")
l1.release()
l2.release() if __name__ == '__main__':
l1 = Lock()
l2 = Lock()
Process(target=task1,args=(l1,l2,1)).start()
Process(target=task2,args=(l1,l2,2)).start()

3.IPC(进程间通信)

由于进程之间内存是相互独立的,所以需要对应解决方案,能够使得进城之间可以相互传递数据

1.使用文件,多个进程同时读写一个文件
IO速度慢,可传输数据大小不受限制 2.管道 是基于内存的,速度快,但是单向通讯,用起来麻烦(了解) 3.队列 申请共享内存空间,多个进程可以共享这个内存空间(重点)
速度快,但是数据量不能太大 from multiprocessing import Manager,Process,Lock
def work(d):
# with lock: # 加上锁,每个进程操作时候就保证其他进程不会来操作
d['count']-=1 if __name__ == '__main__': with Manager() as m:
dic=m.dict({'count':100}) #创建一个共享的字典
p_l=[]
for i in range(100):
p=Process(target=work,args=(dic,))
p_l.append(p)
p.start() for p in p_l:
p.join()
print(dic)
以上这样写会有问题,可能多个进程同时操作内存空间,会报错,要加锁

使用队列完成进程间通信

"""
队列 不只用于进程间通讯
也是一种常见的数据容器 其特点是:先进先出
其优点是:可以保证数据不会错乱 即使在多进程下 因为其put和get默认都是阻塞的 对比堆栈刚好相反 :后进先出
# """ from multiprocessing import Queue # q = Queue(1) # 创建一个队列 最多可以存一个数据
#
# q.put("张三")
# print(q.get())
#
# q.put("李四") # put默认会阻塞 当容器中已经装满了
#
# print(q.get())
# print(q.get()) # get默认会阻塞 当容器中已经没有数据了
#
# print("over") q = Queue(1) # 创建一个队列 最多可以存一个数据
#
q.put("张三")
# q.put("李四",False) # 第二个参数 设置为False表示不会阻塞 无论容器是满了 都会强行塞 如果满了就抛异常
#
print(q.get())
# print(q.get(timeout=3)) # timeout 仅用于阻塞时 q.put("李四") # put默认会阻塞 当容器中已经装满了 print(q.get())
print(q.get()) # get默认会阻塞 当容器中已经没有数据了 print("over")

4.生产者消费者模型

"""
什么是生产者 消费者 模型
生产者 产生数据的一方
消费者 处理数据的一方 例如需要做一个爬虫
1.爬取数据
2.解析数据 爬去和解析都是耗时操作,如果正常按照顺序来编写代码,将造成解析需要等待爬去 爬去取也需要等待解析
这样效率是很低的
要提高效率 就是一个原则 让生产者和消费解开耦合 自己干自己的
如何实现:
1.将两个任务分别分配给不同进程
2.提供一个进程共享的数据容器 """
import random
from multiprocessing import Process,Queue
import time
# 爬数据
def get_data(q): for num in range(5):
print("正在爬取第%s个数据" % num)
time.sleep(random.randint(1,2))
print("第%s个数据 爬取完成" % num)
# 把数据装到队列中
q.put("第%s个数据" % num) def parse_data(q):
for num in range(5):
# 取出数据
data = q.get()
print("正在解析%s" % data)
time.sleep(random.randint(1, 2))
print("%s 解析完成" % data) if __name__ == '__main__':
# 共享数据容器
q = Queue(5)
#生产者进程
produce = Process(target=get_data,args=(q,))
produce.start()
#消费者进程
customer = Process(target=parse_data,args=(q,))
customer.start()