51.[Python]使用multiprocessing进行多进程编程

时间:2022-05-28 04:10:59

转载请注明原始出处:http://blog.csdn.net/a464057216/article/details/52735584

基本知识

本文先介绍使用Python进行多进程编程需要了解的基础知识,然后以multiprocessing包为例说明如何进行多进程编程,本文的源码放在我的github项目上,欢迎访问。针对IO密集型任务,如果想采用多线程方式处理,欢迎阅读我的下一篇博客:[Python]使用threading进行多线程编程

多道程序(multiprogramming):CPU读取多个程序放入内存,运行第一个程序直到它出现IO操作(或运行结束),切换到第二个程序。每个程序的时间分配不均等,可能第一个程序运行很久也没有IO操作或结束,第二个程序一直得不到运行。

多任务处理(multitasking):运行第一个程序一段时间,保存工作环境,切换到第二个程序运行一段时间,保存工作环境;恢复第一个程序的工作环境,继续运行第一个程序,如此往复,每个程序的运行时间相对平均。

分时系统(time sharing):利用多道程序或多任务处理技术,计算机处理多个用户指令时,将运行时间分成多个片段分配给每个用户指定的任务,轮流执行所有任务直至完成。

进程(process):程序是指令和数据的有序集合,是一个静态的概念,只有处理器调度它时才成为一个活动的实体,称为进程。进程是动态产生,动态消亡的。进程由进程控制块、程序段、数据段组成。进程控制块(Process Control Block,PCB)是操作系统中主要表示进程状态等信息的一种数据结构,是系统感知进程存在的唯一标识。

进程切换,是进程把放在处理器寄存器的中间数据(也叫上下文)放回私有堆栈,让下一个待运行进程占用处理器,下一个进程将自己的中间数据加载到处理器寄存器,并从上一次的断点继续执行的过程。

进程状态的切换:
51.[Python]使用multiprocessing进行多进程编程

对于Linux系统,系统开机时建立的init进程的pid为1,其他所有进程都由init进程(或其子进程)fork而来,所有进程以init进程为根构成一个进程树(可以通过pstree命令查看)。fork进程的时候,系统在内存中开辟一段新空间给新进程,然后两个进程同时运行。fork函数与普通函数不同,有两次返回,将子进程的PID返回给父进程,将0返回给子进程,通常调用fork()后,会设计一个if结构,如果返回值为0,说明处于子进程,编码处理子进程工作,如果返回值不为0,说明处于父进程,编码处理父进程工作。

Python由于全局锁GIL的存在,无法享受多线程带来的性能提升。multiprocessing包采用子进程的技术避开了GIL,使用multiprocessing可以进行多进程编程提高程序效率。

Process对象

multiprocessing.Process对象是对进程的抽象,比如:

# -*- encoding: utf-8 -*-
# Written by CSDN: Mars Loo的博客

from multiprocessing import Process
import os

def get_process(info):
print info
# *nix系统才有getpid及getppid方法
print 'Process ID:', os.getpid()
print 'Parent process ID:', os.getppid()

def func(name):
get_process('In func:')
print "Hello,", name

if __name__ == "__main__":
get_process('In main:')
p = Process(target=func, args=('marsloo',))
# 开始子进程
p.start()
# 等待子进程结束
p.join()

Process对象的初始化参数为Process(group=None, target=None, name=None, args=(), kwargs={}),其中group参数必须为None(为了与threading.Thread的兼容),target指向可调用对象(该对象在新的子进程中运行),name是为该子进程命的名字(默认是Proess-1,Process-2, …这样),args是被调用对象的位置参数的元组列表,kwargs是被调用对象的关键字参数。

子进程终结时会通知父进程并清空自己所占据的内存,在内核里留下退出信息(exit code,如果顺利运行,为0;如果有错误或异常状况,为大于零的整数)。父进程得知子进程终结后,需要对子进程使用wait系统调用,wait函数会从内核中取出子进程的退出信息,并清空该信息在内核中占据的空间。
如果父进程早于子进程终结,子进程变成孤儿进程,孤儿进程会被过继给init进程,init进程就成了该子进程的父进程,由init进程负责该子进程终结时调用wait函数。如果父进程不对子进程调用wait函数,子进程成为僵尸进程。僵尸进程积累时,会消耗大量内存空间。

如果在父进程中不调用p.join方法,则主进程与父进程并行工作:

from multiprocessing import Process
import time

def func():
print "Child process start, %s" % time.ctime()
time.sleep(2)
print "Child process end, %s" % time.ctime()


if __name__ == "__main__":
print "Parent process start, %s" % time.ctime()
p = Process(target=func)
p.start()
# p.join()
time.sleep(1)
print "Parent process end, %s" % time.ctime()

上述代码的输出为:

Parent process start, Thu Oct  6 10:59:37 2016
Child process start, Thu Oct 6 10:59:37 2016
Parent process end, Thu Oct 6 10:59:38 2016
Child process end, Thu Oct 6 10:59:39 2016

如果开启了p.join的调用,输出如下:

Parent process start, Thu Oct  6 11:00:39 2016
Child process start, Thu Oct 6 11:00:39 2016
Child process end, Thu Oct 6 11:00:41 2016
Parent process end, Thu Oct 6 11:00:42 2016

读者可以体会一下join方法的作用。另外一种情况是将子进程设置为守护进程,则父进程在退出时不会关注子进程是否结束而直接退出:

from multiprocessing import Process
import time

