Who runs the callback when using the multiprocessing Pool's apply_async method?

Date: 2022-10-06 21:02:46

I'm trying to understand a little bit of what's going on behind the scenes when using the apply_async method of a multiprocessing pool.

Who runs the callback method? Is it the main process that called apply_async?

Let's say I send out a whole bunch of apply_async commands with callbacks and then continue with my program. My program is still doing things when the apply_async's start to finish. How does the callback get run by the "main process" while the main process is still busy with the script?

Here's an example.

import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()

args = range(20)

for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)

t0 = time.time()
while time.time() - t0 < 60:
    pass

print 'Finished with the script'

The output is something like

PoolWorker-1 running func with arg 0

PoolWorker-2 running func with arg 1

PoolWorker-3 running func with arg 2

MainProcess going to sleep for a minute <-- main process is busy

PoolWorker-4 running func with arg 3

PoolWorker-1 running func with arg 4

PoolWorker-2 running func with arg 5

PoolWorker-3 running func with arg 6

PoolWorker-4 running func with arg 7

MainProcess running callback with arg 0 <-- main process running callback while it's still in the while loop!!

MainProcess running callback with arg 1

MainProcess running callback with arg 2

MainProcess running callback with arg 3

MainProcess running callback with arg 4

PoolWorker-1 running func with arg 8

...

Finished with the script

How is MainProcess running the callback while it's in the middle of that while loop??

There is this statement about the callback in the documentation for multiprocessing.Pool that seems like a hint but I don't understand it.

apply_async(func[, args[, kwds[, callback]]])

A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.

1 Answer

#1


28  

There is indeed a hint in the docs:

callback should complete immediately since otherwise the thread which handles the results will get blocked.

The callbacks are handled in the main process, but they're run in their own separate thread. When you create a Pool it actually creates a few Thread objects internally (the source quoted below is Python 2's multiprocessing/pool.py; Python 3's version is organized the same way):

class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        ... # stuff we don't care about
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN 
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN 
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()
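The mechanism these handler threads implement can be sketched in miniature (the names below are illustrative, not Pool's actual API): a daemon thread drains a result queue and dispatches each result to its registered callback, even while the main thread is busy doing something else.

```python
import queue
import threading
import time

def handle_results(result_queue, callbacks):
    # Loosely mimics Pool._handle_results: pull (job, value) pairs off
    # the queue and invoke the callback registered for that job.
    while True:
        task = result_queue.get()
        if task is None:  # sentinel, like Pool's shutdown path
            break
        job, value = task
        callbacks[job](value)

callbacks = {}                # job id -> callback, analogous to Pool._cache
results = queue.Queue()
handler = threading.Thread(target=handle_results, args=(results, callbacks))
handler.daemon = True
handler.start()

received = []
callbacks[0] = received.append
results.put((0, "hello"))

# The main thread busy-waits here, yet the callback still runs,
# because the handler thread services the queue independently.
deadline = time.time() + 0.5
while time.time() < deadline:
    pass

results.put(None)             # sentinel: shut the handler down
handler.join()
print(received)               # ['hello']
```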

The interesting thread for us is _result_handler; we'll get to why shortly.

Switching gears for a second, when you run apply_async, it creates an ApplyResult object internally to manage getting the result from the child:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self


    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

As you can see, the _set method is the one that ends up actually executing the callback passed in, assuming the task was successful. Also notice that it adds itself to the pool's _cache dict at the end of __init__.
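One way to watch this from the outside: record which thread each callback runs on. The sketch below uses multiprocessing.pool.ThreadPool, which reuses the same Pool machinery (including the result-handler thread) but runs workers as threads, so it behaves identically for this purpose.

```python
import threading
from multiprocessing.pool import ThreadPool  # shares Pool's handler threads

callback_threads = []

def record(x):
    # Note which thread invokes the callback.
    callback_threads.append(threading.current_thread().name)

pool = ThreadPool(2)
for a in range(3):
    pool.apply_async(pow, (a, 2), callback=record)
pool.close()
pool.join()  # all results have been flushed, so all callbacks have run

print(callback_threads)
# All three callbacks ran on the pool's result-handler thread,
# never on MainThread.
```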

Now, back to the _result_handler thread object. Its target is the _handle_results function, which looks like this:

@staticmethod
def _handle_results(outqueue, get, cache):
    thread = threading.current_thread()

    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if thread._state:
            assert thread._state == TERMINATE
            debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
        except KeyError:
            pass

        # More stuff

It's a loop that just pulls results from the children off the queue, finds the matching entry in cache, and calls _set, which executes our callback. It keeps running even while your script is stuck in its while loop, because it isn't running in the main thread.
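The docs' warning follows directly from this design: there is only one result-handler thread, so a slow callback stalls every callback queued behind it. A small sketch (again with ThreadPool, which shares the same machinery):

```python
import time
from multiprocessing.pool import ThreadPool

finish_times = []

def slow_callback(x):
    # Deliberately slow: this ties up the single result-handler thread.
    time.sleep(0.2)
    finish_times.append(time.time())

pool = ThreadPool(4)
start = time.time()
for a in range(4):
    pool.apply_async(pow, (a, 2), callback=slow_callback)
pool.close()
pool.join()

# The four tasks themselves finish almost instantly and in parallel,
# but their callbacks are dispatched one after another: total time is
# roughly 4 * 0.2 s, not 0.2 s.
print(time.time() - start)
```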
