Python 多进程编程
Python的解释器默认是线程安全的,其主要措施是一个叫做GIL(Global Interpreter Lock)的机制。
GIL机制来保证对Python对象的引用计数的操作都是原子操作,从而不会因为多线程造成引用对象的内存泄漏和错误引用。但是这个机制也就限制了多线程的性能。因为GIL的存在,Python一次只能有一个线程在运行。
所以多线程不能提高程序性能,除非你有大量的I/O操作(例如:读写文件,网络通信等)。
另外一种提高性能,利用多CPU的方案就是多进程。
这里介绍如何使用Python库multiprocessing
,生成多进程来利用多核CPU。
多进程库 multiprocessing
multiprocessing 是Python的标准库之一,官方文档参考multiprocessing — Process-based parallelism.
我们主要使用它的2个类来产生子进程:
- Process 进程类
- Pool 进程池类
进程类 Process
Process类可以单独调用,也可以继承使用。主要操作如下:
- Process.start() 启动子进程
- Process.join() 等待子进程结束(阻塞等待)
- Process.is_alive() 判断子进程是否在运行(运行中返回True)
- Process.close() 结束子进程(不推荐使用,会抛出异常)
这里以调用为例:
#!/usr/bin/python
from __future__ import print_function # at top of module
from __future__ import division, unicode_literals, with_statement
import time
import os
from multiprocessing import Process
def task_func(timeout):
name = 'task[%d]' % os.getpid()
print('%s start' % name)
for i in range(timeout):
time.sleep(1)
print('%s waiting(%d/%d)' % (name, i, timeout))
print('%s end' % name)
def main():
print('main start')
# 创建子进程, 来运行task_func
p1 = Process(target=task_func, args=(5,))
p1.start()
# 父进程可以进行其他处理
for i in range(3):
print('do something %d' % i)
time.sleep(1)
# 等待子进程结束
p1.join()
print('main end')
if __name__ == '__main__':
main()
输出如下
> ./mutliproc.py
main start
do something 0
task[30280] start
do something 1
task[30280] waiting(0/5)
do something 2
task[30280] waiting(1/5)
task[30280] waiting(2/5)
task[30280] waiting(3/5)
task[30280] waiting(4/5)
task[30280] end
main end
这里还可以利用队列Queue或者管道Pipe来进行父进程和子进程之间的数据交换。
下面义队列Queue为例,进程单向数据传输:
#!/usr/bin/python
from __future__ import print_function # at top of module
from __future__ import division, unicode_literals, with_statement
import time
import os
from multiprocessing import Queue, Process
def task_func(queue, timeout):
name = 'task[%d]' % os.getpid()
print('%s start' % name)
for i in range(timeout):
time.sleep(1)
# 数据发送到队列, 不仅可以发送字符串,还可以发送对象
data = {
'task': name,
'progress': i,
'timeout': timeout,
}
queue.put(data)
print('%s end' % name)
def main():
print('main start')
# 父进程创建队列
q = Queue()
# 创建子进程, 队列已参数的形式传给子进程
p1 = Process(target=task_func, args=(q, 5))
p1.start()
# 父进程可以进行其他处理
for i in range(3):
print('do something %d' % i)
time.sleep(1)
# 等待子进程结束
p1.join()
# 读取所有数据
while not q.empty():
print(q.get())
print('main end')
if __name__ == '__main__':
main()
需要数据双向交换的,可以用Pipe类。由于本人用不到,这里贴出官方的例子:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
进程池类 Pool
使用进程池的好处就是可以,创建有限的进程来自动并行处理大量的任务。
Pool类的主要方法:
- Pool.apply() 启动任务(阻塞等待任务完成)
- Pool.map() 批量启动任务(阻塞等待所有任务完成)
- Pool.apply_async() 启动任务(非阻塞)
- Pool.map_async() 批量启动任务(非阻塞)
这里以最常用的非阻塞为例
#!/usr/bin/python
from __future__ import print_function # at top of module
from __future__ import division, unicode_literals, with_statement
import time
import os
from multiprocessing import Pool
def task_func(timeout):
name = 'task[%d]' % os.getpid()
print('%s start' % name)
for i in range(timeout):
time.sleep(1)
print('%s waiting(%d/%d)' % (name, i, timeout))
print('%s end' % name)
# 通过return返回数据
return {'task': name, 'timeout': timeout}
def main():
print('main start')
# 创建进程池, 允许同时运行4个进程
pool = Pool(4)
# 创建6个任务, 并全部非阻塞启动(实际只有4个开始运行, 另外2个在等待)
ret = [pool.apply_async(task_func, args=(i,)) for i in range(6)]
# 读取每个任务的返回结果
for each in ret:
print(each.get())
print('main end')
if __name__ == '__main__':
main()
注意:
1. Queue不能通过参数的方式,传给由进程池启动的子进程
2. 由进程池启动的子进程可以通过return来返回结果数据
总结
- 对于不需要进行数据交换,只看结果的任务,推荐使用进程池
- 尽量将共享资源通过参数的形式传入子进程(保持Linux和Windows行为的一致)