使用库存Linux工具的最小“任务队列”来利用多核CPU

时间:2021-03-06 13:51:00

What is the best/easiest way to build a minimal task queue system for Linux using bash and common tools?

使用bash和常用工具为Linux构建最小任务队列系统的最佳/最简单方法是什么?

I have a file with 9'000 lines, each line has a bash command line, the commands are completely independent.

我有一个9'000行的文件,每行有一个bash命令行,命令是完全独立的。

command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log
...

My box has more than one core and I want to execute X tasks at the same time. I searched the web for a good way to do this. Apparently, a lot of people have this problem but nobody has a good solution so far.

我的盒子有多个核心,我想同时执行X任务。我在网上搜索了一个很好的方法来做到这一点。显然,很多人都有这个问题,但到目前为止还没有人有一个好的解决方案。

It would be nice if the solution had the following features:

如果解决方案具有以下功能,那将是很好的:

  • can interpret more than one command (e.g. command; command)
  • 可以解释多个命令(例如命令;命令)

  • can interpret stream redirects on the lines (e.g. ls > /tmp/ls.txt)
  • 可以解释行上的流重定向(例如ls> /tmp/ls.txt)

  • only uses common Linux tools
  • 仅使用常见的Linux工具

Bonus points if it works on other Unix-clones without too exotic requirements.

如果它适用于其他Unix克隆而没有太多异国情调的要求,则可获得奖励积分。

9 个解决方案

#1


Can you convert your command list to a Makefile? If so, you could just run "make -j X".

你可以将命令列表转换为Makefile吗?如果是这样,你可以运行“make -j X”。

#2


GNU Parallel http://www.gnu.org/software/parallel/ is a more general tool for parallelizing than PPSS.

GNU Parallel http://www.gnu.org/software/parallel/是比PPSS更加通用的并行化工具。

If runfile contains:

如果runfile包含:

command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log

you can do:

你可以做:

cat runfile | parallel -j+0

which will run one command per CPU core.

每个CPU核心运行一个命令。

If your commands are as simple as above you do not even need runfile but can do:

如果您的命令如上所述,您甚至不需要runfile但可以执行以下操作:

seq 1 3 | parallel -j+0 'command {} > Logs/{}.log'

If you have more computers available to do the processing you may want to look at the --sshlogin and --trc options for GNU Parallel.

如果有更多计算机可用于处理,您可能需要查看GNU Parallel的--sshlogin和--trc选项。

#3


Okay, after posting the question here, I found the following project which looks promising: ppss.

好的,在这里发布问题后,我发现以下项目看起来很有希望:ppss。

Edit: Not quite what I want, PPSS is focused on processing "all files in directory A".

编辑:不是我想要的,PPSS专注于处理“目录A中的所有文件”。

#4


Well, this is a kind of fun question anyway.

嗯,无论如何这是一个有趣的问题。

Here's what I'd do, assuming bash(1) of course.

这就是我要做的,当然是假设bash(1)。

  • figure out how many of these commands can usefully run concurrently. It's not going to be just the number of cores; a lot of commands will be suspended for I/O and that sort of thing. Call that number N. N=15 for example.
  • 弄清楚这些命令中有多少可以同时有效地运行。它不仅仅是核心数量;很多命令将暂停I / O等等。例如,呼叫该号码N.N = 15。

  • set up a trap signal handler for the SIGCHLD signal, which occurs when a child process terminates. trap signalHandler SIGCHLD
  • 为子进程终止时发生的SIGCHLD信号设置陷阱信号处理程序。陷阱signalHandler SIGCHLD

  • cat your list of commands into a pipe
  • 将您的命令列表添加到管道中

  • write a loop that reads stdin and executes the commands one by one, decrementing a counter. When the counter is 0, it waits.
  • 写一个循环,读取stdin并逐个执行命令,递减计数器。当计数器为0时,它等待。

  • your signal handler, which runs on SIGCHLD, increments that counter.
  • 在SIGCHLD上运行的信号处理程序会递增该计数器。

So now, it runs the first N commands, then waits. When the first child terminates, the wait returns, it reads another line, runs a new command, and waits again.

所以现在,它运行前N个命令,然后等待。当第一个子节点终止时,等待返回,它读取另一行,运行新命令,然后再次等待。

Now, this is a case that takes care of many jobs terminating close together. I suspect you can get away with a simpler version:

