什么是进程
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
进程的特征:
动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生,动态消亡的。
并发性:任何进程都可以同其他进程一起并发执行
独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位;
异步性:由于进程间的相互制约,使进程具有执行的间断性,即进程按各自独立的、不可预知的速度向前推进
结构特征:进程由程序、数据和进程控制块三部分组成。
多个不同的进程可以包含相同的程序:一个程序在不同的数据集里就构成不同的进程,能得到不同的结果;但是执行过程中,程序不能发生改变。
进程与程序的区别:
程序是指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念。
而进程是程序在处理机上的一次执行过程,它是一个动态的概念。
并发与并行
并发:伪并行,看着像同时运行,其实是任务之间的切换(遇到IO切换的会提高代码效率),任务切换+报错状态(保存现场)
并行:真正同时运行,应用的是多核技术(多个cpu)
同步\异步\阻塞\非阻塞
进程的三状态:就绪(等待操作系统调度去cpu里面执行) 执行 阻塞
同步:多个任务提交出去要排队一个任务执行完后才执行下一个是串行的
异步 :任务的提交方式,多个任务提交出去,同时执行
阻塞与非阻塞:
阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的
同步阻塞
效率最低
异步阻塞
异步操作是可以被阻塞住的,只不过它不是处理消息时阻塞,而是等待消息通知时阻塞
同步非阻塞形式
实际上效率低下
程序要在两种不同的行为之间来回切换
异步非阻塞形式
效率高
程序不需要在两种不同的操作之间来回切换
multiprocess模块
Process类中参数的介绍
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,2,'egon',)
kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
name为子进程的名称
from multiprocessing import Process
import time
def f1():
(2)
print("xxx")
def f2():
(2)
print("kkk")
#windows系统下必须写main,因为windows系统创建子进程的方式决定的,开启一个子进程,
#这个子进程 会copy一份主进程的所有代码,并且机制类似于import引入,这样就容易导致引入代码的时候,
#被引入的代码中的可执行程序被执行,导致递归开始进程,会报错
if __name__ == '__main__':
p1 = Process(target=f1,)
p2 = Process(target=f2,)
()
()
Process类中方法介绍:
():启动进程,并调用该进程中的()
():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类中一定要实现这方法
(): 强制终止进程p,不会进行任何清理操作
p.is_alive(): 如果p仍然运行,返回True
(): 主进程等待子进程运行完才继续执行,只能join住start开启的进程,而不能join住run开启的进程
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcessdeff1():
(2)print("xxx")deff2():
(2)print("sss")if __name__ == '__main__':
p1= Process(target=f1,)
()
()#主进程等待子进程运行完才继续执行
print("开始")
p2= Process(target=f2,)
()
()print("我是主进程!!!")
join的用法
from multiprocessing importProcessdeff1(n):print(n)if __name__ == '__main__':#p1 = Process(target=f1,args=("alex",))
p1 = Process(target=f1,kwargs={'n':'alex'})
()
进程传参
from multiprocessing importProcessclassMyProcess(Process):def __init__(self,n):
super().__init__() #执行父类的init
=ndefrun(self):print("wusir和%s在一起"%)if __name__ == '__main__':
p1= MyProcess("alex")
()
进程传参2
Process类中自带封装的各属性
:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在()之前设置
:进程的名称
:进程的pid
windows中的Process()必须放到if __name__ == '__main__':下
没有写就会不断的递归调用模块,导致报错
创建进程的第二种方法(继承)
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,n):
super().__init__()
= n
def run(self):
print("%s和alex不可告人的密码"%)
if __name__ == '__main__':
p1= MyProcess("wusir")
()
from multiprocessing importProcess
n= 100
defwork():globaln
n=0print('子进程:',n)#0
if __name__ == '__main__':
p= Process(target=work,)
()
()print("主进程内",n)#100
进程间数据是隔离的
Process对象其他方法或属性
from multiprocessing importProcessimporttimeimportrandomclassPiao(Process):def __init__(self,name):
=name
super().__init__()defrun(self):print("%s is piaoing" %)
((1,5))print("%s is piaoing" %)
p1= Piao("alex")
()print(p1.is_alive()) #True
() #关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
(0.2)print("开始")print(p1.is_alive()) #False
terminate和is_live
from multiprocessing importProcessimporttimeimportrandomclassPiao(Process):def __init__(self,name):
super().__init__()
=namedefrun(self):print("%s is piaoing"%)
((1,3))print("%s is paioing" %)if __name__ == '__main__':
p= Piao('alex')
()print("开始")print()
pid用法
僵尸进程:当子进程比父进程先结束,而父进程又没有回收子进程,释放子进程占用的资源,此时子进程将成为一个僵尸进程
孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。
守护进程
一.守护进程会在主进程代码执行结束后终止
二.守护进程无法再开启子进程
= True #一定要在()前添加
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcessdeff1():
(3)print('守护进程的代码')deff2():
(5)print("普通进程的代码")if __name__ == '__main__':
p= Process(target=f1,)
= True #设置为守护进程
()
p2= Process(target=f2,)
()#等待p2进程结束,才进行执行下面的代码
#()
#守护进程会跟着父进程代码的结束而结束
print("主程序结束")
守护进程
进程同步锁
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,串行的修改,牺牲了速度确保证了数据安全
用文件共享数据实现进程间通信存在的问题:
1.效率低(数据在硬盘上)
2.需要自己加锁处理
from multiprocessing importProcess,Lockimporttimedeff1(i,lic):
()
(1)print(i)
()if __name__ == '__main__':
lic=Lock()for i in range(20):
p= Process(target=f1,args=(i,lic))
()
互斥锁
加锁模拟抢票
#-*- coding:utf-8 -*-#先创建一个文件,写入{'count':1}
importtimefrom multiprocessing importProcess,Lockdefshow_t(i):
with open('ticket','r',encoding='utf-8') as f:
ticket_data=()
t_data=eval(ticket_data)print("%s查询剩余票数为%s"%(i,t_data['count']))defget_t(i,l1):
()
with open('ticket','r',encoding='utf-8') as f:
ticket_data=()
t_data=eval(ticket_data)if t_data['count'] >0:
t_data['count'] -= 1
print("%s抢票成功"%i)
(0.2)
with open('ticket','w') as f:
(str(t_data))else:print('没票了!!!')
()if __name__ == '__main__':
l1=Lock()for i in range(10):
p1= Process(target=show_t,args=(i,))
()for i in range(10):
p2= Process(target=get_t,args=(i,l1))
()
抢票程序
队列
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出。
1 队列和管道都是将数据存放于内存中
2 队列基于(管道+锁)实现,
from multiprocessing importProcess ,Queue
q= Queue(3)
(1)print(">>>>>:",())#返回当前队列的内容长度
print(())
(2)print(">>>>>:",())
(3)print(())try:
q.put_nowait(4) #
except:print("满了")print(())print(())print(())print("是不是不空",())try:
q.get_nowait()#
except:print("队列空了")print("那多了")
其他
q =Queue([maxsize]) 创建共享的进程队列。maxsize是队列中允许的最大项数。
() 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止
q.get_nowait( ) 不等待,会报错
q.put_nowait( ) 不等待,会报错
() 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。
() 返回队列中目前项目的正确数量。
() 如果调用此方法时 q为空,返回True。
() 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的。
from multiprocessing importProcess,Queuedeff1(q):
("约吗")if __name__ == '__main__':
q=Queue(3)
p= Process(target=f1,args=(q,))
()
son_p_msg=()print("来自进程的消息:",son_p_msg)
基于队列的通信
Manager模块
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcess,Lock,Managerdeff1(m_d,l1):
with l1:
tmp= m_d['num']
tmp-= (0.1)
m_d['num'] =tmpif __name__ == '__main__':
m=Manager()
l1=Lock()
m_d= ({'num':100})
p_list=[]for i in range(10):
p= Process(target=f1,args=(m_d,l1))
()
p_list.append(p)
[()for pp inp_list]print(m_d['num'])
修改进程共享数据
生产者消费者模型
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
#-*- coding:utf-8 -*-
from multiprocessing importQueue,Processimporttimedefproducer(q):for i in range(10):
(0.7)
s= '大包子%s号'%iprint(s+'新鲜出炉')
(s)defconsumer(q):while 1:
(1)
baozi=()print(baozi+"被吃了")if __name__ == '__main__':
q= Queue(10)
pro_p= Process(target=producer,args=(q,))
con_p= Process(target=consumer,args=(q,))
pro_p.start()
con_p.start()
low版,不能推迟
#-*- coding:utf-8 -*-
from multiprocessing importQueue,Processimporttimedefproducer(q):for i in range(10):
(0.7)
s= '大包子%s号'%iprint(s+'新鲜出炉')
(s)
(None)defconsumer(q):while 1:
(1)
baozi=()if baozi ==None:print("都吃完了")break
print(baozi+"被吃了")if __name__ == '__main__':
q= Queue(10)
pro_p= Process(target=producer,args=(q,))
con_p= Process(target=consumer,args=(q,))
pro_p.start()
con_p.start()
None判断版
#-*- coding:utf-8 -*-
from multiprocessing importQueue,Processimporttimedefproducer(q):for i in range(10):
(0.7)
s= '大包子%s号'%iprint(s+'新鲜出炉')
(s)defconsumer(q):while 1:
(1)try:
baozi= q.get_nowait()#不合适,因为无法确定做的块,还是吃的块,如果按照这样的写法,你吃的快的话,那么这个消费者的程序就直接结束了,不能满足需求
exceptException:break
print(baozi+"被吃了")if __name__ == '__main__':
q= Queue(10)
pro_p= Process(target=producer,args=(q,))
con_p= Process(target=consumer,args=(q,))
pro_p.start()
con_p.start()
升级版
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcess,Queue,JoinableQueuedefproducer(q):for i in range(10):
(0.2)
s= "大包子%s号"%iprint(s+"新鲜出炉")
(s)
()#就等着task_done()信号的数量,和我put进去的数量相同时,才继续执行
print("所有数据处理完毕")defconsumer(q):while 1:
(0.3)
baozi=()print(baozi+"被吃了")
q.task_done()#给队列发送一个取出的这个任务已经处理完毕的信号
if __name__ == '__main__':
q= JoinableQueue(30)
pro_p= Process(target=producer,args=(q,))
con_p= Process(target=consumer, args=(q,))
pro_p.start()
con_p.daemon=True
con_p.start()
pro_p.join()print("主进程结束")
终极版
JoinableQueue([maxsize])
maxsize是队列中允许最大项数,缩略无大小限制
q.task_done() 消费者使用此方法发出信号,表示()的返回项目已经被处理了,如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
() 生产者调用此方法进行阻塞,直到队列中所有项目被处理.阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
from multiprocessing importProcess,JoinableQueueimporttime,random,osdefconsumer(q):whileTrue:
res=()
((1,3))print('\033[45m%s 吃 %s\033[0m' %((),res))
q.task_done()defproducer(name,q):for i in range(10):
((1,3))
res='%s%s' %(name,i)
(res)print('\033[44m%s 生成了 %s\033[0m' %((),res))
()if __name__ == '__main__':
q=JoinableQueue()
p1= Process(target=producer, args=('包子',q))
p2= Process(target=producer, args=('馒头', q))
p3= Process(target=producer, args=('alex', q))
c1= Process(target=consumer,args=(q,))
c2= Process(target=consumer,args=(q,))
=True
=True
p_1=[p1,p2,p3,c1,c2]for p inp_1:
()
()
()
()print("主程序")
View Code
管道
进程间的通信
Pipe([duplex]) 在进程之间创建一条通道,管道默认全双工的
() 接收(obj)发送的对象.
(obj) 通过连接发送对象
() 关闭连接
from multiprocessing importProcess,Pipedeff1(conn):
from_qian=()print("我是子进程")print("来自主进程的消息:",from_qian)if __name__ == '__main__':
conn1,conn2= Pipe()#创建一个管道对象,全双工,返回管道的两端,但是一端发送的消息,只能另外一端接收,自己这一端是不能接收的
#可以将一端或者两端发送给其他的进程,那么多个进程之间就可以通过这一个管道进行通信了
p1 = Process(target=f1,args=(conn2,))
()
('小宝贝')print("主进程")
管道通信
信号量
importtimeimportrandomfrom multiprocessing importProcess,Semaphoredeff1(i,s):
()print("%s男嘉宾到了"%i)
((1,3))
()if __name__ == '__main__':
s= Semaphore(4) #计数器4,acquire一次减一,为0,其他人等待,release加一
for i in range(10):
p= Process(target=f1,args=(i,s))
()
Semaphore
事件
#-*- coding:utf-8 -*-
from multiprocessing importProcess,Event
e=Event()print("e的状态",e.is_set())
()#将e的状态改为True
print("e的状态",e.is_set())
()#将e的转态改为False
() #e这个事件对象如果为False,就在我加wait的地方等待
print("进程过了wait")
Event
进程池
() #同步方法
p.apply_async() #异步方法
() 关闭进程池,防止进一步操作
() 等待所有工作进程退出
importtimefrom multiprocessing importProcess,Pool#def f1(n):#(1)#print(n)#对比多进程和进程池的效率
deff1(n):for i in range(5):
n= n +iif __name__ == '__main__':#统计进程池进行100个任务的时间
s_time =()
pool= Pool(4) #里面这个参数是指定进程池中有多少个进程用的,4表示4个进程,如果不传参数,默认开启的进程数一般是cpu的个数
#(f1,[1,2]) #参数必须是可迭代的
(f1,range(100)) #参数数据必须是可迭代的,异步提交任务时自带join功能
e_time =()
dif_time= e_time -s_time#统计100个进程,来执行100个任务的时间
p_s_t = ()#多进程起始时间
p_list =[]for i in range(100):
p= Process(target=f1,args=(i,))
()
p_list.append(p)
[()for pp inp_list]
p_e_t=()
p_dif_t= p_e_t -p_s_tprint("进程池的时间:",dif_time)print('多进程的执行时间:',p_dif_t)
进程池的map方法
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcess,Pooldeff1(n):
(1)return n*nif __name__ == '__main__':
pool= Pool(4)for i in range(10):print("xxxx")
res= (f1,args=(i,))print(res)
进程池的同步方法
#-*- coding:utf-8 -*-
importtimefrom multiprocessing importProcess,Pooldeff1(n):
(0.5)return n*nif __name__ == '__main__':
pool= Pool(4)
res_list=[]for i in range(10):print("xxxx")#异步给进程池提交任务
res = pool.apply_async(f1,args=(i,))
res_list.append(res)#print("等待所有任务执行完")
#() #锁住进程池,意思就是不让其他的程序再往这个进程池里面提交任务了
#()
#打印结果,如果异步提交之后的结果对象
for i inres_list:print(())
进程池的异步方法
回调函数
#-*- coding:utf-8 -*-
importosfrom multiprocessing importPool,Processdeff1(n):print('进程池里面的进程id',())print('>>>>',n)return n*ndefcall_back_func(f):print(">>>>>>>>>>>",())print("回调函数的结果",f)if __name__ == '__main__':
pool= Pool(4)
res= pool.apply_async(f1,args=(5,),callback=call_back_func)
()
()print('子进程的id',())
callback