三. 并发编程 (程序中的进程操作)

时间:2021-06-02 18:00:13

一 .程序中的进程操作

1. multiprocess模块

仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关
的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。

process模块介绍

process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

 2. 进程(参数 方法 属性 注意事项)

参数说明
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,2,'egon',)
kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
name为子进程的名称


方法介绍
p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,
进而导致死锁 p.is_alive():如果p仍然运行,返回True p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,
而不能join住run开启的进程 属性介绍 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 p.name:进程的名称 p.pid:进程的pid p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时
才能成功(了解即可)
在windows中使用process模块的注意事项 在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动
import 启动它的这个文件,而在 import
的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__
判断保护起来,import 的时候 ,
就不会递归运行了。

 3. 进程创建使用

from  multiprocessing import Process

def run():
    print("22222")
if __name__=="__main__":
    p=Process(target=run)      #  实例化进程
    p.start()


print("*******************************************")

# 进程传参数
def run(*args):
    print(args)
if __name__=="__main__":
    p=Process(target=run,args=(10,20,30))      #  实例化进程
    p.start()

print("*******************************************")

# 来查看进程直间的关系   os.getpid() 查看当前进程号    os.getppid() 查看当前父进程号  
def run(*args):
    print(args)
    time.sleep(1)
    print(1111111111111111111111111111)
    print(os.getpid(),"查看当前子进程号3")       # 1748   # os.getpid() 查看当前进程号
    print(os.getppid(),"查看父进程号4")          # 10120   os.getppid() 查看当前父进程号

if __name__ == "__main__":
    p1 = Process(target=run, args=(10, 20, 30))  # 实例化进程
    p1.start()
    print("我是主进程。。。。。")
    print(os.getpid(),"查看当前主进程号1")          #10120      # os.getpid() 查看当前进程号
    print(os.getppid(),"查看当前执行父进程号2")     #  7372  #   os.getppid() 查看当前父进程号
    print(p1.name)                                 # Process-1

print("*******************************************")

# 多进程
"""multiprocessing 库
是一个跨平台版本多进程模块 提供了一个Process类来代表一个进程对象
"""
from multiprocessing import Process
from time import sleep
import os
# 子进程需要执行代码
def run(str):
    print("子进程启动了")
    sleep(3)
    print("子进程结束")

if __name__ == "__main__":
    print("main是主(父)进程启动....")
    # 01 创建进程 子进程
    p=Process(target=run,args =("nice",))  #    target 说明进程执行的指定代码(函数)
    # 02  启动进程
    # 父进程的结束不能影响子进程 让父进程等待子进程结束在执行父进程
    p.start()
    print("父进程结束....")
    sleep(2)

# 执行顺序
#     main是主(父)进程启动....
#     父进程结束....
#     子进程启动了
#     子进程结束



4. 多进程启动(for循环 join进程感知)

def run(*args):
       print(args)
if __name__=="__main__":
    p1=Process(target=run,args=(10,20,30))      #  实例化进程
    p1.start()
    p2= Process(target=run, args=(555,666,777))
    p2.start()
    print("我是主进程")

