一文了解Python并发编程的工程实现方法

时间:2022-11-29 22:53:21

上一篇文章介绍了线程的使用。然而 python 中由于 global interpreter lock (全局解释锁 gil )的存在,每个线程在在执行时需要获取到这个 gil ,在同一时刻中只有一个线程得到解释锁的执行, python 中的线程并没有真正意义上的并发执行,多线程的执行效率也不一定比单线程的效率更高。 如果要充分利用现代多核 cpu 的并发能力,就要使用 multipleprocessing 模块了。

0x01 multipleprocessing

与使用线程的 threading 模块类似, multipleprocessing 模块提供许多高级 api 。最常见的是 pool 对象了,使用它的接口能很方便地写出并发执行的代码。

?
1
2
3
4
5
6
7
8
9
from multiprocessing import pool
def f(x):
 return x * x
if __name__ == '__main__':
 with pool(5) as p:
  # map方法的作用是将f()方法并发地映射到列表中的每个元素
  print(p.map(f, [1, 2, 3]))
# 执行结果
# [1, 4, 9]

关于 pool 下文中还会提到,这里我们先来看 process 。

process

要创建一个进程可以使用 process 类,使用 start() 方法启动进程。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import process
import os
def echo(text):
 # 父进程id
 print("process parent id : ", os.getppid())
 # 进程id
 print("process pid : ", os.getpid())
 print('echo : ', text)
if __name__ == '__main__':
 p = process(target=echo, args=('hello process',))
 p.start()
 p.join()
# 执行结果
# process parent id : 27382
# process pid : 27383
# echo : hello process

进程池

正如开篇提到的 multiprocessing 模块提供了 pool 类可以很方便地实现一些简单多进程场景。 它主要有以下接口

  • apply(func[, args[, kwds]])
  • 执行 func(args,kwds) 方法,在方法结束返回前会阻塞。
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]])
  • 异步执行 func(args,kwds) ,会立即返回一个 result 对象,如果指定了 callback 参数,结果会通过回调方法返回,还可以指定执行出错的回调方法 error_callback()
  • map(func, iterable[, chunksize])
  • 类似内置函数 map() ,可以并发执行 func ,是同步方法
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
  • 异步版本的 map
  • close()
  • 关闭进程池。当池中的所有工作进程都执行完毕时,进程会退出。
  • terminate()
  • 终止进程池
  • join()
  • 等待工作进程执行完,必需先调用 close() 或者 terminate()
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from multiprocessing import pool
def f(x):
 return x * x
if __name__ == '__main__':
 with pool(5) as p:
  # map方法的作用是将f()方法并发地映射到列表中的每个元素
  a = p.map(f, [1, 2, 3])
  print(a)
  # 异步执行map
  b = p.map_async(f, [3, 5, 7, 11])
  # b 是一个result对象,代表方法的执行结果
  print(b)
  # 为了拿到结果,使用join方法等待池中工作进程退出
  p.close()
  # 调用join方法前,需先执行close或terminate方法
  p.join()
  # 获取执行结果
  print(b.get())
# 执行结果
# [1, 4, 9]
# <multiprocessing.pool.mapresult object at 0x10631b710>
# [9, 25, 49, 121]

map_async() 和 apply_async() 执行后会返回一个 class multiprocessing.pool.asyncresult 对象,通过它的 get() 可以获取到执行结果, ready() 可以判断 asyncresult 的结果是否准备好。

进程间数据的传输

multiprocessing 模块提供了两种方式用于进程间的数据共享:队列( queue )和管道( pipe )

queue 是线程安全,也是进程安全的。使用 queue 可以实现进程间的数据共享,例如下面的 demo 中子进程 put 一个对象,在主进程中就能 get 到这个对象。 任何可以序列化的对象都可以通过 queue 来传输。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import process, queue
def f(q):
 q.put([42, none, 'hello'])
if __name__ == '__main__':
 # 使用queue进行数据通信
 q = queue()
 p = process(target=f, args=(q,))
 p.start()
 # 主进程取得子进程中的数据
 print(q.get()) # prints "[42, none, 'hello']"
 p.join()
# 执行结果
# [42, none, 'hello']

pipe() 返回一对通过管道连接的 connection 对象。这两个对象可以理解为管道的两端,它们通过 send() 和 recv() 发送和接收数据。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import process, pipe
def write(conn):
 # 子进程中发送一个对象
 conn.send([42, none, 'hello'])
 conn.close()
def read(conn):
 # 在读的进程中通过recv接收对象
 data = conn.recv()
 print(data)
if __name__ == '__main__':
 # pipe()方法返回一对连接对象
 w_conn, r_conn = pipe()
 wp = process(target=write, args=(w_conn,))
 rp = process(target=read, args=(r_conn,))
 wp.start()
 rp.start()
# 执行结果
# [42, none, 'hello']

需要注意的是,两个进程不能同时对一个连接对象进行 send 或 recv 操作。

同步

我们知道线程间的同步是通过锁机制来实现的,进程也一样。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import process, lock
import time
def print_with_lock(l, i):
 l.acquire()
 try:
  time.sleep(1)
  print('hello world', i)
 finally:
  l.release()
def print_without_lock(i):
 time.sleep(1)
 print('hello world', i)
if __name__ == '__main__':
 lock = lock()
 # 先执行有锁的
 for num in range(5):
  process(target=print_with_lock, args=(lock, num)).start()
 # 再执行无锁的
 # for num in range(5):
 #  process(target=print_without_lock, args=(num,)).start()

有锁的代码将每秒依次打印

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4

如果执行无锁的代码,则在我的电脑上执行结果是这样的

hello worldhello world  0
1
hello world 2
hello world 3
hello world 4

除了 lock ,还包括 rlock 、 condition 、 semaphore 和 event 等进程间的同步原语。其用法也与线程间的同步原语很类似。 api 使用可以参考文末中引用的文档链接。

在工程中实现进程间的数据共享应当优先使用 队列或管道。

0x02 总结

本文对 multiprocessing 模块中常见的 api 作了简单的介绍。讲述了 process 和 pool 的常见用法,同时介绍了进程间的数据方式:队列和管道。最后简单了解了进程间的同步原语。

原文链接:https://juejin.im/post/5cefdc60f265da1bca51c0cf