现在,这是一个案例,负责许多工作紧密结合在一起。我怀疑你可以使用更简单的版本:

 N=15
 COUNT=N
 cat mycommands.sh | 
 while read cmd 
 do
   eval $cmd &
   if $((count-- == 0))
   then
       wait
   fi
 od

Now, this one will start up the first 15 commands, and then run the rest one at a time as some command terminates.

现在,这个将启动前15个命令,然后在某个命令终止时一次运行其余一个命令。

#5


Similar distributed-computing fun is the Mapreduce Bash Script:

类似的分布式计算乐趣是Mapreduce Bash脚本:

http://blog.last.fm/2009/04/06/mapreduce-bash-script

And thanks for pointing out ppss!

并感谢指出ppss!

#6


You can use the xargs command, its --max-procs does what you want. For instance Charlie Martin solution becomes with xargs:

你可以使用xargs命令,它的--max-procs可以做你想要的。例如,Charlie Martin解决方案与xargs一起使用:

tr '\012' '\000' <mycommands.sh |xargs --null --max-procs=$X bash -c

details:

  • X is the number of processes max. E.g: X=15. --max-procs is doing the magic
  • X是最大进程数。例如:X = 15。 --max-procs正在发挥魔力

  • the first tr is here to terminate lines by null bytes for xargs --null option so that quotes redirection etc are not expansed wrongly
  • 第一个tr用于终止xargs --null选项的空字节行,以便引号重定向等不会被错误地扩展

  • bash -c runs the command
  • bash -c运行命令

I tested it with this mycommands.sh file for instance:

我用这个mycommands.sh文件测试了它,例如:

date
date "+%Y-%m-%d" >"The Date".txt
wc -c <'The Date'.txt >'The Count'.txt

#7


This is a specific case, but if you are trying to process a set of files and produce another set of output files, you can start #cores number of processes, and check if an output file exists before processing it. The example below converts a directory of .m4b files to .mp3 files:

这是一个特例,但如果您正在尝试处理一组文件并生成另一组输出文件,则可以启动#cores进程数,并在处理之前检查输出文件是否存在。下面的示例将.m4b文件的目录转换为.mp3文件:

Just run this command as many times as you have cores:

只需像拥有核心一样多次运行此命令:

ls *m4b|while read f; do test -f ${f%m4b}mp3 || mencoder -of rawaudio "$f" -oac mp3lame -ovc copy -o ${f%m4b}mp3; done &

ls * m4b | while read f;做测试-f $ {f%m4b} mp3 || mencoder -of rawaudio“$ f”-oac mp3lame -ovc copy -o $ {f%m4b} mp3;完成&

#8


You could see my tasks queue written on bash: https://github.com/pavelpat/yastq

您可以在bash上看到我的任务队列:https://github.com/pavelpat/yastq

#9


Task Queue + Parallelized + Dynamic addition

任务队列+并行化+动态添加

Using a FIFO, this script fork itself to process the queue. This way, you can add commands to the queue on the fly (when the queue is already started).

使用FIFO,此脚本自行分叉以处理队列。这样,您可以动态地向队列添加命令(当队列已经启动时)。

