如何将multiprocessing.Pool实例传递给apply_async回调函数?

时间:2022-06-01 21:03:15

Here is my prime factorization program,i added a callback function in pool.apply_async(findK, args=(N,begin,end)),a message prompt out prime factorization is over when factorization is over,it works fine.

这是我的主要分解程序,我在pool.apply_async中添加了一个回调函数(findK,args =(N,begin,end)),当分解结束时,一个消息提示出素数因子分解结束,它工作正常。

import math
import multiprocessing 

def findK(N,begin,end):
    for k in range(begin,end):
        if N% k == 0:
            print(N,"=" ,k ,"*", N/k)
            return True
    return False


def prompt(result):
    if result:
        print("prime factorization is over")


def mainFun(N,process_num):
    pool = multiprocessing.Pool(process_num)
    for i in range(process_num):
        if i ==0 :
            begin =2
        else:
            begin = int(math.sqrt(N)/process_num*i)+1
        end = int(math.sqrt(N)/process_num*(i+1))
        pool.apply_async(findK, args=(N,begin,end) , callback = prompt)    
    pool.close()
    pool.join()    

if __name__ == "__main__":
    N = 684568031001583853
    process_num = 16
    mainFun(N,process_num)

Now i want to change the callback function in apply_async,to change prompt into a shutdown function to kill all other process.

现在我想更改apply_async中的回调函数,将提示更改为关闭函数以终止所有其他进程。

def prompt(result):
    if result:
        pool.terminate()

The pool instance is not defined in prompt scope or passed into prompt.
pool.terminate() can't work in prompt function.
How to pass multiprocessing.Pool instance to apply_async'callback function ?
(I have made it done in class format,just to add a class method and call self.pool.terminate can kill all other process, how to do the job in function format?)

池实例未在提示范围中定义或传递到提示中。 pool.terminate()无法在提示函数中工作。如何将multiprocessing.Pool实例传递给apply_async'callback函数? (我已经用类格式完成了,只是添加一个类方法并调用self.pool.terminate可以杀死所有其他进程,如何以函数格式完成工作?)

if not set pool as global variable, can pool be passed into callback function?

如果没有将pool设置为全局变量,可以将池传递给回调函数吗?

3 个解决方案

#1


8  

Passing extra arguments to the callback function is not supported. Yet you have plenty of elegant ways to workaround that.

不支持将额外参数传递给回调函数。然而,你有很多优雅的方法来解决这个问题。

You can encapsulate your pool logic into an object:

您可以将池逻辑封装到对象中:

class Executor:
    def __init__(self, process_num):
        self.pool = multiprocessing.Pool(process_num)

    def prompt(self, result):
        if result:
            print("prime factorization is over")
            self.pool.terminate()

    def schedule(self, function, args):
        self.pool.apply_async(function, args=args, callback=self.prompt)

    def wait(self):
        self.pool.close()
        self.pool.join() 


def main(N,process_num):
    executor = Executor(process_num)
    for i in range(process_num):
        ...
        executor.schedule(findK, (N,begin,end))   
    executor.wait()

Or you can use the concurrent.futures.Executor implementation which returns a Future object. You just append the pool to the Future object before setting the callback.

或者您可以使用concurrent.futures.Executor实现,该实现返回Future对象。您只需在设置回调之前将池附加到Future对象。

def prompt(future):
    if future.result():
        print("prime factorization is over")
        future.pool_executor.shutdown(wait=False)

def main(N,process_num):
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=process_num)
    for i in range(process_num):
        ...
        future = executor.submit(findK, N,begin,end)
        future.pool_executor = executor
        future.add_done_callback(prompt)

#2


4  

You need to have pool end up in prompt's environment. One possibility is to move pool into the global scope (though this isn't really best-practice). This appears to work:

您需要在提示环境中使用池。一种可能性是将池转移到全球范围(尽管这不是最佳实践)。这似乎有效:

import math
import multiprocessing 

pool = None

def findK(N,begin,end):
    for k in range(begin,end):
        if N% k == 0:
            print(N,"=" ,k ,"*", N/k)
            return True
    return False


def prompt(result):
    if result:
        print("prime factorization is over")
        pool.terminate()


def mainFun(N,process_num):
    global pool
    pool = multiprocessing.Pool(process_num)
    for i in range(process_num):
        if i ==0 :
            begin =2
        else:
            begin = int(math.sqrt(N)/process_num*i)+1
        end = int(math.sqrt(N)/process_num*(i+1))
        pool.apply_async(findK, args=(N,begin,end) , callback = prompt)    
    pool.close()
    pool.join()    

if __name__ == "__main__":
    N = 684568031001583853
    process_num = 16
    mainFun(N,process_num)

#3


4  

You can simply define a local close function as a callback:

