上一篇文章介绍了线程的使用。然而 python 中由于 global interpreter lock
(全局解释锁 gil )的存在,每个线程在在执行时需要获取到这个 gil ,在同一时刻中只有一个线程得到解释锁的执行, python 中的线程并没有真正意义上的并发执行,多线程的执行效率也不一定比单线程的效率更高。 如果要充分利用现代多核 cpu 的并发能力,就要使用 multipleprocessing 模块了。
0x01 multipleprocessing
与使用线程的 threading 模块类似, multipleprocessing
模块提供许多高级 api 。最常见的是 pool 对象了,使用它的接口能很方便地写出并发执行的代码。
1
2
3
4
5
6
7
8
9
|
from multiprocessing import pool
def f(x):
return x * x
if __name__ = = '__main__' :
with pool( 5 ) as p:
# map方法的作用是将f()方法并发地映射到列表中的每个元素
print (p. map (f, [ 1 , 2 , 3 ]))
# 执行结果
# [1, 4, 9]
|
关于 pool 下文中还会提到,这里我们先来看 process 。
process
要创建一个进程可以使用 process 类,使用 start() 方法启动进程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from multiprocessing import process
import os
def echo(text):
# 父进程id
print ( "process parent id : " , os.getppid())
# 进程id
print ( "process pid : " , os.getpid())
print ( 'echo : ' , text)
if __name__ = = '__main__' :
p = process(target = echo, args = ( 'hello process' ,))
p.start()
p.join()
# 执行结果
# process parent id : 27382
# process pid : 27383
# echo : hello process
|
进程池
正如开篇提到的 multiprocessing
模块提供了 pool 类可以很方便地实现一些简单多进程场景。 它主要有以下接口
- apply(func[, args[, kwds]])
- 执行 func(args,kwds) 方法,在方法结束返回前会阻塞。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- 异步执行 func(args,kwds) ,会立即返回一个 result 对象,如果指定了 callback 参数,结果会通过回调方法返回,还可以指定执行出错的回调方法 error_callback()
- map(func, iterable[, chunksize])
- 类似内置函数 map() ,可以并发执行 func ,是同步方法
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
- 异步版本的 map
- close()
- 关闭进程池。当池中的所有工作进程都执行完毕时,进程会退出。
- terminate()
- 终止进程池
- join()
- 等待工作进程执行完,必需先调用 close() 或者 terminate()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from multiprocessing import pool
def f(x):
return x * x
if __name__ = = '__main__' :
with pool( 5 ) as p:
# map方法的作用是将f()方法并发地映射到列表中的每个元素
a = p. map (f, [ 1 , 2 , 3 ])
print (a)
# 异步执行map
b = p.map_async(f, [ 3 , 5 , 7 , 11 ])
# b 是一个result对象,代表方法的执行结果
print (b)
# 为了拿到结果,使用join方法等待池中工作进程退出
p.close()
# 调用join方法前,需先执行close或terminate方法
p.join()
# 获取执行结果
print (b.get())
# 执行结果
# [1, 4, 9]
# <multiprocessing.pool.mapresult object at 0x10631b710>
# [9, 25, 49, 121]
|
map_async() 和 apply_async() 执行后会返回一个 class multiprocessing.pool.asyncresult 对象,通过它的 get() 可以获取到执行结果, ready() 可以判断 asyncresult 的结果是否准备好。
进程间数据的传输
multiprocessing 模块提供了两种方式用于进程间的数据共享:队列( queue )和管道( pipe )
queue 是线程安全,也是进程安全的。使用 queue 可以实现进程间的数据共享,例如下面的 demo 中子进程 put 一个对象,在主进程中就能 get 到这个对象。 任何可以序列化的对象都可以通过 queue 来传输。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from multiprocessing import process, queue
def f(q):
q.put([ 42 , none, 'hello' ])
if __name__ = = '__main__' :
# 使用queue进行数据通信
q = queue()
p = process(target = f, args = (q,))
p.start()
# 主进程取得子进程中的数据
print (q.get()) # prints "[42, none, 'hello']"
p.join()
# 执行结果
# [42, none, 'hello']
|
pipe() 返回一对通过管道连接的 connection 对象。这两个对象可以理解为管道的两端,它们通过 send() 和 recv() 发送和接收数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
from multiprocessing import process, pipe
def write(conn):
# 子进程中发送一个对象
conn.send([ 42 , none, 'hello' ])
conn.close()
def read(conn):
# 在读的进程中通过recv接收对象
data = conn.recv()
print (data)
if __name__ = = '__main__' :
# pipe()方法返回一对连接对象
w_conn, r_conn = pipe()
wp = process(target = write, args = (w_conn,))
rp = process(target = read, args = (r_conn,))
wp.start()
rp.start()
# 执行结果
# [42, none, 'hello']
|
需要注意的是,两个进程不能同时对一个连接对象进行 send 或 recv 操作。
同步
我们知道线程间的同步是通过锁机制来实现的,进程也一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
from multiprocessing import process, lock
import time
def print_with_lock(l, i):
l.acquire()
try :
time.sleep( 1 )
print ( 'hello world' , i)
finally :
l.release()
def print_without_lock(i):
time.sleep( 1 )
print ( 'hello world' , i)
if __name__ = = '__main__' :
lock = lock()
# 先执行有锁的
for num in range ( 5 ):
process(target = print_with_lock, args = (lock, num)).start()
# 再执行无锁的
# for num in range(5):
# process(target=print_without_lock, args=(num,)).start()
|
有锁的代码将每秒依次打印
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
如果执行无锁的代码,则在我的电脑上执行结果是这样的
hello worldhello world 0
1
hello world 2
hello world 3
hello world 4
除了 lock ,还包括 rlock 、 condition 、 semaphore 和 event 等进程间的同步原语。其用法也与线程间的同步原语很类似。 api 使用可以参考文末中引用的文档链接。
在工程中实现进程间的数据共享应当优先使用 队列或管道。
0x02 总结
本文对 multiprocessing 模块中常见的 api 作了简单的介绍。讲述了 process 和 pool 的常见用法,同时介绍了进程间的数据方式:队列和管道。最后简单了解了进程间的同步原语。
原文链接:https://juejin.im/post/5cefdc60f265da1bca51c0cf