Usage: ./queue Command [# of children] [Queue name]

用法:./queue命令[子项数] [队列名称]

Example, with 1 thread:

示例,1个线程:

./queue "sleep 5; echo ONE"
./queue "echo TWO"

Output:

ONE
TWO

Example, with 2 thread:

例如,有2个线程:

./queue "sleep 5; echo ONE" 2
./queue "echo TWO"

Output:

TWO
ONE

Example, with 2 queues:

示例,有2个队列:

./queue "sleep 5; echo ONE queue1" 1 queue1
./queue "sleep 3; echo ONE queue2" 1 queue2

Output:

ONE queue2
ONE queue1

The script (save it as "queue" and chmod +x queue):

脚本(将其保存为“queue”和chmod + x队列):


    #!/bin/bash

    #Print usage
    [[ $# -eq 0 ]] && echo Usage: $0 Command [# of children] [Queue name] && exit

    #Param 1 - Command to execute
    COMMAND="$1"

    #Param 2 - Number of childs in parallel
    MAXCHILD=1
    [[ $# -gt 1 ]] && MAXCHILD="$2"

    #Param 3 - File to be used as FIFO
    FIFO="/tmp/defaultqueue"
    [[ $# -gt 2 ]] && FIFO="$3"

    #Number of seconds to keep the runner active when unused
    TIMEOUT=5

    runner(){
      #Associate file descriptor 3 to the FIFO
      exec 3"$FIFO"

      while read -u 3 -t $TIMEOUT line; do
        #max child check
        while [ `jobs | grep Running | wc -l` -ge "$MAXCHILD" ]; do
          sleep 1
        done

        #exec in backgroud
        (eval "$line")&
      done
      rm $FIFO
    }

    writer(){
      #fork if the runner is not running
      lsof $FIFO >/dev/null || ($0 "QueueRunner" "$MAXCHILD" "$FIFO" &)

      #send the command to the runner
      echo "$COMMAND" > $FIFO
    }

    #Create the FIFO file
    [[ -e "$FIFO" ]] || mkfifo "$FIFO"

    #Start the runner if in the runner fork, else put the command in the queue
    [[ "$COMMAND" == "QueueRunner" ]] && runner || writer

#1


Can you convert your command list to a Makefile? If so, you could just run "make -j X".

你可以将命令列表转换为Makefile吗?如果是这样,你可以运行“make -j X”。

#2


GNU Parallel http://www.gnu.org/software/parallel/ is a more general tool for parallelizing than PPSS.

GNU Parallel http://www.gnu.org/software/parallel/是比PPSS更加通用的并行化工具。

If runfile contains:

如果runfile包含:

command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log

you can do:

你可以做:

cat runfile | parallel -j+0

which will run one command per CPU core.

每个CPU核心运行一个命令。

If your commands are as simple as above you do not even need runfile but can do:

如果您的命令如上所述,您甚至不需要runfile但可以执行以下操作:

seq 1 3 | parallel -j+0 'command {} > Logs/{}.log'

If you have more computers available to do the processing you may want to look at the --sshlogin and --trc options for GNU Parallel.

如果有更多计算机可用于处理,您可能需要查看GNU Parallel的--sshlogin和--trc选项。

#3


Okay, after posting the question here, I found the following project which looks promising: ppss.

好的,在这里发布问题后,我发现以下项目看起来很有希望:ppss。

Edit: Not quite what I want, PPSS is focused on processing "all files in directory A".

编辑:不是我想要的,PPSS专注于处理“目录A中的所有文件”。

#4


Well, this is a kind of fun question anyway.

嗯,无论如何这是一个有趣的问题。

Here's what I'd do, assuming bash(1) of course.

这就是我要做的,当然是假设bash(1)。

  • figure out how many of these commands can usefully run concurrently. It's not going to be just the number of cores; a lot of commands will be suspended for I/O and that sort of thing. Call that number N. N=15 for example.
  • 弄清楚这些命令中有多少可以同时有效地运行。它不仅仅是核心数量;很多命令将暂停I / O等等。例如,呼叫该号码N.N = 15。

  • set up a trap signal handler for the SIGCHLD signal, which occurs when a child process terminates. trap signalHandler SIGCHLD
  • 为子进程终止时发生的SIGCHLD信号设置陷阱信号处理程序。陷阱signalHandler SIGCHLD

  • cat your list of commands into a pipe
  • 将您的命令列表添加到管道中

  • write a loop that reads stdin and executes the commands one by one, decrementing a counter. When the counter is 0, it waits.
  • 写一个循环,读取stdin并逐个执行命令,递减计数器。当计数器为0时,它等待。

  • your signal handler, which runs on SIGCHLD, increments that counter.
  • 在SIGCHLD上运行的信号处理程序会递增该计数器。

So now, it runs the first N commands, then waits. When the first child terminates, the wait returns, it reads another line, runs a new command, and waits again.

所以现在,它运行前N个命令,然后等待。当第一个子节点终止时,等待返回,它读取另一行,运行新命令,然后再次等待。

Now, this is a case that takes care of many jobs terminating close together. I suspect you can get away with a simpler version:

现在,这是一个案例,负责许多工作紧密结合在一起。我怀疑你可以使用更简单的版本:

 N=15
 COUNT=N
 cat mycommands.sh | 
 while read cmd 
 do
   eval $cmd &
   if $((count-- == 0))
   then
       wait
   fi
 od

Now, this one will start up the first 15 commands, and then run the rest one at a time as some command terminates.

现在,这个将启动前15个命令,然后在某个命令终止时一次运行其余一个命令。

#5


Similar distributed-computing fun is the Mapreduce Bash Script:

类似的分布式计算乐趣是Mapreduce Bash脚本:

http://blog.last.fm/2009/04/06/mapreduce-bash-script

And thanks for pointing out ppss!

并感谢指出ppss!

#6


You can use the xargs command, its --max-procs does what you want. For instance Charlie Martin solution becomes with xargs:

你可以使用xargs命令,它的--max-procs可以做你想要的。例如,Charlie Martin解决方案与xargs一起使用:

tr '\012' '\000' <mycommands.sh |xargs --null --max-procs=$X bash -c

details:

  • X is the number of processes max. E.g: X=15. --max-procs is doing the magic
  • X是最大进程数。例如:X = 15。 --max-procs正在发挥魔力

  • the first tr is here to terminate lines by null bytes for xargs --null option so that quotes redirection etc are not expansed wrongly
  • 第一个tr用于终止xargs --null选项的空字节行,以便引号重定向等不会被错误地扩展

  • bash -c runs the command
  • bash -c运行命令

I tested it with this mycommands.sh file for instance:

我用这个mycommands.sh文件测试了它,例如:

date
date "+%Y-%m-%d" >"The Date".txt
wc -c <'The Date'.txt >'The Count'.txt

#7


This is a specific case, but if you are trying to process a set of files and produce another set of output files, you can start #cores number of processes, and check if an output file exists before processing it. The example below converts a directory of .m4b files to .mp3 files:

这是一个特例,但如果您正在尝试处理一组文件并生成另一组输出文件,则可以启动#cores进程数,并在处理之前检查输出文件是否存在。下面的示例将.m4b文件的目录转换为.mp3文件:

Just run this command as many times as you have cores:

只需像拥有核心一样多次运行此命令:

ls *m4b|while read f; do test -f ${f%m4b}mp3 || mencoder -of rawaudio "$f" -oac mp3lame -ovc copy -o ${f%m4b}mp3; done &

ls * m4b | while read f;做测试-f $ {f%m4b} mp3 || mencoder -of rawaudio“$ f”-oac mp3lame -ovc copy -o $ {f%m4b} mp3;完成&

#8


You could see my tasks queue written on bash: https://github.com/pavelpat/yastq

您可以在bash上看到我的任务队列:https://github.com/pavelpat/yastq

#9


Task Queue + Parallelized + Dynamic addition

任务队列+并行化+动态添加

Using a FIFO, this script fork itself to process the queue. This way, you can add commands to the queue on the fly (when the queue is already started).

使用FIFO,此脚本自行分叉以处理队列。这样,您可以动态地向队列添加命令(当队列已经启动时)。

Usage: ./queue Command [# of children] [Queue name]

用法:./queue命令[子项数] [队列名称]

Example, with 1 thread:

示例,1个线程:

./queue "sleep 5; echo ONE"
./queue "echo TWO"

Output:

ONE
TWO

Example, with 2 thread:

例如,有2个线程:

./queue "sleep 5; echo ONE" 2
./queue "echo TWO"

Output:

TWO
ONE

Example, with 2 queues:

示例,有2个队列:

./queue "sleep 5; echo ONE queue1" 1 queue1
./queue "sleep 3; echo ONE queue2" 1 queue2

Output:

ONE queue2
ONE queue1

The script (save it as "queue" and chmod +x queue):

脚本(将其保存为“queue”和chmod + x队列):


    #!/bin/bash

    #Print usage
    [[ $# -eq 0 ]] && echo Usage: $0 Command [# of children] [Queue name] && exit

    #Param 1 - Command to execute
    COMMAND="$1"

    #Param 2 - Number of childs in parallel
    MAXCHILD=1
    [[ $# -gt 1 ]] && MAXCHILD="$2"

    #Param 3 - File to be used as FIFO
    FIFO="/tmp/defaultqueue"
    [[ $# -gt 2 ]] && FIFO="$3"

    #Number of seconds to keep the runner active when unused
    TIMEOUT=5

    runner(){
      #Associate file descriptor 3 to the FIFO
      exec 3"$FIFO"

      while read -u 3 -t $TIMEOUT line; do
        #max child check
        while [ `jobs | grep Running | wc -l` -ge "$MAXCHILD" ]; do
          sleep 1
        done

        #exec in backgroud
        (eval "$line")&
      done
      rm $FIFO
    }

    writer(){
      #fork if the runner is not running
      lsof $FIFO >/dev/null || ($0 "QueueRunner" "$MAXCHILD" "$FIFO" &)

      #send the command to the runner
      echo "$COMMAND" > $FIFO
    }

    #Create the FIFO file
    [[ -e "$FIFO" ]] || mkfifo "$FIFO"

    #Start the runner if in the runner fork, else put the command in the queue
    [[ "$COMMAND" == "QueueRunner" ]] && runner || writer