Running functions with positional and optional arguments in parallel in Python (follow-up)

Date: 2021-05-27 23:18:33

This is a follow-up question to: Python: How can I run python functions in parallel?

Minimal Working Example:

'''
Created on 06.05.2015
https://*.com/questions/7207309/python-how-can-i-run-python-functions-in-parallel
'''
from multiprocessing import Process
import time

def runInParallel(*fns):
    proc = []
    for fn in fns:
        p = Process(target=fn)
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

def func1():
    s=time.time()
    print 'func1: starting', s 
    for i in xrange(1000000000):
        if i==i:
            pass
    e = time.time()
    print 'func1: finishing', e
    print 'duration', e-s

if __name__ == '__main__':
    s =time.time()
    runInParallel(func1, func1, func1, func1, func1)
    print time.time()-s

Which leads to this (and it's exactly what I want):

func1: starting 1430920678.09
func1: starting 1430920678.53
func1: starting 1430920679.02
func1: starting 1430920679.57
func1: starting 1430920680.55
func1: finishing 1430920729.68
duration 51.1449999809
func1: finishing 1430920729.78
duration 51.6889998913
func1: finishing 1430920730.69
duration 51.1239998341
func1: finishing 1430920748.64
duration 69.6180000305
func1: finishing 1430920749.25
duration 68.7009999752
71.5629999638

However, my function takes quite a lot of arguments, so I tested it like this:

-> func1(a) now has an argument passed to it.

'''
Created on 06.05.2015
https://*.com/questions/7207309/python-how-can-i-run-python-functions-in-parallel
'''
from multiprocessing import Process
import time

def runInParallel(*fns):
    proc = []
    for fn in fns:
        p = Process(target=fn)
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

def func1(a):
    s=time.time()
    print 'func1: starting', s 
    for i in xrange(a):
        if i==i:
            pass
    e = time.time()
    print 'func1: finishing', e
    print 'duration', e-s

if __name__ == '__main__':
    s =time.time()
    runInParallel(func1(1000000000), func1(1000000000),
                  func1(1000000000), func1(1000000000),
                  func1(1000000000))
    print time.time()-s

So now this happens:

func1: starting 1430921299.08
func1: finishing 1430921327.84
duration 28.760999918
func1: starting 1430921327.84
func1: finishing 1430921357.68
duration 29.8410000801
func1: starting 1430921357.68
func1: finishing 1430921387.14
duration 29.4619998932
func1: starting 1430921387.14
func1: finishing 1430921416.52
duration 29.3849999905
func1: starting 1430921416.52
func1: finishing 1430921447.39
duration 30.864000082
151.392999887

The processes now run sequentially, no longer in parallel, and I don't get why! What am I missing or doing wrong?

EDIT: Additionally, what would an example look like where a few arguments are positional and others are optional?

3 Answers

#1


You have to pass your arguments to Process using the args argument. For instance:

def runInParallel(*fns):
    proc = []
    for fn, arg in fns:
        p = Process(target=fn, args=(arg,))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

And then call the function using:

runInParallel((func1, 10**9),
              (func1, 10**9),
              (func1, 10**9))
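
If each function needs several arguments, a minimal generalization (just a sketch, not from the original answer) is to pass (function, args-tuple) pairs and forward the whole tuple:

def runInParallel(*fn_args):
    proc = []
    for fn, args in fn_args:
        # args is now a full tuple, forwarded as-is to the target
        p = Process(target=fn, args=args)
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

# each pair is (function, tuple-of-arguments)
runInParallel((func1, (10**9,)),
              (func1, (10**9,)),
              (func1, (10**9,)))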

Also, you might consider using a Pool instead:

from multiprocessing import Pool

pool = Pool()
pool.apply_async(func1, (10**9,))
pool.apply_async(func1, (10**9,))
pool.apply_async(func1, (10**9,))
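
Note that apply_async returns immediately with an AsyncResult. A minimal sketch of waiting for the workers and collecting the return values (results and values are just illustrative names):

from multiprocessing import Pool

if __name__ == '__main__':
    pool = Pool()
    # apply_async returns AsyncResult objects right away
    results = [pool.apply_async(func1, (10**9,)) for _ in range(3)]
    pool.close()   # no further tasks will be submitted
    pool.join()    # block until every worker has finished
    # get() returns the function's return value (None for func1)
    # and re-raises any exception that occurred in the worker
    values = [r.get() for r in results]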

EDIT:

Process and Pool.apply_async work the same way. They take two optional arguments, args and kwargs. These are the standard conventions for positional arguments and keyword arguments in Python:

f(1, 2, a=3, b=4)  # is equivalent to
args, kwargs = (1, 2), {"a":3, "b":4}
f(*args, **kwargs)

Same example with multiprocessing:

args, kwargs = (1, 2), {"a":3, "b":4}
Process(target=f, args=args, kwargs=kwargs).start()
# Or
pool = Pool()
args, kwargs = (1, 2), {"a":3, "b":4}
pool.apply_async(f, args, kwargs)
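
To address the EDIT in the question (some parameters positional, some optional), a minimal sketch; func2 and its parameters are made up for illustration:

from multiprocessing import Process

def func2(a, b, repeat=1, verbose=False):
    # a and b are positional; repeat and verbose are optional
    total = 0
    for _ in range(repeat):
        total += a + b
    if verbose:
        print 'total:', total

if __name__ == '__main__':
    # positional arguments go into args, optional ones into kwargs
    p = Process(target=func2, args=(1, 2), kwargs={'repeat': 3, 'verbose': True})
    p.start()
    p.join()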

#2


If you don't mind using a fork of multiprocessing, you can do something pretty cool and pass multiple arguments to the target of a parallel map. Here, I build a function that requires 2 arguments, but also has one optional argument and takes *args and **kwds. I'll build a list of inputs that have a random length, and run those in parallel.

>>> from pathos.multiprocessing import ProcessingPool as PPool
>>> pmap = PPool().map
>>> from pathos.multiprocessing import ThreadingPool as TPool
>>> tmap = TPool().map
>>> import numpy
>>>
>>> # build a function with multiple arguments, some optional
>>> def do_it(x,y,z=1,*args,**kwds):
...   import time
...   import random
...   s = time.time()
...   print 'starting', s
...   time.sleep(random.random())
...   res = sum([x,y,z]+list(args)+kwds.values())
...   e = time.time()
...   print 'finishing', e
...   print 'duration', e-s
...   return res
... 
>>> # create a bunch of random-length arrays as input for do_it
>>> input = map(numpy.random.random, tmap(numpy.random.randint, [2]*5, [6]*5))
>>> input
[array([ 0.25178071,  0.68871176,  0.92305523,  0.47103722]), array([ 0.14214278,  0.16747431,  0.59177496,  0.79984192]), array([ 0.20061353,  0.94339813,  0.67396539,  0.99919187]), array([ 0.63974882,  0.46868301,  0.59963679,  0.97704561]), array([ 0.14515633,  0.97824495,  0.57832663,  0.34167116])] 

Now, let's get our results...

>>> # call do_it in parallel, with random-length inputs
>>> result = pmap(do_it, *input)
starting 1431039902.85
starting 1431039902.85
starting 1431039902.85
starting 1431039902.85
finishing 1431039903.21
finishing 1431039903.21
duration 0.358909130096
duration 0.35973405838
finishing 1431039903.21
finishing 1431039903.21
duration 0.359538078308
duration 0.358761072159
>>> result
[1.379442164896775, 3.2465121635066176, 3.3667590048477187, 3.5887877829029042]

Of course, if you wanted to be tricky, you could run a triple-nested map all in one line.

>>> # do it, all in one line
>>> result = pmap(do_it, *map(numpy.random.random, tmap(numpy.random.randint, [2]*5, [6]*5)))
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
starting 1431040673.62
finishing 1431040673.73
finishing 1431040673.73
duration 0.110394001007
duration 0.111043930054
finishing 1431040673.73
duration 0.110962152481
finishing 1431040673.73
duration 0.110266923904
finishing 1431040673.74
duration 0.110939025879
>>> result
[1.9904591398425764, 1.932317817954369, 2.6365732054048432, 2.5168248011900047, 2.0410734229587968]

And you could probably skip blocking and serial maps entirely, and things would be really fast (I'm ignoring numpy random seeding here).

>>> # get a non-blocking thread map and an asynchronous processing map
>>> itmap = TPool().imap
>>> apmap = PPool().amap
>>>
>>> # do it!
>>> result = apmap(do_it, *itmap(numpy.random.random, itmap(numpy.random.randint, [2]*5, [6]*5)))
starting 1431041250.33
starting 1431041250.33
starting 1431041250.33
finishing 1431041250.44
duration 0.110985040665
finishing 1431041250.44
duration 0.110254049301
finishing 1431041250.45
duration 0.110941886902
>>> result.get()
[3.6386644432719697, 0.43038222983159957, 3.6220901279963318]

Get pathos here: https://github.com/uqfoundation

#3


Issue

I think your issue comes from the fact that in the first example you pass a function handle, while in the second example you call the function directly.

i.e.

func1

is not equivalent to

func1()
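
A quick illustration of the difference (handle and result are just illustrative names):

handle = func1          # a reference to the function object; nothing runs yet
result = func1(10**9)   # the call runs right now, in the current process,
                        # and result (None here) is what ends up in target=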

Solution

According to https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process, you have to pass your arguments separately, like:

p = Process(target=fn, args=(10000000,))
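
Applied to the code from the question, a minimal sketch:

if __name__ == '__main__':
    s = time.time()
    # hand Process the function object plus its argument; every
    # child process then evaluates func1(1000000000) itself
    procs = [Process(target=func1, args=(1000000000,)) for _ in range(5)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print time.time() - s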

Hope this helped
