前言:
python多进程,经常在使用,却没有怎么系统的学习过,官网上面讲得比较细,结合自己的学习,整理记录下官网:https://docs.python.org/3/library/multiprocessing.html
multiprocessing简介
multiprocessing是python自带的多进程模块,可以大批量的生成进程,在服务器为多核cpu时效果更好,类似于threading模块。相对于多线程,多进程由于独享内存空间,更稳定安全,在运维里面做些批量操作时,多进程有更多适用的场景
multiprocessing包提供了本地和远程两种并发操作,有效的避开了使用子进程而不是全局解释锁的线程,因此,multiprocessing可以有效利用到多核处理
process类
在multiporcessing中,通过process类对象来批量产生进程,使用start()方法来启动这个进程
1.语法
multiprocessing.process(group=none,target=none,name=none,args=(),kwargs={},*)
- group: 这个参数一般为空,它只是为了兼容threading.tread
- target: 这个参数就是通过run()可调用对象的方法,默认为空,表示没有方法被调用
- name: 表示进程名
- args: 传给target调用方法的tuple(元组)参数
- kwargs: 传给target调用方法的dict(字典)参数
2.process类的方法及对象
run()
该方法是进程的运行过程,可以在子类中重写此方法,一般也很少去重构
start()
启动进程,每个进程对象都必须被该方法调用
join([timeout])
等待进程终止,再往下执行,可以设置超时时间
name
可以获取进程名字,多个进程也可以是相同的名字
is_alive()
返回进程是否还存活,true or false,进程存活是指start()开始到子进程终止
daemon
守护进程的标记,一个布尔值,在start()之后设置该值,表示是否后台运行
注意:如果设置了后台运行,那么后台程序不运行再创建子进程
pid
可以获取进程id
exitcode
子进程退出时的值,如果进程还没有终止,值将是none,如果是负值,表示子进程被终止
terminate()
终止进程,如果是windows,则使用terminateprocess(),该方法对已经退出和结束的进程,将不会执行
以下为一个简单的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
#-*- coding:utf8 -*-
import multiprocessing
import time
def work(x):
time.sleep( 1 )
print time.ctime(), '这是子进程[{0}]...' . format (x)
if __name__ = = '__main__' :
for i in range ( 5 ):
p = multiprocessing.process(target = work,args = (i,))
print '启动进程数:{0}' . format (i)
p.start()
p.deamon = true
|
当然也可以显示每个进程的id
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
#-*- coding:utf8 -*-
import multiprocessing
import time
import os
def work(x):
time.sleep( 1 )
ppid = os.getppid()
pid = os.getpid()
print time.ctime(), '这是子进程[{0},父进程:{1},子进程:{2}]...' . format (x,ppid,pid)
if __name__ = = '__main__' :
for i in range ( 5 ):
p = multiprocessing.process(target = work,args = (i,))
print '启动进程数:{0}' . format (i)
p.start()
p.deamon = true
|
但在实际使用的过程中,并不只是并发完就可以了,比如,有30个任务,由于服务器资源有限,每次并发5个任务,这里还涉及到30个任务怎么获取的问题,另外并发的进程任务执行时间很难保证一致,尤其是需要时间的任务,可能并发5个任务,有3个已经执行完了,2个还需要很长时间执行,总不能等到这两个进程执行完了,再继续执行后面的任务,因此进程控制就在此有了使用场景,可以利用process的方法和一些multiprocessing的包,类等结合使用
进程控制及通信常用类
一、queue类
类似于python自带的queue.queue,主要用在比较小的队列上面
语法:
multiprocessing.queue([maxsize])
类方法:
qsize()
返回队列的大致大小,因为多进程或者多线程一直在消耗队列,因此该数据不一定正确
empty()
判断队列是否为空,如果是,则返回true,否则false
full()
判断队列是否已满,如果是,则返回true,否则false
put(obj[, block[, timeout]])
将对象放入队列,可选参数block为true,timeout为none
get()
从队列取出对象
1
2
3
4
5
6
7
8
9
10
11
12
|
#-*- coding:utf8 -*-
from multiprocessing import process, queue
def f(q):
q.put([ 42 ,none, 'hi' ])
if __name__ = = '__main__' :
q = queue()
p = process(target = f, args = (q,))
p.start()
print q.get() #打印内容: [42,none,'hi']
p.join()
|
二、pipe类
pipe()函数返回一对对象的连接,可以为进程间传输消息,在打印一些日志、进程控制上面有一些用处,pip()对象返回两个对象connection,代表两个通道,每个connection对象都有send()和recv()方法,需要注意的是两个或以上的进程同时读取或者写入同一管道,可能会导致数据混乱,测试了下,是直接覆盖了。另外,返回的两个connection,如果一个是send()数据,那么另外一个就只能recv()接收数据了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
#-*- coding:utf8 -*-
from multiprocessing import process, pipe
import time
def f(conn,i):
print '[{0}]已经执行到子进程:{1}' . format (time.ctime(),i)
time.sleep( 1 )
w = "[{0}]hi,this is :{1}" . format (time.ctime(),i)
conn.send(w)
conn.close()
if __name__ = = '__main__' :
reader = []
parent_conn, child_conn = pipe()
for i in range ( 4 ):
p = process(target = f, args = (child_conn,i))
p.start()
reader.append(parent_conn)
p.deamon = true
# 等待所有子进程跑完
time.sleep( 3 )
print '\n[{0}]下面打印child_conn向parent_conn传输的信息:' . format (time.ctime())
for i in reader:
print i.recv()
|
输出为:
三、value,array
在进行并发编程时,应尽量避免使用共享状态,因为多进程同时修改数据会导致数据破坏。但如果确实需要在多进程间共享数据,multiprocessing也提供了方法value、array
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
from multiprocessing import process, value, array
def f(n, a):
n.value = 3.1415927
for i in range ( len (a)):
a[i] = - a[i]
if __name__ = = '__main__' :
num = value( 'd' , 0.0 )
arr = array( 'i' , range ( 10 ))
p = process(target = f, args = (num, arr))
p.start()
p.join()
print num.value
print arr[:]
|
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]*
四、manager进程管理模块
manager类管理进程使用得较多,它返回对象可以操控子进程,并且支持很多类型的操作,如: list, dict, namespace、lock, rlock, semaphore, boundedsemaphore, condition, event, barrier, queue, value, array,因此使用manager基本上就够了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
from multiprocessing import process, manager
def f(d, l):
d[ 1 ] = '1'
d[ '2' ] = 2
d[ 0.25 ] = none
l.reverse()
if __name__ = = '__main__' :
with manager() as manager:
d = manager. dict ()
l = manager. list ( range ( 10 ))
p = process(target = f, args = (d, l))
p.start()
p.join() #等待进程结束后往下执行
print d, '\n' ,l
|
输出:
{0.25: none, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
可以看到,跟共享数据一样的效果,大部分管理进程的方法都集成到了manager()模块了
五、对多进程控制的应用实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#-*- coding:utf8 -*-
from multiprocessing import process, queue
import time
def work(pname,q):
time.sleep( 1 )
print_some = "{0}|this is process: {1}" . format (time.ctime(),pname)
print print_some
q.put(pname)
if __name__ = = '__main__' :
p_manag_num = 2 # 进程并发控制数量2
# 并发的进程名
q_process = [ 'process_1' , 'process_2' , 'process_3' , 'process_4' , 'process_5' ]
q_a = queue() # 将进程名放入队列
q_b = queue() # 将q_a的进程名放往q_b进程,由子进程完成
for i in q_process:
q_a.put(i)
p_list = [] # 完成的进程队列
while not q_a.empty():
if len (p_list) < = 2 :
pname = q_a.get()
p = process(target = work, args = (pname,q_b))
p.start()
p_list.append(p)
print pname
for p in p_list:
if not p.is_alive():
p_list.remove(p)
# 等待5秒,预估执行完后看队列通信信息
# 当然也可以循环判断队列里面的进程是否执行完成
time.sleep( 5 )
print '打印p_b队列:'
while not q_b.empty():
print q_b.get()
|
执行结果:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://segmentfault.com/a/1190000016855803