multiprocessing.Pool: calling a helper function when using apply_async's callback option

Time: 2022-06-01 21:03:33

How does the flow of apply_async work between calling the iterable (?) function and the callback function?

Setup: I am reading some lines of all the files inside a 2000-file directory, some with millions of lines, some with only a few. Some header/formatting/date data is extracted to characterize each file. This is done on a 16-CPU machine, so it made sense to multiprocess it.

Currently, the expected result is being sent to a list (ahlala) so I can print it out; later, this will be written to *.csv. This is a simplified version of my code, originally based off this extremely helpful post.

import multiprocessing as mp
import os

def dirwalker(directory):
  ahlala = []

  # X() reads files and grabs lines, calls helper function to calculate
  # info, and returns stuff to the callback function
  def X(f): 
    fileinfo = Z(arr_of_lines) 
    return fileinfo 

  # Y() reads other types of files and does the same thing
  def Y(f): 
    fileinfo = Z(arr_of_lines)
    return fileinfo

  # results() is the callback function
  def results(r):
    ahlala.extend(r) # or .append, haven't yet decided

  # helper function
  def Z(arr):
    return fileinfo # to X() or Y()!

  for _,_,files in os.walk(directory):
    pool = mp.Pool(mp.cpu_count())
    for f in files:
      if (filetype(f) == filetypeX): 
        pool.apply_async(X, args=(f,), callback=results)
      elif (filetype(f) == filetypeY): 
        pool.apply_async(Y, args=(f,), callback=results)

  pool.close(); pool.join()
  return ahlala

Note, the code works if I put all of Z(), the helper function, into either X(), Y(), or results(), but isn't this either repetitive or slower than it could be? I know that the callback function is called for every function call, but when exactly is the callback function called? Is it after pool.apply_async() finishes all the jobs for the processes? Shouldn't it be faster if these helper functions were called within the scope (?) of the first function pool.apply_async() takes (in this case, X())? If not, should I just put the helper function in results()?

Other related ideas: Are daemon processes why nothing shows up? I am also very confused about how to queue things, and whether that is the problem. This seems like a place to start learning it, but can queuing be safely ignored when using apply_async, or only at a noticeable cost in time efficiency?

1 Answer

#1


You're asking about a whole bunch of different things here, so I'll try to cover it all as best I can:

The function you pass to callback will be executed in the main process (not the worker) as soon as the worker process returns its result. It is executed in a thread that the Pool object creates internally. That thread consumes objects from a result_queue, which is used to get the results from all the worker processes. After the thread pulls a result off the queue, it executes the callback. While your callback is executing, no other results can be pulled from the queue, so it's important that the callback finishes quickly. With your example, as soon as one of the calls to X or Y you make via apply_async completes, the result will be placed into the result_queue by the worker process, and then the result-handling thread will pull the result off of the result_queue, and your callback will be executed.

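Here is a minimal sketch (not from the question; work and on_result are just placeholder names) that makes this flow visible by printing which process the worker runs in and which process/thread the callback runs in:

import multiprocessing as mp
import os
import threading

def work(x):
    # runs in a worker process; its PID differs from the main process
    return (x * x, os.getpid())

def on_result(result):
    # runs in the main process, on the Pool's internal result-handler thread
    value, worker_pid = result
    print("got %s from worker pid %s; callback in pid %s, thread %s"
          % (value, worker_pid, os.getpid(), threading.current_thread().name))

if __name__ == "__main__":
    pool = mp.Pool(2)
    for i in range(4):
        pool.apply_async(work, args=(i,), callback=on_result)
    pool.close()
    pool.join()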

Second, I suspect the reason you're not seeing anything happen with your example code is because all of your worker function calls are failing. If a worker function fails, callback will never be executed. The failure won't be reported at all unless you try to fetch the result from the AsyncResult object returned by the call to apply_async. However, since you're not saving any of those objects, you'll never know the failures occurred. If I were you, I'd try using pool.apply while you're testing so that you see errors as soon as they occur.

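Another way to surface those failures while still using apply_async is to keep the AsyncResult objects and call .get() on them afterwards, since .get() re-raises any exception that occurred in the worker. A minimal sketch, with might_fail as an illustrative stand-in for a worker function:

import multiprocessing as mp

def might_fail(f):
    # stand-in worker: raises for some inputs, like a worker hitting a bad file
    if not f:
        raise ValueError("empty filename")
    return f.upper()

if __name__ == "__main__":
    pool = mp.Pool(mp.cpu_count())
    async_results = [pool.apply_async(might_fail, args=(f,))
                     for f in ["a.txt", "", "b.txt"]]
    pool.close()
    pool.join()

    for ar in async_results:
        try:
            print(ar.get())  # .get() re-raises any exception raised in the worker
        except Exception as exc:
            print("worker failed: %r" % exc)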

The reason the workers are probably failing (at least in the example code you provided) is because X and Y are defined as functions inside another function. multiprocessing passes functions and objects to worker processes by pickling them in the main process, and unpickling them in the worker processes. Functions defined inside other functions are not picklable, which means multiprocessing won't be able to successfully unpickle them in the worker process. To fix this, define both functions at the top level of your module, rather than embedded inside the dirwalker function.

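The pickling restriction is easy to see directly; here is a small sketch (illustrative names, not from the question):

import pickle

def outer():
    def inner(x):
        return x + 1
    return inner

try:
    pickle.dumps(outer())  # functions defined inside another function can't be pickled
except (pickle.PicklingError, AttributeError) as exc:
    print("can't pickle the nested function:", exc)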

You should definitely continue to call Z from X and Y, not in results. That way, Z can be run concurrently across all your worker processes, rather than having to be run one call at a time in your main process. And remember, your callback function is supposed to be as quick as possible, so you don't hold up processing results. Executing Z in there would slow things down.

Here's some simple example code that's similar to what you're doing, that hopefully gives you an idea of what your code should look like:

import multiprocessing as mp
import os

# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f): 
    fileinfo = Z(f) 
    return fileinfo 

# Y() reads other types of files and does the same thing
def Y(f): 
    fileinfo = Z(f)
    return fileinfo

# helper function
def Z(arr):
    return arr + "zzz"

def dirwalker(directory):
    ahlala = []

    # results() is the callback function
    def results(r):
        ahlala.append(r) # or .extend, haven't yet decided

    for _,_,files in os.walk(directory):
        pool = mp.Pool(mp.cpu_count())
        for f in files:
            if len(f) > 5: # Just an arbitrary thing to split up the list with
                # In Python 3 you could add error_callback=handle_error here to
                # handle errors; it's not available in Python 2.7 though :(
                pool.apply_async(X, args=(f,), callback=results)
            else:
                pool.apply_async(Y, args=(f,), callback=results)

    pool.close()
    pool.join()
    return ahlala


if __name__ == "__main__":
    print(dirwalker("/usr/bin"))

Output:

['ftpzzz', 'findhyphzzz', 'gcc-nm-4.8zzz', 'google-chromezzz' ... # lots more here ]

Edit:

You can create a dict object that's shared between your parent and child processes using the multiprocessing.Manager class:

pool = mp.Pool(mp.cpu_count())
m = mp.Manager()
helper_dict = m.dict()
for f in files:
    if len(f) > 5:
        pool.apply_async(X, args=(f, helper_dict), callback=results)
    else:
        pool.apply_async(Y, args=(f, helper_dict), callback=results)

Then make X and Y take a second argument called helper_dict (or whatever name you want), and you're all set.

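For instance, a minimal sketch of what the modified worker might look like (the body is illustrative, not the asker's real file-processing code):

import multiprocessing as mp

def X(f, helper_dict):
    # helper_dict is a Manager dict proxy; every read/write here goes through IPC
    helper_dict[f] = len(f)   # e.g. cache something about this file
    return f + "zzz"

if __name__ == "__main__":
    m = mp.Manager()
    helper_dict = m.dict()
    pool = mp.Pool(2)
    results = [pool.apply_async(X, args=(f, helper_dict)) for f in ["abcdef", "ghi"]]
    pool.close()
    pool.join()
    print([r.get() for r in results])
    print(dict(helper_dict))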

The caveat is that this works by creating a server process that holds a normal dict, and all your other processes talk to that one dict via a Proxy object. So every time you read or write to the dict, you're doing IPC. This makes it a lot slower than a real dict.
