一、开启进程的两种方式
方式一:
# 方式一:使用函数开启进程 from multiprocessing import Process import time def task(x): print('%s is running' % x) time.sleep(1) print('%s is done' % x) if __name__ == '__main__': # p1 = Process(target=task, args=('子进程',)) #实例化子进程 p1 = Process(target=task, kwargs={'x': '子进程'}) p1.start() # 向操作系统申请资源(内存空间,子进程pid号),然后开始执行task任务,本动作不影响主进程,主进程则会继续执行。 print('这是主进程...')
方式二:
from multiprocessing import Process import time class MyProcess(Process): def __init__(self,x): super(MyProcess,self).__init__() self.x = x def run(self): print('%s is running'%self.x) time.sleep(1) print('%s is done'%self.x) if __name__ == '__main__': p1 = MyProcess('子进程') p1.start() print('这是主进程....')
二、进程间数据是物理隔离的
# from multiprocessing import Process # import time # # x = 100 # def task(): # global x # x = 0 # print('子进程中执行x所得的值为%s'% x) # time.sleep(1) # # if __name__ == '__main__': # p1 = Process(target=task) # p1.start() # p1.join() # print('父进程中执行x所得的值为%s'% x)
from multiprocessing import Process import time # def task(n,t): # print('%s is running'% n) # time.sleep(t) # print('%s is done!'% n) # # # if __name__ == '__main__': # p1 = Process(target=task,args=('子进程%s'% 1,1)) # p2 = Process(target=task,args=('子进程%s'% 2,2)) # p3 = Process(target=task,args=('子进程%s'% 3,3)) # # start = time.time() # p1.start() # p1.join() # # p2.start() # p2.join() # # p3.start() # p3.join() # # # p1.join() # # p2.join() # # p3.join() # # stop = time.time() # print('子进程执行时间为%s'%(stop-start)) # # print('主进程....') # ''' # a 没有join的情况下,执行时间为0.008279085159301758 # b 有join的情况下,执行时间为3.011568069458008 # c 程序执行逻辑为串行时,执行时间为6.037945747375488 # '''
from multiprocessing import Process import time def task(n,t): print('%s is running'% n) time.sleep(t) print('%s is done!'% n) if __name__ == '__main__': start = time.time() p_list = [] for i in range(1,4): p = Process(target=task, args=('子进程%s'%i,i)) p_list.append(p) p.start() for j in p_list: j.join() stop = time.time() print('子程序消耗的时间合计%s'%(stop-start)) print(p_list) print('主程序...')
三、孤儿进程和僵尸进程
''' 子进程的开启所消耗的资源和时间比较长 所有的子进程在执行完毕之后,并不会立即消失,会保留进程号,进程的执行时间等,这种状态称为僵尸进程 异步:父进程没死,子进程还在运行,但是父进程不发送wait()/waitpid()给子进程---僵尸进程 紧接着,父进程死了,子进程还在运行,此时子进程就是孤儿进程,会被init进程(0)接管。(此时init会发送wait给子进程) '''
四、守护进程及一些进程的属性
from multiprocessing import Process import time import os def task(x): print('%s is running %s'%(x,os.getpid())) # time.sleep(2) print('%s is done'%x) if __name__ == '__main__': p = Process(target=task, args=('子进程',),name='自定义进程名称') p.daemon = True # 代表子进程是一个守护进程,守护的是父进程 # 释义:当父进程执行完毕之后,子进程也随之死掉(有点像皇帝死了,活着的妃子们也跟着陪葬。。。) # 所以,子进程task()就直接死了,不会打印任何信息,因为子进程开启时间较长 p.start() print('主') # print('子进程为%s'%p.pid) #获取当前进程ID # print('进程为%s'%os.getppid()) #获取父进程ID # print(p.name) #打印进程名称,默认现实process-1,若需要自定义进程名字的话,则需要在Process()中定义name='自定义' # print(p.is_alive()) #判断子进程是否存活,True/False
五、互斥锁(实际工作中,可能不是经常会使用到互斥锁,推荐使用队列)
import random from multiprocessing import Process from multiprocessing import Lock import time import json import os def check(): time.sleep(1) with open('db.txt','rt',encoding='utf-8')as f1: db_dic = json.load(f1) print('%s 查看了剩余的票数为%s'%(os.getpid(),db_dic['count'])) def get_ticket(): time.sleep(2) with open('db.txt','rt',encoding='utf-8')as f1: db_dic = json.load(f1) if db_dic['count'] > 0: # print('可以买') db_dic['count'] -= 1 time.sleep(random.randint(1,3)) with open('db.txt','wt',encoding='utf-8') as f1: json.dump(db_dic,f1) print('%s 购买成功'% os.getpid()) else: print('%s 购买失败'% os.getpid()) def task(mutex): check() #查看动作是并发的 mutex.acquire() #加锁 get_ticket() #需要串行来进行买票 mutex.release() #释放锁 if __name__ == '__main__': mutex = Lock() # 启用互斥锁 for i in range(10): p = Process(target=task,args=(mutex,)) p.start() # join: 会等待所有的子进程执行完毕, 程序变成串行的 # mutex: 互斥锁会将共享的数据操作,变成串行 虽然效率降低了,但是安全性和数据完整性提升
六、队列(推荐使用队列)
# #例1: # import time # import random # from multiprocessing import Queue # from multiprocessing import Process # # # # def producer(food, q, name): # for i in range(3): # res = '%s%s'%(food,i) # time.sleep(random.randint(1,3)) # q.put(res) # print('\033[32;1m %s 生产了%s \033[0m'%(name,res)) # # q.put(None) # # # # # def consumer(q, name): # # while True: # res = q.get() # if res == None:break # time.sleep(random.randint(1,3)) # print('%s消费了%s'%(name,res)) # # # if __name__ == '__main__': # q = Queue() # p1 = Process(target=producer, args=('包子',q,'egon')) # p2 = Process(target=producer, args=('泔水',q,'stephen')) # p3 = Process(target=producer, args=('翔',q,'小猴')) # # # # c1 = Process(target=consumer, args=(q,'WuPeiQisb')) # c2 = Process(target=consumer, args=(q,'alex')) # # p1.start() # p2.start() # p3.start() # # c1.start() # c2.start() # # p1.join() # p2.join() # p3.join() # # q.put(None) # q.put(None) # # print('主...')
#例2: import time import random from multiprocessing import Process from multiprocessing import JoinableQueue def producer(q, name, food): for i in range(3): res = '%s%s'%(food,i) time.sleep(random.randint(1,3)) q.put(res) print('\033[32;1m %s 生产了%s \033[0m'%(name,res)) def consumer(q, name): while True: res = q.get() time.sleep(random.randint(1,3)) print('%s消费了%s'%(name,res)) q.task_done() #告诉队列,我已经消费完了 if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer, args=(q,'egon','包子')) p2 = Process(target=producer, args=(q,'stephen','泔水')) p3 = Process(target=producer, args=(q,'小猴','泔水')) c1 = Process(target=consumer, args=(q,'WuPeiQisb')) c2 = Process(target=consumer, args=(q,'alex')) p1.start() p2.start() p3.start() c1.daemon = True c2.daemon = True c1.start() c2.start() p1.join() p2.join() p3.join() q.join() # 意味着整个进程执行完毕,消费者卡着,消费者应该跟着消亡 # print('主...')
七、概念
时间上的复用: cpu的切换速度非常快,感觉像是并发的一个状态 空间上的复用: 物理内存空间是物理隔离的 进程: 一个正在运行的程序 程序: 纯代码的东西 并行:多核同时执行多个任务 串行:一个任务完完整整的执行完毕了,再执行下一个任务 并发:单个任务执行一段时间,又cpu立马切换到另一个任务,在多个任务当中来回切换,感觉上像是并发的。 ps:cpu切换的条件是:1遇到I/O(网络延时/对文件的读写) 2优先级更高的任务也会迅速的切换 3如果一个任务长时间占用cpu,也会进行切换
注意:
进程的缺点:
1 开启子进程是非常消耗资源的,所以我们主进程会先于子进程运行,因此我们的子进程不能无限制的开启
2 如果开了过多的子进程的话,cpu的切换在进程的模式下是非常消耗时间的
子进程的正常执行流程:
由于父进程和子进程的执行过程是异步的,因此子进程什么时候执行完毕,父进程压根就不知道,
因此,父进程此时虽然执行完毕了,但是并没有完全退出程序,而是会不断的调用wait()/waitpid()
函数去询问子进程是否执行完毕,若子进程执行完毕,父进程就会去收尸子进程
僵尸进程:
父进程没有退出,但是父进程并不会调用wait()/waitpid()来去询问子进程,子进程执行完了以后,就会过多的占用操作系统的资源(因为init判断该子进程的父进程并没有结束,所以不会接管该子进程)
孤儿进程:
父进程完全退出了,但是子进程并没有结束,此时这个子进程没有人去管了,就是一个孤儿进程,该进程就会由init(0)来进行接管
过多的孤儿进程,是没有害的,因为由init进程来接管
八、进程池
注意:进程池,一般公司是不让使用的。因为会验证消耗服务器资源,如果整个程序中不存在任何IO,纯计算的话,方可给予权限进行使用
**** 建议使用线程池 ****
# 例1:使用pool来开启子进程执行任务 from multiprocessing import Pool import time import os def task(i): i += 1 if __name__ == '__main__': ''' 分配进程个数时,推荐建议使用cpu核数+1 ''' # print(os.cpu_count()) p = Pool(5) #相当于实例化了 5 个进程 p.map(task, range(200)) #相当于 p.start() ,天生异步,每次执行5个进程去接收range中的20个任务
# 优化例1 # 计算使用进程池来执行任务 所需要话费的时间 from multiprocessing import Pool import time import os def task(i): i += 1 if __name__ == '__main__': ''' 分配进程个数时,推荐建议使用cpu核数+1 ''' start = time.time() p = Pool(5) #相当于实例化了 5 个进程 p.map(task, range(200)) #相当于 p.start() ,天生异步,每次执行5个进程 p.close() #为了防止继续往里面提交任务,保护进程池,关闭掉进程池所接收的任务通道 p.join() #等待子进程执行完毕 print(time.time() - start) #计算开启进程所消耗的总时间
#例3:使用之前的老方法逐个开启任务,并形成执行时间的计算 # 对标 例2 from multiprocessing import Pool from multiprocessing import Process import time import os def task(i): i += 1 if __name__ == '__main__': ''' 分配进程个数时,推荐建议使用cpu核数+1 ''' start = time.time() p_l = [] for i in range(200): p = Process(target=task, args=(i,)) p_l.append(p) p.start() for j in p_l: j.join() print(time.time() - start)
#例4:进程池之 apply的应用 from multiprocessing import Pool from multiprocessing import Process import time import random import os def task(i): i += 1 print(i) time.sleep(random.randint(1,2)) if __name__ == '__main__': ''' 分配进程个数时,推荐建议使用cpu核数+1 ''' p = Pool(5) for i in range(20): p.apply(task, args=(i,)) #apply天生同步
#例5:进程池之 apply_async 的应用(low版本) from multiprocessing import Pool from multiprocessing import Process import time import random import os def task(i): i += 1 print(i) time.sleep(random.randint(1,2)) if __name__ == '__main__': ''' 分配进程个数时,推荐建议使用cpu核数+1 ''' p = Pool(5) for i in range(20): res = p.apply_async(task, args=(i,)) #apply_async天生异步 res.get() #如果在for循环中,使用res.get(),则整个程序又变成了同步的状态
#例6:进程池之 apply_async 的应用(优化版本) from multiprocessing import Pool from multiprocessing import Process import time import random import os def task(i): i += 1 print(i) time.sleep(random.randint(1,2)) if __name__ == '__main__': ''' 分配进程个数时,推荐建议使用cpu核数+1 ''' p = Pool(5) res_li = [] for i in range(20): res = p.apply_async(task, args=(i,)) #apply_async天生异步 res_li.append(res) #将所有 apply_async返回的结果 都添加到列表中再get(),则程序又恢复异步 for res in res_li: res.get()