如何在joblib并行执行中使用tqdm ?

时间:2022-01-11 23:59:06

I want to run a function in parallel, and wait until all parallel nodes are done, using joblib. Like in the example:

我想要并行运行一个函数,并使用joblib等待所有并行节点完成。的例子:

from math import sqrt
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))

But, I want that the execution will be seen in a single progressbar like with tqdm, showing how many jobs has been completed.

但是,我希望在一个像tqdm这样的单个progressbar中可以看到执行,说明已经完成了多少工作。

How would you do that?

你会怎么做?

3 个解决方案

#1


5  

If your problem consists of many parts, you could split the parts into k subgroups, run each subgroup in parallel and update the progressbar in between, resulting in k updates of the progress.

如果您的问题包含许多部分,您可以将这些部分分成k个子组,并行运行每个子组,并在其中更新progressbar,从而获得进度的k个更新。

This is demonstrated in the following example from the documentation.

下面的示例从文档中演示了这一点。

>>> with Parallel(n_jobs=2) as parallel:
...    accumulator = 0.
...    n_iter = 0
...    while accumulator < 1000:
...        results = parallel(delayed(sqrt)(accumulator + i ** 2)
...                           for i in range(5))
...        accumulator += sum(results)  # synchronization barrier
...        n_iter += 1

https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers

https://pythonhosted.org/joblib/parallel.html reusing-a-pool-of-workers

#2


7  

Here's possible workaround

这是可能的解决方案

def func(x):
    time.sleep(random.randint(1, 10))
    return x

def text_progessbar(seq, total=None):
    step = 1
    tick = time.time()
    while True:
        time_diff = time.time()-tick
        avg_speed = time_diff/step
        total_str = 'of %n' % total if total else ''
        print('step', step, '%.2f' % time_diff, 
              'avg: %.2f iter/sec' % avg_speed, total_str)
        step += 1
        yield next(seq)

all_bar_funcs = {
    'tqdm': lambda args: lambda x: tqdm(x, **args),
    'txt': lambda args: lambda x: text_progessbar(x, **args),
    'False': lambda args: iter,
    'None': lambda args: iter,
}

def ParallelExecutor(use_bar='tqdm', **joblib_args):
    def aprun(bar=use_bar, **tq_args):
        def tmp(op_iter):
            if str(bar) in all_bar_funcs.keys():
                bar_func = all_bar_funcs[str(bar)](tq_args)
            else:
                raise ValueError("Value %s not supported as bar type"%bar)
            return Parallel(**joblib_args)(bar_func(op_iter))
        return tmp
    return aprun

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

#3


1  

Just put range(10) inside tqdm(...)! It probably seemed too good to be true for you, but it really works (on my machine):

只需要在tqdm(…)中设置范围(10)!它可能看起来太好了,对你来说可能不真实,但它确实有效(在我的机器上):

from math import sqrt
from joblib import Parallel, delayed
import multiprocessing  
from tqdm import tqdm  
result = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in tqdm(range(100000)))

#1


5  

If your problem consists of many parts, you could split the parts into k subgroups, run each subgroup in parallel and update the progressbar in between, resulting in k updates of the progress.

如果您的问题包含许多部分,您可以将这些部分分成k个子组,并行运行每个子组,并在其中更新progressbar,从而获得进度的k个更新。

This is demonstrated in the following example from the documentation.

下面的示例从文档中演示了这一点。

>>> with Parallel(n_jobs=2) as parallel:
...    accumulator = 0.
...    n_iter = 0
...    while accumulator < 1000:
...        results = parallel(delayed(sqrt)(accumulator + i ** 2)
...                           for i in range(5))
...        accumulator += sum(results)  # synchronization barrier
...        n_iter += 1

https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers

https://pythonhosted.org/joblib/parallel.html reusing-a-pool-of-workers

#2


7  

Here's possible workaround

这是可能的解决方案

def func(x):
    time.sleep(random.randint(1, 10))
    return x

def text_progessbar(seq, total=None):
    step = 1
    tick = time.time()
    while True:
        time_diff = time.time()-tick
        avg_speed = time_diff/step
        total_str = 'of %n' % total if total else ''
        print('step', step, '%.2f' % time_diff, 
              'avg: %.2f iter/sec' % avg_speed, total_str)
        step += 1
        yield next(seq)

all_bar_funcs = {
    'tqdm': lambda args: lambda x: tqdm(x, **args),
    'txt': lambda args: lambda x: text_progessbar(x, **args),
    'False': lambda args: iter,
    'None': lambda args: iter,
}

def ParallelExecutor(use_bar='tqdm', **joblib_args):
    def aprun(bar=use_bar, **tq_args):
        def tmp(op_iter):
            if str(bar) in all_bar_funcs.keys():
                bar_func = all_bar_funcs[str(bar)](tq_args)
            else:
                raise ValueError("Value %s not supported as bar type"%bar)
            return Parallel(**joblib_args)(bar_func(op_iter))
        return tmp
    return aprun

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

#3


1  

Just put range(10) inside tqdm(...)! It probably seemed too good to be true for you, but it really works (on my machine):

只需要在tqdm(…)中设置范围(10)!它可能看起来太好了,对你来说可能不真实,但它确实有效(在我的机器上):

from math import sqrt
from joblib import Parallel, delayed
import multiprocessing  
from tqdm import tqdm  
result = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in tqdm(range(100000)))