Python:并行执行cat子进程

时间:2021-05-21 20:42:31

I am running several cat | zgrep commands on a remote server and gathering their output individually for further processing:

我正在远程服务器上运行几个cat | zgrep命令,分别收集它们的输出,以便进行进一步处理:

class MainProcessor(mp.Process):
    def __init__(self, peaks_array):
        super(MainProcessor, self).__init__()
        self.peaks_array = peaks_array

    def run(self):
        for peak_arr in self.peaks_array:
            peak_processor = PeakProcessor(peak_arr)
            peak_processor.start()

class PeakProcessor(mp.Process):
    def __init__(self, peak_arr):
        super(PeakProcessor, self).__init__()
        self.peak_arr = peak_arr

    def run(self):
        command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
        log_lines = (subprocess.check_output(command, shell=True)).split('\n')
        process_data(log_lines)

This, however, results in sequential execution of the subprocess('ssh ... cat ...') commands. Second peak waits for first to finish and so on.

然而,这会导致子进程的顺序执行(“ssh……”猫…”)命令。第二个峰等待第一个峰结束等等。

How can I modify this code so that the subprocess calls run in parallel, while still being able to collect the output for each individually?

如何修改此代码,使子进程调用并行运行,同时仍然能够分别收集每个进程的输出?

2 个解决方案

#1


-1  

Another approach (rather than the other suggestion of putting shell processes in the background) is to use multithreading.

另一种方法(而不是将shell进程放在后台的其他建议)是使用多线程。

The run method that you have would then do something like this:

你使用的run方法将会做如下的事情:

thread.start_new_thread ( myFuncThatDoesZGrep)

To collect results, you can do something like this:

要收集结果,你可以这样做:

class MyThread(threading.Thread):
   def run(self):
       self.finished = False
       # Your code to run the command here.
       blahBlah()
       # When finished....
       self.finished = True
       self.results = []

Run the thread as stated above in the link on multithreading. When your thread object has myThread.finished == True, then you can collect the results via myThread.results.

按照上面在多线程链接中所述运行线程。当您的线程对象有myThread时。finish == = True,然后您可以通过myThread.results来收集结果。

#2


25  

You don't need neither multiprocessing nor threading to run subprocesses in parallel e.g.:

并行运行子进程不需要多重处理或线程,例如:

#!/usr/bin/env python
from subprocess import Popen

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
             for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]

it runs 5 shell commands simultaneously. Note: neither threads nor multiprocessing module are used here. There is no point to add ampersand & to the shell commands: Popen doesn't wait for the command to complete. You need to call .wait() explicitly.

它同时运行5个shell命令。注意:这里既不使用线程,也不使用多处理模块。没有必要向shell命令添加& &:Popen不等待命令完成。您需要显式地调用.wait()。

It is convenient but it is not necessary to use threads to collect output from subprocesses:

它很方便,但是不需要使用线程从子进程收集输出:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
             for i in range(5)]

# collect output in parallel
def get_lines(process):
    return process.communicate()[0].splitlines()

outputs = Pool(len(processes)).map(get_lines, processes)

Related: Python threading multiple bash subprocesses?.

相关:Python线程多个bash子进程?

Here's code example that gets output from several subprocesses concurrently in the same thread:

下面是一个代码示例,它在同一个线程中同时从几个子进程中获取输出:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

@asyncio.coroutine
def get_lines(shell_command):
    p = yield from asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (yield from p.communicate())[0].splitlines()

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                    .format(i=i, e=sys.executable)) for i in range(5)]
print(loop.run_until_complete(asyncio.gather(*coros)))
loop.close()

#1


-1  

Another approach (rather than the other suggestion of putting shell processes in the background) is to use multithreading.

另一种方法(而不是将shell进程放在后台的其他建议)是使用多线程。

The run method that you have would then do something like this:

你使用的run方法将会做如下的事情:

thread.start_new_thread ( myFuncThatDoesZGrep)

To collect results, you can do something like this:

要收集结果,你可以这样做:

class MyThread(threading.Thread):
   def run(self):
       self.finished = False
       # Your code to run the command here.
       blahBlah()
       # When finished....
       self.finished = True
       self.results = []

Run the thread as stated above in the link on multithreading. When your thread object has myThread.finished == True, then you can collect the results via myThread.results.

按照上面在多线程链接中所述运行线程。当您的线程对象有myThread时。finish == = True,然后您可以通过myThread.results来收集结果。

#2


25  

You don't need neither multiprocessing nor threading to run subprocesses in parallel e.g.:

并行运行子进程不需要多重处理或线程,例如:

#!/usr/bin/env python
from subprocess import Popen

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
             for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]

it runs 5 shell commands simultaneously. Note: neither threads nor multiprocessing module are used here. There is no point to add ampersand & to the shell commands: Popen doesn't wait for the command to complete. You need to call .wait() explicitly.

它同时运行5个shell命令。注意:这里既不使用线程,也不使用多处理模块。没有必要向shell命令添加& &:Popen不等待命令完成。您需要显式地调用.wait()。

It is convenient but it is not necessary to use threads to collect output from subprocesses:

它很方便,但是不需要使用线程从子进程收集输出:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
             for i in range(5)]

# collect output in parallel
def get_lines(process):
    return process.communicate()[0].splitlines()

outputs = Pool(len(processes)).map(get_lines, processes)

Related: Python threading multiple bash subprocesses?.

相关:Python线程多个bash子进程?

Here's code example that gets output from several subprocesses concurrently in the same thread:

下面是一个代码示例,它在同一个线程中同时从几个子进程中获取输出:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

@asyncio.coroutine
def get_lines(shell_command):
    p = yield from asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (yield from p.communicate())[0].splitlines()

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                    .format(i=i, e=sys.executable)) for i in range(5)]
print(loop.run_until_complete(asyncio.gather(*coros)))
loop.close()