def func():
print "Child process start, %s" % time.ctime()
time.sleep(2)
print "Child process end, %s" % time.ctime()


if __name__ == "__main__":
print "Parent process start, %s" % time.ctime()
p = Process(target=func)
# 守护进程一定要在start方法调用之前设置
p.daemon = True
p.start()
# p.join()
time.sleep(1)
print "Parent process end, %s" % time.ctime()

上述代码输出为:

Parent process start, Thu Oct  6 11:02:12 2016
Child process start, Thu Oct 6 11:02:12 2016
Parent process end, Thu Oct 6 11:02:13 2016

当然,如果开启主进程对join方法的调用,主进程还是会等待守护子进程结束:

Parent process start, Thu Oct  6 11:04:00 2016
Child process start, Thu Oct 6 11:04:00 2016
Child process end, Thu Oct 6 11:04:02 2016
Parent process end, Thu Oct 6 11:04:03 2016

进程间通信

Interprocess communication,简称IPC。

Queue

使用Queue对象可以实现进程间通信,并且Queue对象是线程及进程安全的:

# Written by CSDN: Mars Loo的博客
from multiprocessing import Queue, Process

def func(q):
q.put([1, 'str', None])

if __name__ == "__main__":
q = Queue()
p = Process(target=func, args=(q,))
p.start()
p.join()
print q.get()

如果声明了q = Queue(n)的Queue对象,则该对象的容量为n

Pipe

Pipe对象返回的元组分别代表管道的两端,管道默认是全双工,两端都支持sendrecv方法,两个进程分别操作管道两端时不会有冲突,两个进程对管道一端同时读写时可能会有冲突:

# Written by CSDN: Mars Loo的博客
from multiprocessing import Pipe, Process

def func(p):
p.send([1, 'str', None])
p.close()

if __name__ == "__main__":
parent_side, child_side = Pipe()
p = Process(target=func, args=(child_side,))
p.start()
print parent_side.recv()
p.join()

如果声明了p = Pipe(duplex=False)的单向管道,则p[0]只负责接受消息,p[1]只负责发送消息。

防止访问冲突与共享状态

比如多进程向stdout打印时,为了防止屏幕内容混乱可以加锁处理:

# Written by CSDN: Mars Loo的博客
from multiprocessing import Lock, Process

def lock_func(l, number):
l.acquire()
print "Number is: %d" % number
l.release()

if __name__ == "__main__":
l = Lock()
for number in range(10):
Process(target=lock_func, args=(l, number,)).start()

并发编程中,应该尽量避免进程或线程间共享状态。在多线程中,共享资源可以使用全局变量或者传递参数。在多进程中,由于每个进程有自己独立的内存空间,以上方法并不合适,比如:

# Written by CSDN: Mars Loo的博客
from multiprocessing import Process

a = [2]

def func(i):
global a
a[i] += 1
print "a[0] in child prcocess:", a[0]

if __name__ == "__main__":
p = Process(target=func, args=(0, ))
p.start()
p.join()
print "a[0] in parent process:", a[0]

上述代码的运行结果为:

a[0] in child prcocess: 3
a[0] in parent process: 2

在多进程间共享状态可以使用共享内存或服务进程。需要注意的是,使用共享内存和服务进程需要加锁,否则多进程同时读写数据时会发生冲突。

共享内存

在进程间共享状态可以使用multiprocessing.Valuemultiprocessing.Array这样特殊的共享内存对象:

# Written by CSDN: Mars Loo的博客
from multiprocessing import Process, Value, Array

def func(n, a):
n.value = 3.1415926
for i in range(len(a)):
a[i] = -i

if __name__ == "__main__":
# 'd'表示浮点型数据,'i'表示整数
n = Value('d', 0.0)
a = Array('i', range(10))
p = Process(target=func, args=(n, a,))
p.start()
p.join()

print n.value
print a[:]

服务进程

multiprocessing.Manager对象像是一个保存状态的代理,其他进程通过与代理的接口通信取得状态信息,服务进程支持更多的数据类型,使用起来比共享内存更灵活。

# Written by CSDN: Mars Loo的博客
from multiprocessing import Process, Manager

def func(d, l):
d['1'] = 2
d[2] = 'str'
d[3.0] = None
for i in range(len(l)):
l[i] = -i

if __name__ == "__main__":
m = Manager()
l = m.list(range(10))
d = m.dict()
p = Process(target=func, args=(d, l,))
p.start()
p.join()

print d
print l

进程池

针对任务量巨大的场景,可以将进程放入进程池中,每个进程可以看做是一个worker,比如:

from multiprocessing import Pool, TimeoutError
import os
import time

def func(x):
return x * x

if __name__ == "__main__":
p = Pool(processes=4)

print p.map(func, range(10))

for r in p.imap_unordered(func, range(10)):
print r,
print

# 异步执行
res = p.apply_async(func, (20,))
print res.get(timeout=1)

res = p.apply_async(os.getpid, ())
print res.get(timeout=1)

resutls = [p.apply_async(os.getpid, ()) for i in range(4)]
print [r.get(timeout=1) for r in resutls]

res = p.apply_async(time.sleep, (3,))
try:
print res.get(timeout=1)
except TimeoutError:
print "Timeout error!"

使用multiprocessing进行并发测试

请访问我的github项目,分别举了使用Process对象和进程池进行并发测试的例子。

如果觉得我的文章对您有帮助,欢迎关注我(CSDN:Mars Loo的博客)或者为这篇文章点赞,谢谢