以前写点小程序其实根本不在乎并行,单核跑跑也没什么问题,而且我的电脑也只有双核四个超线程(下面就统称好了),觉得去折腾并行没啥意义(除非在做IO密集型任务)。然后自从用上了32核128GB内存,看到 htop 里面一堆空载的核,很自然地就会想这个并行必须去折腾一下。后面发现,其实 Python 的并行真的非常简单。

【Python】【并行计算】Python 多核并行计算

multiprocessing vs threading

Python 自带的库又全又好用,这是我特别喜欢 Python 的原因之一。Python 里面有 multiprocessing和 threading 这两个用来实现并行的库。用线程应该是很自然的想法,毕竟(直觉上)开销小,还有共享内存的福利,而且在其他语言里面线程用的确实是非常频繁。然而,我可以很负责任的说,如果你用的是 CPython 实现,那么用了 threading 就等同于和并行计算说再见了(实际上,甚至会比单线程更慢),除非这是个IO密集型的任务。

GIL

CPython 指的是 python.org 提供的 Python 实现。是的,Python 是一门语言,它有各种不同的实现,比如 PyPyJythonIronPython 等等……我们用的最多的就是 CPython,它几乎就和 Python 画上了等号。

CPython 的实现中,使用了 GIL 即全局锁,来简化解释器的实现,使得解释器每次只执行一个线程中的字节码。也就是说,除非是在等待IO操作,否则 CPython 的多线程就是彻底的谎言!

有关 GIL 下面两个资料写的挺好的:

multiprocessing.Pool

因为 GIL 的缘故 threading 不能用,那么我们就好好研究研究 multiprocessing。(当然,如果你说你不用 CPython,没有 GIL 的问题,那也是极佳的。)

首先介绍一个简单粗暴,非常实用的工具,就是 multiprocessing.Pool。如果你的任务能用 ys = map(f, xs) 来解决,大家可能都知道,这样的形式天生就是最容易并行的,那么在 Python 里面并行计算这个任务真是再简单不过了。举个例子,把每个数都平方:

import multiprocessingdef f(x):    return x * xcores = multiprocessing.cpu_count()pool = multiprocessing.Pool(processes=cores)xs = range(5)# method 1: mapprint pool.map(f, xs)  # prints [0, 1, 4, 9, 16]# method 2: imapfor y in pool.imap(f, xs):    print y            # 0, 1, 4, 9, 16, respectively# method 3: imap_unorderedfor y in pool.imap_unordered(f, xs):    print(y)           # may be in any order

map 直接返回列表,而 i 开头的两个函数返回的是迭代器;imap_unordered 返回的是无序的。

当计算时间比较长的时候,我们可能想要加上一个进度条,这个时候 i 系列的好处就体现出来了。另外,有一个小技巧,就是输出 \r 可以使得光标回到行首而不换行,这样就可以制作简易的进度条了。

cnt = 0for _ in pool.imap_unordered(f, xs):    sys.stdout.write('done %d/%d\r' % (cnt, len(xs)))    cnt += 1

更复杂的操作

要进行更复杂的操作,可以直接使用 multiprocessing.Process 对象。要在进程间通信可以使用:

其中我强烈推荐的就是 Queue,因为其实很多场景就是生产者消费者模型,这个时候用 Queue 就解决问题了。用的方法也很简单,现在父进程创建 Queue,然后把它当做 args 或者 kwargs 传给 Process 就好了。

使用 Theano 或者 Tensorflow 等工具时的注意事项

需要注意的是,在 import theano 或者 import tensorflow 等调用了 Cuda 的工具的时候会产生一些副作用,这些副作用会原样拷贝到子进程中,然后就发生错误,如:

could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED

解决的方法是,保证父进程不引入这些工具,而是在子进程创建好了以后,让子进程各自引入。

如果使用 Process,那就在 target 函数里面 import。举个例子:

import multiprocessingdef hello(taskq, resultq):    import tensorflow as tf    config = tf.ConfigProto()    config.gpu_options.allow_growth=True    sess = tf.Session(config=config)    while True:        name = taskq.get()        res = sess.run(tf.constant('hello ' + name))        resultq.put(res)if __name__ == '__main__':    taskq = multiprocessing.Queue()    resultq = multiprocessing.Queue()    p = multiprocessing.Process(target=hello, args=(taskq, resultq))    p.start()    taskq.put('world')    taskq.put('abcdabcd987')    taskq.close()    print(resultq.get())    print(resultq.get())    p.terminate()    p.join()

如果使用 Pool,那么可以编写一个函数,在这个函数里面 import,并且把这个函数作为 initializer传入到 Pool 的构造函数里面。举个例子:

import multiprocessingdef init():    global tf    global sess    import tensorflow as tf    config = tf.ConfigProto()    config.gpu_options.allow_growth=True    sess = tf.Session(config=config)def hello(name):    return sess.run(tf.constant('hello ' + name))if __name__ == '__main__':    pool = multiprocessing.Pool(processes=2, initializer=init)    xs = ['world', 'abcdabcd987', 'Lequn Chen']    print pool.map(hello, xs)