并发编程之多进程
-
获取进程以及父进程的pid
进程在内存中开启多个,操作系统如何区分这些进程?每个进程都有一个唯一标识,
-
在终端查看进程的pid.
命令行输入:
tasklist
-
在终端查看执行的进程pid
命令行输入:
tasklist| findstr 进程名
-
通过代码查看pid
import os import time print(f'子进程:{os.getpid()}') print(f'父进程:{os.getppid()}') time.sleep(5000)
-
-
验证进程之间的数据隔离
# 主进程一定要在子进程运行之后才执行. from multiprocessing import Process import time x = 1000 def task(name): print(name) global x x = 2 if __name__ == '__main__': p1 = Process(target=task,args=('太白',)) p1.start() time.sleep(3) print(f'主进程:{x}') # 数字-5~256 主进程子进程是沿用同一个.
-
join方法
# 单个进程 from multiprocessing import Process import time def task(name): time.sleep(1) print(f'{name} is running') if __name__ == '__main__': p = Process(target=task,args=('李业',)) p.start() p.join() # 告知主进程,p进程结束之后,主进程再运行. # join也是一种阻塞 print('===主进程') # 多个进程 from multiprocessing import Process import time def task(sec): time.sleep(sec) print(f'{sec}: is running') if __name__ == '__main__': start_time = time.time() p_l = [] for i in range(1,4): p = Process(target=task, args=(i,)) p.start() p_l.append(p) for i in p_l: i.join() print(f'===主进程:{time.time() - start_time}之后,执行')
-
进程对象的其他属性
from multiprocessing import Process import time def task(name): print(f'{name} is running') time.sleep(3) print(f'{name} is done') if __name__ == '__main__': p = Process(target=task,args=('怼哥',) ,name='任务1') # name给进程对象设置name属性 p.start() print(p.pid) # 获取进程pid号 print(p.name) # 查看name属性 p.terminate() # 终止(结束)子进程 # terminate 与 start一样的工作原理: 都是通知操作系统终止或者开启一个子进程,内存中终止或者开启(耗费时间) print(p.is_alive()) # 判断子进程是否存活 # 只是查看内存中p子进程是否运行. print('===主进程')
-
僵尸进程与孤儿进程
# linux(mac)环境下才强调的两个概念,windows下没有. # 面试官会问到这两个概念. from multiprocessing import Process import time import os def task(name): print(f'{name} is running') print(f'子进程开始了:{os.getpid()}') time.sleep(50) if __name__ == '__main__': for i in range(100000): p = Process(target=task,args=('怼哥',)) p.start() print(f'主进程开始了:{os.getpid()}') # 主进程是子进程的发起者. 主进程要等到所有的子进程运行结束之后,主进程再结束. # 而在这个过程中,有些子进程已经运行完毕,但是父进程还在运行,并没有对这些子进程进行回收 # 这些子进程依然占据着pid,消耗着内存,这样的子进程就被称为:僵尸进程. # 内存中会保留这些子进程的信息,只有当主进程调用wait 或waitpid 获取进程的状态信息,这些信息才会消失 # 孤儿进程: 此时如果主进程由于各种原因,提前消失了,它下面的所有的子进程都成为孤儿进程了. # 谁给孤儿进程收尸? 一段时间之后, init就会对孤儿进程进行回收. # 孤儿进程无害:如果僵尸进程挂了,init会对孤儿进程进行回收. # 僵尸进程有害:父进程进行一个死循环,无限的开启子进程,递归的开启,子进程越来越多,僵尸进程越来越多,占用大量的pid, # 如果父进程不调用wait/waitpid的话,那么保留的信息就不会释放,其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵尸进程,将因为没有可用的进程号而导致系统不能产生新的进程,这就是僵尸进程的危害,应当避免
-
守护进程
# 守护进程: 子进程对父进程可以进行守护. from multiprocessing import Process import time import os def task(name): print(f'{name} is running') print(f'子进程开始了:{os.getpid()}') time.sleep(50) if __name__ == '__main__': p = Process(target=task,args=('怼哥',)) p.daemon = True # 将p子进程设置成守护进程,守护主进程,只要主进程结束,子进程无论执行与否,都马上结束. p.start() time.sleep(2) print(f'主进程开始了:{os.getpid()}')
-
互斥锁
# 当3个进程,同一时刻共抢一个资源: 输出平台时. # 多个进程共抢一个资源,你要做到结果第一位,效率第二位. # 你应该牺牲效率,保求结果.利用串行. from multiprocessing import Process from multiprocessing import Lock # 导入互斥锁 import time import random def task1(lock): print('task1') # 验证cpu遇到io切换了 lock.acquire() # 当主程序运行到这里,发现存在互斥锁,会切换到另一个进程 print('task1: 开始打印') time.sleep(random.randint(1, 3)) # 模拟延迟 print('task1: 打印完成') lock.release() # 开锁 def task2(lock): print('task2') # 验证cpu遇到io切换了 lock.acquire() # 发现这里也有锁 print('task2: 开始打印') time.sleep(random.randint(1, 3)) print('task2: 打印完成') lock.release() # 开锁 def task3(lock): print('task3') # 验证cpu遇到io切换了 lock.acquire() # 这也有锁 print('task3: 开始打印') time.sleep(random.randint(1, 3)) print('task3: 打印完成') lock.release() # 开锁 if __name__ == '__main__': lock = Lock() p1 = Process(target=task1, args=(lock,)) p2 = Process(target=task2, args=(lock,)) p3 = Process(target=task3, args=(lock,)) p1.start() p2.start() p3.start() # 上锁: # 一定要是同一把锁: 只能按照这个规律:上锁一次,解锁一次. # 互斥锁与join区别共同点? (面试题) # 共同点: 都是完成了进程之间的串行. # 区别: join是人为控制的进程串行,互斥锁是随机的抢占资源.保证了公平性
-
进程之间的通信: 队列.
上一节: 多个进程之间的通信:基于文件以及加锁的方式.
- 操作文件效率低.
- 自己加锁很麻烦. 很容易出现死锁,递归锁.
进程之间的通信最好的方式是基于队列.
什么是队列?
队列就是存在于内存中的一个容器,最大的一个特点; 队列的特性就是FIFO.完全支持先进先出的原则.
from multiprocessing import Queue # 导入队列 q = Queue(3) # 括号里的参数是可以设置队列里的最大元素个数 def func(): print('in func') q.put('alex') q.put({'count': 1}) q.put(func) # q.put(666) # 当队列数据已经达到上限,再插入数据的时候,程序就会夯住. # print(111) print(q.get()) print(q.get()) ret = q.get() ret() print(q.get()) # 当你将数据取完后再继续取值时,默认也会阻塞
其他参数:
maxsize()
q = Queue(3) 队列中的数据量不易过大.存放精简的重要的数据.-
put block 默认为True 当你插入的数据超过最大限度,默认阻塞
q = Queue(3) # 可以设置元素个数 q.put('alex') q.put({'count': 1}) q.put(22) q.put(333,block=False) # 改成False 数据超过最大限度,不阻塞了直接报错.
-
put timeout() 参数 :延时报错,超过设定时间再put不进数据,就会报错.
get里面的参数:block,timeout一样.
q = Queue(3) # 可以设置元素个数 q.put('alex') q.put({'count': 1}) q.put(22) q.put(333,timeout=3) # 延时报错,超过三秒再put不进数据,就会报错. print(q.get()) print(q.get()) print(q.get()) print(q.get(block=False)) # get里的参数效果与put里的参数效果相同 timeout
-
进程之间的通信实例.
# 小米:抢手环4.预期发售10个. # 有100个人去抢. import os from multiprocessing import Queue from multiprocessing import Process def task(q): try: q.put(f'{os.getpid()}',block=False) except Exception: return if __name__ == '__main__': q = Queue(10) for i in range(100): p = Process(target=task,args=(q,)) p.start() for i in range(1,11): print(f'排名第{i}的用户: {q.get()}',)
利用队列进行进程之间通信: 简单,方便,不用自己手动加锁.队列自带阻塞,可持续化获取数据.
-
生产者消费者模型
模型, 设计模式,归一化设计, 理论等等,教给你一个编程思路.如果以后遇到类似的情况,直接套用即可.
生产者: 生产数据进程.
消费者: 对生产者生产出来的数据做进一步处理进程.
吃饭: 吃包子. 厨师生产出包子,不可能直接给到你嘴里. 放在一个盆中,消费者从盆中取出包子食用.三个主体 : (生产者)厨师, (容器队列)盆, (消费者)人.
为什么夹杂这个容器?
如果没有容器, 生产者与消费者强耦合性.不合理.所以我们要有一个容器,缓冲区.平衡了生产力与消费力.
生产者消费者多应用于并发.
from multiprocessing import Process from multiprocessing import Queue import time import random def producer(name,q): for i in range(1,6): time.sleep(random.randint(1,3)) res = f'{i}号包子' q.put(res) print(f'\033[0;32m 生产者{name}: 生产了{res}\033[0m') def consumer(name,q): while 1: try: time.sleep(random.randint(1,3)) ret = q.get(timeout=5) print(f'消费者{name}: 吃了{ret}') except Exception: return if __name__ == '__main__': q = Queue() p1 = Process(target=producer, args=('太白',q)) p2 = Process(target=consumer, args=('MC骚强',q)) p1.start() p2.start()
生产者消费者模型:
合理的去调控多个进程去生成数据以及提取数据,中间有个必不可少的环节容器队列.