Limiting the number of Luigi workers when multiple scripts are running

Date: 2022-07-17 02:12:03

From what I have seen and understood, when several Luigi workflows run at the same time, their worker counts add up. This means that if I run two workflows together, with the number of workers set to n in the luigi.cfg file, and provided the workflows use more than n workers at the same time, the central scheduler will make use of 2×n workers.

In the Luigi manual I could not find any way to restrict the number of workers to n, even if I run a dozen workflows at the same time.

This is my luigi.cfg file:

[core]
workers: 3
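
As far as I understand, this setting is read per scheduling process: every call to luigi.build() (and every luigi command-line invocation) starts its own pool of that many workers, and the central scheduler does not cap the total across processes. A minimal sketch of the programmatic counterpart (the Noop task is just a placeholder I made up for illustration):

import luigi

class Noop(luigi.Task):
    # Explicitly never complete, so the task always runs.
    def complete(self):
        return False
    def run(self):
        pass

# workers=3 is the programmatic counterpart of [core] workers: it sizes
# this process's worker pool only. Two processes each started with
# workers=3 can therefore run up to 6 tasks in total.
luigi.build([Noop()], workers=3)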

This is the example script I am using (it actually builds on sciluigi, a layer on top of Luigi, but I don't think that makes any difference as far as the task and scheduler configuration are concerned). When I run it several times concurrently, I'd like the last three workflows to wait until the first three are done before starting.

import optparse
import luigi
import sciluigi
import random
import time
import sys
import os


class MyFooWriter(sciluigi.Task):
    # We have no inputs here
    # Define outputs:
    outdir = sciluigi.Parameter()
    def out_foo(self):
        return sciluigi.TargetInfo(self, os.path.join(self.outdir,'foo.txt'))
    def run(self):
        with self.out_foo().open('w') as foofile:
            foofile.write('foo\n')

class MyFooReplacer(sciluigi.Task):
    # Here, we take as a parameter what to replace foo with:
    replacement = sciluigi.Parameter()
    outFile = sciluigi.Parameter()
    outdir = sciluigi.Parameter()
    # Here we have one input, a "foo file":
    in_foo = None

    # ... and an output, a "bar file":
    def out_replaced(self):
        return sciluigi.TargetInfo(self, os.path.join(self.outdir, self.outFile))

    def run(self):
        replacement = ""
        with open(self.in_foo().path, 'r') as content_file:
            content = content_file.read()
            replacement = content.replace('foo', self.replacement)
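            # simulate a long-running step so that concurrency is observable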
            for i in range(1,30):
                sys.stderr.write(str(i)+"\n")
                time.sleep(1)
        with open(self.out_replaced().path,'w') as out_f:
            out_f.write(replacement)



class MyWorkflow(sciluigi.WorkflowTask):
    outdir = luigi.Parameter()
    def workflow(self):
        #rdint = random.randint(1,1000)
        rdint = 100
        barfile = "foobar_" + str(rdint) +'.bar.txt'
        foowriter = self.new_task('foowriter', MyFooWriter, outdir = self.outdir)
        fooreplacer = self.new_task('fooreplacer', MyFooReplacer, replacement='bar', outFile = barfile,  outdir =  self.outdir)
        fooreplacer.in_foo = foowriter.out_foo
        return fooreplacer


# End of script ....
if __name__ == '__main__':
    parser = optparse.OptionParser()
    parser.add_option('-d', dest = "outdir", action="store", default=".")
    options, remainder = parser.parse_args()
    wf = [MyWorkflow(outdir = options.outdir)]
    luigi.build(wf)

This is a little Perl script I use to run the Python script concurrently (Perl being my favourite language :-)).

#! /usr/bin/perl

use strict;

for (my $i = 0; $i < 6; $i++) {
  my $testdir = "test".$i;
  system("mkdir -p $testdir");
  system("python run_sciluigi.py -d $testdir&");
  sleep(2);
}
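
Incidentally, I believe this is why the limits add up: each python run_sciluigi.py invocation is a separate process with its own luigi.build() call, and hence its own worker pool. As a sketch (reusing the MyWorkflow class from the script above), launching all six workflows from a single process would make them share one pool of three workers:

wfs = [MyWorkflow(outdir="test%d" % i) for i in range(6)]
# One process, one worker pool: at most 3 tasks run at any one time.
luigi.build(wfs, workers=3)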

1 Answer

#1

While not exactly a restriction on the number of workers, it is possible to use the resources concept to put a global limit on concurrent execution.

In luigi.cfg:

[resources]
max_workers=5

In all of your tasks:

class MyFooReplacer(sciluigi.Task):
    resources = {'max_workers': 1}
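
For completeness, here is a minimal, self-contained sketch of the pattern (plain Luigi rather than sciluigi; the task name, file names and parameters are illustrative). With the [resources] limit above set to 5, the scheduler never lets more than five instances holding the max_workers resource run at once, no matter how many workers are available:

import time

import luigi


class ThrottledTask(luigi.Task):
    """Illustrative task; each running instance holds one max_workers unit."""
    n = luigi.IntParameter()

    # Declares that one unit of the 'max_workers' resource is held while
    # this task runs; the scheduler caps the total at [resources] max_workers.
    resources = {'max_workers': 1}

    def output(self):
        return luigi.LocalTarget('throttled_%d.txt' % self.n)

    def run(self):
        time.sleep(10)  # simulate work
        with self.output().open('w') as f:
            f.write('done\n')


if __name__ == '__main__':
    # Six workers are available, but with max_workers=5 in luigi.cfg at
    # most five of these tasks run simultaneously.
    luigi.build([ThrottledTask(n=i) for i in range(6)], workers=6)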

http://luigi.readthedocs.io/en/stable/configuration.html#resources
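
Note that the resource bookkeeping lives in the scheduler, so the limit only spans several workflow processes when they all report to the same central scheduler (luigid); with --local-scheduler each process runs its own scheduler and therefore gets its own, independent limit.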