print("***********************************************"def run(*args):
    print(args)
if __name__ == "__main__":
    for i in  range(5):
        p1 = Process(target=run, args=(10, 20, 30))  # 实例化进程
        p1.start()

print("***********************************************"
进程方法应用join 感知进程结束 相当于异步程序 改为同步程序 def run(*args): print(args) print("我是子进程2222222222222") time.sleep(2) print("我是子进程333333333333") if __name__ == "__main__": p1 = Process(target=run, args=(10, 20, 30)) # 实例化进程 p1.start() p1.join() # 应用join 感知进程结束 相当于异步程序 改为同步程序 有一个阻塞效果 就是子进程结束完了才执行主进程 print("我是主进程111111111111")
print("***********************************************"
join 进程方法应用感知 def run(): print("我是子进程2222222222222") time.sleep(2) # 在这休息两秒 但是程序没有停下 在休息2秒的过程中 他在执行上面打印4次 就没有等 在执行下面打印4次 print("我是子进程333333333333") if __name__ == "__main__": ret=[] for i in range(4): p1 = Process(target=run,) # 实例化进程 ret.append(p1) p1.start() for p in ret: p.join() #等待 上面所以进程必须执行完了才能执行下面代码 在join之前是异步执行 当遇到join有个阻塞效果就变成同步执行 print("完成了哈哈哈.......") # 就相当于在join之前 开辟了四条跑道 这四个进程在各自的跑道跑 但是这四个进程有跑的慢的和跑的快的(意思就是每个进程速度不一样) # 当遇到join 跑的快的进程要等跑的慢的进程(意思就是四个进程都跑完) 在计算成绩各自跑的成绩 就变成了同步
print("***********************************************"
join(进程方法应用join 感知进程结束 相当于异步程序 改为同步程序)
def run(filename,coon): with open(filename,"w") as f1: f1.write(coon*10*'*') if __name__ == "__main__": ret=[] for i in range(4): p1 = Process(target=run,args=("info%s"%i,i)) # 实例化进程 ret.append(p1) p1.start() print(ret) for p in ret:p.join() # 等待上面写完了在向用户展示所以写的文件 # 有个阻塞等待效果 join之前是异步写入 遇到join变同步写入完了 才能执行下面代码 print([i for i in os.walk(r"H:\python之路2\day1")]) # # 同步0.1=400 =40 # 异步 400 0.1 =0.1
print("***********************************************"
通个类继承来实现开启多个进程
class MyProcess(Process): def __init__(self,a,b): super().__init__() # 继承Proceess self.a=a self.b=b def run(self): print(self.a) print(self.b) print(self.pid) if __name__=="__main__": p1=MyProcess(1,2) p1.start() #相当于 # p2.run() 只是内部作为处理的 p2 = MyProcess(555, 99999) p2.start() # 相当于 # p2.run() 只是内部作为处理的 p1.join() p2.join() print("我是主进程!!!!!!!!!!!!!!1")




print("**********************************************************")


进程池概念 Pool
import os,time,random
from multiprocessing import Pool   # 多进程就用Pool  (进程池)


def run(name):

    print("子进程启动22222",name,getpid())
    print("子进程程结束2222",name,getpid())

if __name__ == '__main__':

    print("父进程启动11111")

    # 01 创建多个进程
    #     进程池
    #      表示可以同时执行进程数量
    #      Pool默认大小是cpu核心数
    pp=Pool(2)
    for i in range(5):
        # 02 创建进程 放入进程池统一管理
        pp.apply_async(run,args=(i,))
        
#apply 阻塞       同步方法
# apply_async  阻塞     异步方法

# 注意:在调用join之前必须先调用close 调用close之后就不能再继续添加新的进程了
pp.close()
#注意: 是进程池对象调用join会等待进程池中所有的子进程结束完毕再去执行父进程
pp.join()
print("父进程结束11111")
# 多进程
"""multiprocessing 库

是一个跨平台版本多进程模块 提供了一个Process类来代表一个进程对象
"""
from multiprocessing import Pool   #  使用多进程
import os ,time ,random

def run(name):
     print("子进程启动--%s"%(name,os.getpid()))
     # print(name,os.getpid())
     start=time.time()
     time.sleep(random.choiec([1,2,3]))
     end=time.time()
     print("子进程%d结束--%s--耗时%2.f"%(name,os.getpid(),end-start))
     # print(name,os.getpid(),end-start)
     # 
if __name__=="__main__":

  print("父进程启动......................")

   # 01创建多进程
   #    进程池
   # Pool()  默认是大小cpu核心数   
  pp=Pool(2)     #表示可以同时执行的进程数量

  for i in range(3):  # 创建3个进程
        # 创建进程,放入进程池统一管理
        pp.apply_async(run,args=(i,))

   # 在调用join之前必须先调用close调用close之后就不能再继续添加新的进程了
  pp.close()
   # 进程池对象调用join 会等待进程池中所有的子进程结束完毕再去执行父进程
  pp.join()
  print("父进程结束......................")

 5. 进程之间数据隔离

n=20
def run():
    global n
    n = 60
    print(n)  # 60
run()
print(n)   # 60


print("*********************************************")

num=100
# 说明子进程和父进程用的num是完全不同的   是两个完全不同的变量
def run():
     print("子进程开始")
     global num            #global 表示使用全局变量
     num+=1
     print(num)
     print("子进程结束")

if __name__=="__main__":
     print("父进程开始")
     p=Process(target=run)
     p.start()
     p.join()
     print("父进程结束---",num)
# 在子进程中修改全局变量对父进程中的全局变量没有影响
# 在创建子进程时对全局变量做了个备份  
# 在父进程和子进程中的num是两个完全不同的变量
# 执行顺序
#     父进程开始
#     子进程开始
#     101
#     子进程结束
#     父进程结束--- 100


print("*********************************************")

def run():
    global n
    n=1000
    print("子进程",os.getpid(), n) # 子进程 8016 1000

if __name__ == "__main__":
    n = 600
    p1 = Process(target=run)  # 实例化进程
    p1.start()
    p1.join()       # 阻塞等待子进程结束后才执行主进程
    print("主进程",os.getpid(),n)    # 主进程 5244 600


print("*********************************************")

    def work():
    global n
    n=222
    print('子进程内: ',n)  # 子进程内:  222
if __name__ == '__main__':
    n = 100
    p=Process(target=work)
    p.start()
    print('主进程内: ',n)  # 主进程内:  100

 6. 进程实现网络编程(注意子进程中不能加input)

server from socket import *
from multiprocessing import Process
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def run(conn,addr):
    print(addr)
    while True:
        msg=conn.recv(1024)
        conn.send(msg.upper())

if __name__ == '__main__':
    while True:
        conn,addr=server.accept()
        p=Process(target=run,args=(conn,addr))
        p.start()

print"*******************************************"client from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))

while True:
    msg=input('请输入字母: ').strip()
    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

 7. 守护进程

会随着主进程的结束而结束。

主进程创建守护进程
  其一:守护进程会在主进程代码执行结束后就终止
  其二:守护进程内无法再开启子进程,否则抛出异常
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止。
import os
import time
from multiprocessing import Process
"""class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在吃饭' %self.person)

if __name__=="__main__":
    p = Myprocess('张三')
    p.daemon = True  # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    p.start()
    time.sleep(2)  # 在sleep时查看进程id对应的进程ps -ef|grep id
    print('主进程')


print("******************************************************")



def run1():
    while True:
        time.sleep( 0.2)
        print("我还活起的!!!!!!!!")

if __name__=="__main__":
    p1=Process(target=run1)
    p1.daemon = True # 设置守护进程 (主进程代码执行结束 子进程代码也会自然而然结束 )
    p1.start()            # 随着主进程代码结束完毕 的结束  而结束
    print(p1.is_alive())   #  判断进程是否或着
    i=0
    while i<5:
        print("我是主进程1111111111111111111")
        time.sleep(2)
        i+=1
    print(p1.is_alive())    #  判断进程是否或着


print("******************************************************")

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

p1=Process(target=foo)
p2=Process(target=bar)
if __name__=="__main__":
    p1.daemon=True
    p1.start()
    p2.start()
    time.sleep(0.1)
    print("main-------")
    # 打印该行则主进程代码结束,则守护进程p1应该被终止.#可能会有p1任务
    # 执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止.
    # main - ------
    # 456
    # end45

 8. 进程方法is_alive()  and    terminate()

from multiprocessing import Process
import time
import random

class Myprocess(Process):
    def __init__(self,person):
        self.name=person
        super().__init__()

    def run(self):
        print('%s正在和网红脸聊天' %self.name)
        time.sleep(2)
        print('%s还在和网红脸聊天' %self.name)

if __name__ == '__main__':

    p1=Myprocess('顺悟空')
    p1.start()

    p1.terminate()#  结束一个子进程      关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
    print(p1.is_alive()) #结果为True     检查进程状态 是否活着

    print('开始')
    print(p1.is_alive()) #结果为False

    print("我是主进程哈哈哈哈哈")

# p1.terminate()   在主进程中结束一个子进程 结束一个子进程不是执行了立马生效 需要一个操作系统响应的过程