python 2.6 multiprocessing 的使用

时间:2022-07-24 18:16:49

原文出处:http://www.rainsts.net/article.asp?id=1016

 

Python Standard Library 从 2.6 起增加了子进程级别的并行开发支持 —— multiprocessing。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import os, time
from multiprocessing import *

def test(x):
    print current_process().pid, x
    time.sleep(1)

if __name__ == "__main__":
    print "main:", os.getpid()
    p = Pool(5)
    p.map(test, range(13))


输出:

$ ./test.py

main: 1208
1209 0
1210 1
1211 2
1212 3
1213 4
1210 5
1209 6
1212 7
1211 8
1213 9
1209 10
1210 11
1212 12


从输出结果,我们可以看出多个子进程 "并行" 完成了 "计算" 任务。

1. Process

我们先从最根本的 Process 入手,看看是如何启动子进程完成并行计算的。上面的 Pool 不过是创建多个 Process,然后将数据(args)提交给多个子进程完成而已。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import os, time
from multiprocessing import *

def test(x):
    print current_process().pid, x
    time.sleep(1)

if __name__ == "__main__":
    print "main:", os.getpid()

    p = Process(target = test, args = [100])
    p.start()
    p.join()


输出:

$ ./test.py

main: 1229
1230 100


为了更好掌握这个 "并行" 利器,需要 "深入" 探究子进程的执行流程和生命周期。multiprocessing 在接口设计上参考了 threading,而我们分析的起点就是 Process.start()。

class Process(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    '''

    _Popen = None

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        ...
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        ...

    def start(self):
        ...

        if self._Popen is not None:
            Popen = self._Popen
        else:
            from .forking import Popen

        self._popen = Popen(self)
        _current_process._children.add(self)


继续,这个 Popen 是关键,打开 forking.py。

class Popen(object):

    def __init__(self, process_obj):
        ...
        self.pid = os.fork()
        if self.pid == 0:
            ...
            code = process_obj._bootstrap()
            sys.stdout.flush()
            sys.stderr.flush()
            os._exit(code)


这里调用了关键的 fork(),子进程由此而生。接下来子进程调用了 Process._bootstrap()。注意,这里还调用了 "os._exit(code)",这表明子进程与 Process.Start() "self._popen = Popen(self)" 之后的代码分道扬镳。

class Process(object):

    def _bootstrap(self):
        from . import util
        global _current_process

        try:
            self._children = set()
            self._counter = itertools.count(1)

            try:
                sys.stdin.close()
                sys.stdin = open(os.devnull)
            except (OSError, ValueError):
                pass

            _current_process = self # 重置了 _current_process !
            util._finalizer_registry.clear()
            util._run_after_forkers()
            util.info('child process calling self.run()')

            try:
                self.run()
                exitcode = 0
            finally:
                util._exit_function()

        except SystemExit, e:
            if not e.args:
                exitcode = 1
            elif type(e.args[0]) is int:
                exitcode = e.args[0]
            else:
                sys.stderr.write(e.args[0] + '/n')
                sys.stderr.flush()
                exitcode = 1
        except:
            exitcode = 1
            import traceback
            sys.stderr.write('Process %s:/n' % self.name)
            sys.stderr.flush()
            traceback.print_exc()

        util.info('process exiting with exitcode %d' % exitcode)
        return exitcode


这个方法看着复杂,初始化子进程环境,并对异常等做出处理,但关键的就是 "self.run()" 这一行。

class Process(object):
    def run(self):
        if self._target:
            self._target(*self._args, **self._kwargs)


子进程在这里调用了我们创建 Process 对象时传递的 target 参数,也就是上例中的 test()。如此,我们就大概明白了子进程创建和调用的整个过程,似乎并不复杂。不过事情还没完,我们需要继续 "了解" 父进程在创建子进程之后做了什么。

回到 Process.start(),Popen 创建并调用子进程,而父进程则继续执行下一步。

class Process(object):

    def start(self):
        ...

        if self._Popen is not None:
            Popen = self._Popen
        else:
            from .forking import Popen

        self._popen = Popen(self)
        _current_process._children.add(self)


这个 _current_process 默认应该代表父进程,只是从何而来?在 process.py 最后面有这些代码。

class _MainProcess(Process):

    def __init__(self):
        self._identity = ()
        self._daemonic = False
        self._name = 'MainProcess'
        self._parent_pid = None
        self._popen = None
        self._counter = itertools.count(1)
        self._children = set()
        self._authkey = AuthenticationString(os.urandom(32))
        self._tempdir = None

_current_process = _MainProcess()
del _MainProcess


只是对当前进程(父进程)的简单包装。父进程通过 Process.start() 创建子进程,并添加到一个子进程列表(_children)中。那么父进程如何处理子进程的退出呢?要知道不小心会搞出很多僵尸进程(参见 《fork: 杀僵尸练级》)。

在 multiprocessing.__init__.py 中引入了 util。

from multiprocessing.util import SUBDEBUG, SUBWARNING


在 util.py 尾部有这些代码。

from multiprocessing.process import current_process, active_children

def _exit_function():
    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    for p in active_children():
        if p._daemonic:
            info('calling terminate() for daemon %s', p.name)
            p._popen.terminate()

    for p in active_children():
        info('calling join() for process %s', p.name)
        p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

atexit.register(_exit_function)


active_children 由 process 导入。

def current_process():
    '''Return process object representing the current process'''
    return _current_process

def active_children():
    '''Return list of process objects corresponding to live child processes'''
    _cleanup()
    return list(_current_process._children)


_exit_function 会在父进程(或子进程,子进程可以创建自己的子进程)退出时检查其子进程列表,并调用 join 等待子进程退出。如果子进程是 daemon,则直接调用 terminate (发送 SIGTERM 信号) 终止。

class Process(object):

    def join(self, timeout=None):
        ...
        res = self._popen.wait(timeout)
        if res is not None:
            _current_process._children.discard(self)

class Popen(object):

    def wait(self, timeout=None):
        if timeout is None:
            return self.poll(0)
        ...
        while 1:
            res = self.poll()
            ...
        return res

    def poll(self, flag=os.WNOHANG):
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, flag)
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
            else:
                assert os.WIFEXITED(sts)
                self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def terminate(self):
        if self.returncode is None:
        try:
            os.kill(self.pid, signal.SIGTERM)
        except OSError, e:
            if self.wait(timeout=0.1) is None:
            raise


Process.join() 调用 Popen.wait(),而 wait() 内部则通过 poll() 调用 os.waitpid() 等待子进程终止,如此这般才算完成了一个完整的流程。

附注: 子进程同样也会在其退出时调用 uitl._exit_function。

------------ 分隔线 -----------------

本文主要分析 multiprocessing 的相关原理,有关使用细节可参考标准库帮助文件。