What is the best/easiest way to build a minimal task queue system for Linux using bash and common tools?
I have a file with 9'000 lines; each line holds a bash command line, and the commands are completely independent.
command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log
...
My box has more than one core and I want to execute X tasks at the same time. I searched the web for a good way to do this. Apparently, a lot of people have this problem but nobody has a good solution so far.
It would be nice if the solution had the following features:
- can interpret more than one command (e.g. command; command)
- can interpret stream redirects on the lines (e.g. ls > /tmp/ls.txt)
- only uses common Linux tools
Bonus points if it works on other Unix-clones without too exotic requirements.
9 Answers
#1
Can you convert your command list to a Makefile? If so, you could just run "make -j X".
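For instance, here is a minimal sketch of the conversion, assuming the commands live in mycommands.sh; the jobN target names are made up, and any literal $ in the commands would have to be doubled to $$ for make:

{
    printf 'all:'
    n=0
    while read -r cmd; do
        n=$((n + 1))
        printf ' job%d' "$n"                  # every job is a prerequisite of "all"
    done < mycommands.sh
    printf '\n\n'
    n=0
    while read -r cmd; do
        n=$((n + 1))
        printf 'job%d:\n\t%s\n' "$n" "$cmd"   # make recipes must start with a tab
    done < mycommands.sh
} > Makefile
make -j 15 -k    # run 15 jobs at a time; -k keeps going if one fails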
#2
GNU Parallel http://www.gnu.org/software/parallel/ is a more general tool for parallelizing than PPSS.
If runfile contains:
command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log
you can do:
cat runfile | parallel -j+0
which will run one command per CPU core.
If your commands are as simple as above, you do not even need runfile but can do:
seq 1 3 | parallel -j+0 'command {} > Logs/{}.log'
If you have more computers available to do the processing you may want to look at the --sshlogin and --trc options for GNU Parallel.
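For example (a sketch only; server1 and server2 are placeholder hostnames assumed reachable via passwordless ssh, and : denotes the local machine):

cat runfile | parallel -j+0 --sshlogin :,server1,server2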
#3
Okay, after posting the question here, I found the following project which looks promising: ppss.
Edit: Not quite what I want; PPSS is focused on processing "all files in directory A".
#4
Well, this is a kind of fun question anyway.
Here's what I'd do, assuming bash(1) of course.
- Figure out how many of these commands can usefully run concurrently. It's not going to be just the number of cores; a lot of commands will be suspended for I/O and that sort of thing. Call that number N; N=15, for example.
- Set up a trap signal handler for the SIGCHLD signal, which occurs when a child process terminates: trap signalHandler SIGCHLD
- cat your list of commands into a pipe.
- Write a loop that reads stdin and executes the commands one by one, decrementing a counter. When the counter is 0, it waits.
- Your signal handler, which runs on SIGCHLD, increments that counter.
So now, it runs the first N commands, then waits. When the first child terminates, the wait returns, the loop reads another line, runs a new command, and waits again.
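Here is a minimal sketch of that scheme, assuming bash and a command list in mycommands.sh (signalHandler and count are placeholder names; SIGCHLD delivery details vary between shells, so treat this as a starting point rather than a hardened implementation):

N=15
count=$N
signalHandler() { count=$((count + 1)); }   # each dead child frees a slot
trap signalHandler CHLD
while read -r cmd; do
    eval "$cmd" &                # launch the next command
    count=$((count - 1))
    while (( count <= 0 )); do
        wait                     # a trapped SIGCHLD interrupts this wait
    done
done < mycommands.sh
until wait; do :; done           # keep waiting until no children remain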
Now, this version takes care of many jobs terminating close together. I suspect you can get away with a simpler one:
N=15
count=$N
while read -r cmd
do
    eval "$cmd" &
    if (( --count <= 0 ))
    then
        wait -n                 # block until any one job exits (bash 4.3+)
        count=$((count + 1))
    fi
done < mycommands.sh
wait                            # let the remaining jobs finish
Now, this one will start up the first 15 commands, and then run the rest one at a time as some command terminates.
#5
Similar distributed-computing fun is the Mapreduce Bash Script:
http://blog.last.fm/2009/04/06/mapreduce-bash-script
And thanks for pointing out ppss!
#6
You can use the xargs command; its --max-procs option does what you want. For instance, Charlie Martin's solution becomes, with xargs:
tr '\012' '\000' <mycommands.sh | xargs --null -n 1 --max-procs=$X bash -c
details:
- X is the max number of processes, e.g. X=15; --max-procs is doing the magic
- the first tr is there to terminate the lines with null bytes for the xargs --null option, so that quotes, redirects, etc. are not expanded wrongly
- -n 1 hands one command to each bash invocation (without it, xargs would pack several lines into a single call, and bash -c would execute only the first)
- bash -c runs the command
For instance, I tested it with this mycommands.sh file:
date
date "+%Y-%m-%d" >"The Date".txt
wc -c <'The Date'.txt >'The Count'.txt
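As an aside for the portability bonus: the GNU long options do not exist on the BSDs, but a modern BSD or macOS xargs understands the short forms, so the same pipeline should (an untested assumption) read:

tr '\012' '\000' <mycommands.sh | xargs -0 -n 1 -P "$X" bash -c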
#7
This is a specific case, but if you are trying to process a set of files and produce another set of output files, you can start as many processes as you have cores and check whether the output file already exists before processing each input. The example below converts a directory of .m4b files to .mp3 files:
Just run this command as many times as you have cores:
ls *m4b | while read -r f; do test -f "${f%m4b}mp3" || mencoder -of rawaudio "$f" -oac mp3lame -ovc copy -o "${f%m4b}mp3"; done &
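If you would rather not paste that line once per core, here is a sketch of a wrapper (assuming GNU coreutils' nproc; the output-file check is what keeps two workers off the same input, though there is still a small window where two workers can pick the same file before mencoder creates its output):

for _ in $(seq "$(nproc)"); do
    ls *m4b | while read -r f; do
        test -f "${f%m4b}mp3" || mencoder -of rawaudio "$f" -oac mp3lame -ovc copy -o "${f%m4b}mp3"
    done &
done
wait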
#8
You could have a look at my task queue written in bash: https://github.com/pavelpat/yastq
#9
Task Queue + Parallelized + Dynamic addition
Using a FIFO, this script forks itself to process the queue. That way, you can add commands to the queue on the fly (while the queue is already running).
Usage: ./queue Command [# of children] [Queue name]
Example, with 1 thread:
./queue "sleep 5; echo ONE" ./queue "echo TWO"
Output:
ONE
TWO
Example, with 2 threads:
./queue "sleep 5; echo ONE" 2 ./queue "echo TWO"
Output:
TWO
ONE
Example, with 2 queues:
./queue "sleep 5; echo ONE queue1" 1 queue1 ./queue "sleep 3; echo ONE queue2" 1 queue2
Output:
ONE queue2
ONE queue1
The script (save it as "queue" and chmod +x queue):
#!/bin/bash

#Print usage
[[ $# -eq 0 ]] && echo "Usage: $0 Command [# of children] [Queue name]" && exit

#Param 1 - Command to execute
COMMAND="$1"

#Param 2 - Number of children to run in parallel
MAXCHILD=1
[[ $# -gt 1 ]] && MAXCHILD="$2"

#Param 3 - File to be used as FIFO
FIFO="/tmp/defaultqueue"
[[ $# -gt 2 ]] && FIFO="$3"

#Number of seconds to keep the runner alive when unused
TIMEOUT=5

runner(){
    #Associate file descriptor 3 with the FIFO
    exec 3<"$FIFO"
    while read -u 3 -t "$TIMEOUT" line; do
        #max child check
        while [ "$(jobs | grep Running | wc -l)" -ge "$MAXCHILD" ]; do
            sleep 1
        done
        #exec in background
        (eval "$line")&
    done
    rm "$FIFO"
}

writer(){
    #fork a runner if one is not already running
    lsof "$FIFO" >/dev/null || ("$0" "QueueRunner" "$MAXCHILD" "$FIFO" &)
    #send the command to the runner
    echo "$COMMAND" > "$FIFO"
}

#Create the FIFO file
[[ -e "$FIFO" ]] || mkfifo "$FIFO"

#Start the runner if in the runner fork, else put the command in the queue
[[ "$COMMAND" == "QueueRunner" ]] && runner || writer