Python 多进程编程

时间:2022-02-18 11:30:36

Python 多进程编程

Python的解释器默认是线程安全的,其主要措施是一个叫做GIL(Global Interpreter Lock)的机制。

GIL机制来保证对Python对象的引用计数的操作都是原子操作,从而不会因为多线程造成引用对象的内存泄漏和错误引用。但是这个机制也就限制了多线程的性能。因为GIL的存在,Python一次只能有一个线程在运行。

所以多线程不能提高程序性能,除非你有大量的I/O操作(例如:读写文件,网络通信等)。

另外一种提高性能,利用多CPU的方案就是多进程。

这里介绍如何使用Python库multiprocessing,生成多进程来利用多核CPU。

多进程库 multiprocessing

multiprocessing 是Python的标准库之一,官方文档参考multiprocessing — Process-based parallelism.

我们主要使用它的2个类来产生子进程:

  • Process 进程类
  • Pool 进程池类

进程类 Process

Process类可以单独调用,也可以继承使用。主要操作如下:

  • Process.start() 启动子进程
  • Process.join() 等待子进程结束(阻塞等待)
  • Process.is_alive() 判断子进程是否在运行(运行中返回True)
  • Process.close() 结束子进程(不推荐使用,会抛出异常)

这里以调用为例:

#!/usr/bin/python

from __future__ import print_function  # at top of module
from __future__ import division, unicode_literals, with_statement

import time
import os
from multiprocessing import Process


def task_func(timeout):
    name = 'task[%d]' % os.getpid()
    print('%s start' % name)
    for i in range(timeout):
        time.sleep(1)
        print('%s waiting(%d/%d)' % (name, i, timeout))
    print('%s end' % name)


def main():
    print('main start')

    # 创建子进程, 来运行task_func
    p1 = Process(target=task_func, args=(5,))
    p1.start()

    # 父进程可以进行其他处理
    for i in range(3):
        print('do something %d' % i)
        time.sleep(1)

    # 等待子进程结束
    p1.join()

    print('main end')


if __name__ == '__main__':
    main()

输出如下

>  ./mutliproc.py
main start
do something 0
task[30280] start
do something 1
task[30280] waiting(0/5)
do something 2
task[30280] waiting(1/5)
task[30280] waiting(2/5)
task[30280] waiting(3/5)
task[30280] waiting(4/5)
task[30280] end
main end

这里还可以利用队列Queue或者管道Pipe来进行父进程和子进程之间的数据交换。

下面义队列Queue为例,进程单向数据传输:

#!/usr/bin/python

from __future__ import print_function  # at top of module
from __future__ import division, unicode_literals, with_statement

import time
import os
from multiprocessing import Queue, Process


def task_func(queue, timeout):
    name = 'task[%d]' % os.getpid()
    print('%s start' % name)
    for i in range(timeout):
        time.sleep(1)
        # 数据发送到队列, 不仅可以发送字符串,还可以发送对象
        data = {
            'task': name,
            'progress': i,
            'timeout': timeout,
        }
        queue.put(data)
    print('%s end' % name)


def main():
    print('main start')

    # 父进程创建队列
    q = Queue()

    # 创建子进程, 队列已参数的形式传给子进程
    p1 = Process(target=task_func, args=(q, 5))
    p1.start()

    # 父进程可以进行其他处理
    for i in range(3):
        print('do something %d' % i)
        time.sleep(1)

    # 等待子进程结束
    p1.join()

    # 读取所有数据
    while not q.empty():
        print(q.get())

    print('main end')


if __name__ == '__main__':
    main()

需要数据双向交换的,可以用Pipe类。由于本人用不到,这里贴出官方的例子:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

进程池类 Pool

使用进程池的好处就是可以,创建有限的进程来自动并行处理大量的任务。

Pool类的主要方法:

  • Pool.apply() 启动任务(阻塞等待任务完成)
  • Pool.map() 批量启动任务(阻塞等待所有任务完成)
  • Pool.apply_async() 启动任务(非阻塞)
  • Pool.map_async() 批量启动任务(非阻塞)

这里以最常用的非阻塞为例

#!/usr/bin/python

from __future__ import print_function  # at top of module
from __future__ import division, unicode_literals, with_statement

import time
import os
from multiprocessing import Pool


def task_func(timeout):
    name = 'task[%d]' % os.getpid()
    print('%s start' % name)
    for i in range(timeout):
        time.sleep(1)
        print('%s waiting(%d/%d)' % (name, i, timeout))
    print('%s end' % name)
    # 通过return返回数据 
    return {'task': name, 'timeout': timeout}


def main():
    print('main start')

    # 创建进程池, 允许同时运行4个进程
    pool = Pool(4)

    # 创建6个任务, 并全部非阻塞启动(实际只有4个开始运行, 另外2个在等待)
    ret = [pool.apply_async(task_func, args=(i,)) for i in range(6)]

    # 读取每个任务的返回结果
    for each in ret:
        print(each.get())

    print('main end')


if __name__ == '__main__':
    main()

注意:
1. Queue不能通过参数的方式,传给由进程池启动的子进程
2. 由进程池启动的子进程可以通过return来返回结果数据

总结

  1. 对于不需要进行数据交换,只看结果的任务,推荐使用进程池
  2. 尽量将共享资源通过参数的形式传入子进程(保持Linux和Windows行为的一致)