您可以简单地将本地关闭函数定义为回调:

import math
import multiprocessing 


def findK(N, begin, end):
    for k in range(begin, end):
        if N % k == 0:
            print(N, "=", k, "*", N / k)
            return True
    return False


def mainFun(N, process_num):
    pool = multiprocessing.Pool(process_num)

    def close(result):
        if result:
            print("prime factorization is over")
            pool.terminate()
    for i in range(process_num):
        if i == 0:
            begin = 2
        else:
            begin = int(math.sqrt(N) / process_num * i) + 1
        end = int(math.sqrt(N) / process_num * (i + 1))
        pool.apply_async(findK, args=(N, begin, end), callback=close)
    pool.close()
    pool.join()


if __name__ == "__main__":
    N = 684568031001583853
    process_num = 16
    mainFun(N, process_num)

You can also use a partial function from functool, with

你也可以使用functool的部分功能

import functools

def close_pool(pool, results):
    if result:
        pool.terminate()

def mainFun(N, process_num):
    pool = multiprocessing.Pool(process_num)

    close = funtools.partial(close_pool, pool)
....

#1


8  

Passing extra arguments to the callback function is not supported. Yet you have plenty of elegant ways to workaround that.

不支持将额外参数传递给回调函数。然而,你有很多优雅的方法来解决这个问题。

You can encapsulate your pool logic into an object:

您可以将池逻辑封装到对象中:

class Executor:
    def __init__(self, process_num):
        self.pool = multiprocessing.Pool(process_num)

    def prompt(self, result):
        if result:
            print("prime factorization is over")
            self.pool.terminate()

    def schedule(self, function, args):
        self.pool.apply_async(function, args=args, callback=self.prompt)

    def wait(self):
        self.pool.close()
        self.pool.join() 


def main(N,process_num):
    executor = Executor(process_num)
    for i in range(process_num):
        ...
        executor.schedule(findK, (N,begin,end))   
    executor.wait()

Or you can use the concurrent.futures.Executor implementation which returns a Future object. You just append the pool to the Future object before setting the callback.

或者您可以使用concurrent.futures.Executor实现,该实现返回Future对象。您只需在设置回调之前将池附加到Future对象。

def prompt(future):
    if future.result():
        print("prime factorization is over")
        future.pool_executor.shutdown(wait=False)

def main(N,process_num):
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=process_num)
    for i in range(process_num):
        ...
        future = executor.submit(findK, N,begin,end)
        future.pool_executor = executor
        future.add_done_callback(prompt)

#2


4  

You need to have pool end up in prompt's environment. One possibility is to move pool into the global scope (though this isn't really best-practice). This appears to work:

您需要在提示环境中使用池。一种可能性是将池转移到全球范围(尽管这不是最佳实践)。这似乎有效:

import math
import multiprocessing 

pool = None

def findK(N,begin,end):
    for k in range(begin,end):
        if N% k == 0:
            print(N,"=" ,k ,"*", N/k)
            return True
    return False


def prompt(result):
    if result:
        print("prime factorization is over")
        pool.terminate()


def mainFun(N,process_num):
    global pool
    pool = multiprocessing.Pool(process_num)
    for i in range(process_num):
        if i ==0 :
            begin =2
        else:
            begin = int(math.sqrt(N)/process_num*i)+1
        end = int(math.sqrt(N)/process_num*(i+1))
        pool.apply_async(findK, args=(N,begin,end) , callback = prompt)    
    pool.close()
    pool.join()    

if __name__ == "__main__":
    N = 684568031001583853
    process_num = 16
    mainFun(N,process_num)

#3


4  

You can simply define a local close function as a callback:

您可以简单地将本地关闭函数定义为回调:

import math
import multiprocessing 


def findK(N, begin, end):
    for k in range(begin, end):
        if N % k == 0:
            print(N, "=", k, "*", N / k)
            return True
    return False


def mainFun(N, process_num):
    pool = multiprocessing.Pool(process_num)

    def close(result):
        if result:
            print("prime factorization is over")
            pool.terminate()
    for i in range(process_num):
        if i == 0:
            begin = 2
        else:
            begin = int(math.sqrt(N) / process_num * i) + 1
        end = int(math.sqrt(N) / process_num * (i + 1))
        pool.apply_async(findK, args=(N, begin, end), callback=close)
    pool.close()
    pool.join()


if __name__ == "__main__":
    N = 684568031001583853
    process_num = 16
    mainFun(N, process_num)

You can also use a partial function from functool, with

你也可以使用functool的部分功能

import functools

def close_pool(pool, results):
    if result:
        pool.terminate()

def mainFun(N, process_num):
    pool = multiprocessing.Pool(process_num)

    close = funtools.partial(close_pool, pool)
....