I am using Python 2's subprocess module with threading threads to take standard input, process it with binaries A, B, and C, and write modified data to standard output.
This script (let's call it A_to_C.py) is very slow and I'd like to learn how to fix it.
The general flow is as follows:
A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
produce_A_thread = threading.Thread(target=produceA, args=(sys.stdin, A_process.stdin))
B_process = subprocess.Popen(['B', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
convert_A_to_B_thread = threading.Thread(target=produceB, args=(A_process.stdout, B_process.stdin))
C_process = subprocess.Popen(['C', '-'], stdin=subprocess.PIPE)
convert_B_to_C_thread = threading.Thread(target=produceC, args=(B_process.stdout, C_process.stdin))
produce_A_thread.start()
convert_A_to_B_thread.start()
convert_B_to_C_thread.start()
produce_A_thread.join()
convert_A_to_B_thread.join()
convert_B_to_C_thread.join()
A_process.wait()
B_process.wait()
C_process.wait()
The idea is that standard input goes into A_to_C.py:
- The A binary processes a chunk of standard input and creates A-output with the function produceA.
- The B binary processes a chunk of A's standard output and creates B-output via the function produceB.
- The C binary processes a chunk of B's standard output via the function produceC and writes C-output to standard output (a sketch of these pump functions follows the list).
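A simplified sketch of one such pump function (transform_chunk here is a hypothetical stand-in for whatever per-chunk work the real functions do):
def transform_chunk(buf):
    # Hypothetical placeholder for the real per-chunk processing.
    return buf

def produceA(from_stream, to_stream, read_size=8*1024):
    # Pump loop: read a chunk, transform it, hand it to the next stage.
    while True:
        buf = from_stream.read(read_size)
        if not buf:  # EOF
            break
        to_stream.write(transform_chunk(buf))
    to_stream.close()  # signal EOF to the downstream process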
I did profiling with cProfile and nearly all of the time in this script appears to be spent acquiring thread locks. For instance, in a test job that ran for 417s, 416s (>99% of the total runtime) was spent acquiring thread locks:
$ python
Python 2.6.6 (r266:84292, Nov 21 2013, 10:50:32)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import pstats
>>> p = pstats.Stats('1.profile')
>>> p.sort_stats('cumulative').print_stats(10)
Thu Jun 12 22:19:07 2014 1.profile
1755 function calls (1752 primitive calls) in 417.203 CPU seconds
Ordered by: cumulative time
List reduced from 162 to 10 due to restriction <10>
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.020 0.020 417.203 417.203 A_to_C.py:90(<module>)
1 0.000 0.000 417.123 417.123 A_to_C.py:809(main)
6 0.000 0.000 416.424 69.404 /foo/python/2.7.3/lib/python2.7/threading.py:234(wait)
32 416.424 13.013 416.424 13.013 {method 'acquire' of 'thread.lock' objects}
3 0.000 0.000 416.422 138.807 /foo/python/2.7.3/lib/python2.7/threading.py:648(join)
3 0.000 0.000 0.498 0.166 A_to_C.py:473(which)
37 0.000 0.000 0.498 0.013 A_to_C.py:475(is_exe)
3 0.496 0.165 0.496 0.165 {posix.access}
6 0.000 0.000 0.194 0.032 /foo/python/2.7.3/lib/python2.7/subprocess.py:475(_eintr_retry_call)
3 0.000 0.000 0.191 0.064 /foo/python/2.7.3/lib/python2.7/subprocess.py:1286(wait)
What am I doing wrong with my threading.Thread and/or subprocess.Popen arrangement which is causing this issue?
5 Answers
#1
0
Your calls to subprocess.Popen() implicitly specify the default value of bufsize, 0, which forces unbuffered I/O. Try adding a reasonable buffer size (4K, 16K, even 1M) and see if it makes any difference.
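For example, a minimal tweak to the question's first Popen call (4096 is just an illustrative value; try the sizes suggested above and measure):
A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, bufsize=4096)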
#2
10
I think you are just being misled by the way cProfile works. For example, here's a simple script that uses two threads:
#!/usr/bin/python
import threading
import time


def f():
    time.sleep(10)


def main():
    t = threading.Thread(target=f)
    t.start()
    t.join()
If I test this using cProfile, here's what I get:
>>> import test
>>> import cProfile
>>> cProfile.run('test.main()')
60 function calls in 10.011 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 10.011 10.011 <string>:1(<module>)
1 0.000 0.000 10.011 10.011 test.py:10(main)
1 0.000 0.000 0.000 0.000 threading.py:1008(daemon)
2 0.000 0.000 0.000 0.000 threading.py:1152(currentThread)
2 0.000 0.000 0.000 0.000 threading.py:241(Condition)
2 0.000 0.000 0.000 0.000 threading.py:259(__init__)
2 0.000 0.000 0.000 0.000 threading.py:293(_release_save)
2 0.000 0.000 0.000 0.000 threading.py:296(_acquire_restore)
2 0.000 0.000 0.000 0.000 threading.py:299(_is_owned)
2 0.000 0.000 10.011 5.005 threading.py:308(wait)
1 0.000 0.000 0.000 0.000 threading.py:541(Event)
1 0.000 0.000 0.000 0.000 threading.py:560(__init__)
2 0.000 0.000 0.000 0.000 threading.py:569(isSet)
4 0.000 0.000 0.000 0.000 threading.py:58(__init__)
1 0.000 0.000 0.000 0.000 threading.py:602(wait)
1 0.000 0.000 0.000 0.000 threading.py:627(_newname)
5 0.000 0.000 0.000 0.000 threading.py:63(_note)
1 0.000 0.000 0.000 0.000 threading.py:656(__init__)
1 0.000 0.000 0.000 0.000 threading.py:709(_set_daemon)
1 0.000 0.000 0.000 0.000 threading.py:726(start)
1 0.000 0.000 10.010 10.010 threading.py:911(join)
10 10.010 1.001 10.010 1.001 {method 'acquire' of 'thread.lock' objects}
2 0.000 0.000 0.000 0.000 {method 'append' of 'list' objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
4 0.000 0.000 0.000 0.000 {method 'release' of 'thread.lock' objects}
4 0.000 0.000 0.000 0.000 {thread.allocate_lock}
2 0.000 0.000 0.000 0.000 {thread.get_ident}
1 0.000 0.000 0.000 0.000 {thread.start_new_thread}
As you can see, it says that almost all of the time is spent acquiring locks. Of course, we know that's not really an accurate representation of what the script was doing. All the time was actually spent in a time.sleep call inside f(). The high tottime of the acquire call is just because join was waiting for f to finish, which means it had to sit and wait to acquire a lock. However, cProfile doesn't show any time being spent in f at all. We can clearly see what is actually happening because the example code is so simple, but in a more complicated program, this output is very misleading.
You can get more reliable results by using another profiling library, like yappi:
>>> import test
>>> import yappi
>>> yappi.set_clock_type("wall")
>>> yappi.start()
>>> test.main()
>>> yappi.get_func_stats().print_all()
Clock type: wall
Ordered by: totaltime, desc
name #n tsub ttot tavg
<stdin>:1 <module> 2/1 0.000025 10.00801 5.004003
test.py:10 main 1 0.000060 10.00798 10.00798
..2.7/threading.py:308 _Condition.wait 2 0.000188 10.00746 5.003731
..thon2.7/threading.py:911 Thread.join 1 0.000039 10.00706 10.00706
..ython2.7/threading.py:752 Thread.run 1 0.000024 10.00682 10.00682
test.py:6 f 1 0.000013 10.00680 10.00680
..hon2.7/threading.py:726 Thread.start 1 0.000045 0.000608 0.000608
..thon2.7/threading.py:602 _Event.wait 1 0.000029 0.000484 0.000484
..2.7/threading.py:656 Thread.__init__ 1 0.000064 0.000250 0.000250
..on2.7/threading.py:866 Thread.__stop 1 0.000025 0.000121 0.000121
..lib/python2.7/threading.py:541 Event 1 0.000011 0.000101 0.000101
..python2.7/threading.py:241 Condition 2 0.000025 0.000094 0.000047
..hreading.py:399 _Condition.notifyAll 1 0.000020 0.000090 0.000090
..2.7/threading.py:560 _Event.__init__ 1 0.000018 0.000090 0.000090
..thon2.7/encodings/utf_8.py:15 decode 2 0.000031 0.000071 0.000035
..threading.py:259 _Condition.__init__ 2 0.000064 0.000069 0.000034
..7/threading.py:372 _Condition.notify 1 0.000034 0.000068 0.000068
..hreading.py:299 _Condition._is_owned 3 0.000017 0.000040 0.000013
../threading.py:709 Thread._set_daemon 1 0.000018 0.000035 0.000035
..ding.py:293 _Condition._release_save 2 0.000019 0.000033 0.000016
..thon2.7/threading.py:63 Thread._note 7 0.000020 0.000020 0.000003
..n2.7/threading.py:1152 currentThread 2 0.000015 0.000019 0.000009
..g.py:296 _Condition._acquire_restore 2 0.000011 0.000017 0.000008
../python2.7/threading.py:627 _newname 1 0.000014 0.000014 0.000014
..n2.7/threading.py:58 Thread.__init__ 4 0.000013 0.000013 0.000003
..threading.py:1008 _MainThread.daemon 1 0.000004 0.000004 0.000004
..hon2.7/threading.py:569 _Event.isSet 2 0.000003 0.000003 0.000002
With yappi, it's much easier to see that the time is being spent in f.
I suspect that you'll find that in reality, most of your script's time is spent doing whatever work is being done in produceA, produceB, and produceC.
#3
5
TL;DR If your program runs slower than expected, it is probably due to the details of what the intermediate functions do rather than due to IPC or threading. Test with mock functions and processes (as simple as possible) to isolate just the overhead of passing data to/from subprocesses. In a benchmark based closely on your code (below), the performance when passing data to/from subprocesses seems to be roughly equivalent to using shell pipes directly; python is not particularly slow at this task.
What is going on with the original code
The general form of the original code is:
def produceB(from_stream, to_stream):
    while True:
        buf = from_stream.read()
        processed_buf = do_expensive_calculation(buf)
        to_stream.write(processed_buf)
Here the calculation between the read and the write takes about 2/3 of the total CPU time of all processes (main and sub) combined; note this is CPU time, not wall-clock time.
I think that this prevents the I/O from running at full speed. The reads, the writes, and the calculation each need their own thread, with queues to provide buffering between the read and the calculation and between the calculation and the write (since the amount of buffering that pipes provide is, I believe, insufficient).
I show below that if there is no processing in between read and write (or equivalently: if the intermediate processing is done in a separate thread), then the throughput from threads + subprocess is very high. It is also possible to have separate threads for reads and writes; this adds a bit of overhead but makes writes not block reads and vice versa. Three threads (read, write and processing) are even better; then no step blocks the others (within the limits of the queue sizes, of course).
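As a rough sketch of that three-thread layout (not taken from the benchmarks below; names and queue depths are only illustrative), something along these lines would work in Python 2:
import threading
from Queue import Queue  # renamed to queue in Python 3

def pump_with_worker(src, dst, process, read_size=8*1024, depth=16):
    # Reader, worker and writer each get their own thread, joined by bounded
    # queues, so a briefly slow stage does not stall the whole pipeline.
    in_q, out_q = Queue(maxsize=depth), Queue(maxsize=depth)
    def reader():
        while True:
            buf = src.read(read_size)
            in_q.put(buf)  # an empty read is forwarded as the EOF marker
            if not buf:
                break
    def worker():
        while True:
            buf = in_q.get()
            out_q.put(process(buf) if buf else buf)
            if not buf:
                break
    def writer():
        while True:
            buf = out_q.get()
            if not buf:
                break
            dst.write(buf)
        dst.close()  # signal EOF downstream
    threads = [threading.Thread(target=t) for t in (reader, worker, writer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()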
Some benchmarks
All benchmarking below is on python 2.7.6 on Ubuntu 14.04LTS 64bit (Intel i7, Ivy Bridge, quad core). The test is to transfer approx 1GB of data in 4KB blocks between two dd processes, and pass the data through python as an intermediary. The dd processes use medium sized (4KB) blocks; typical text I/O would be smaller (unless it is cleverly buffered by the interpreter, etc), typical binary I/O would of course be much larger. I have one example based on exactly how you did this, and one example based on an alternate approach I had tried some time ago (which turns out to be slower). By the way, thanks for posting this question, it is useful.
Threads and blocking I/O
First, let's convert the original code in the question into a slightly simpler self-contained example. This is just two processes communicating with a thread that pumps data from one to the other, doing blocking reads and writes.
import subprocess, threading

A_process = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
B_process = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)

def convert_A_to_B(src, dst):
    read_size = 8*1024
    while True:
        try:
            buf = src.read(read_size)
            if len(buf) == 0:  # This is a bit hacky, but seems to reliably happen when the src is closed
                break
            dst.write(buf)
        except ValueError as e:  # Reading or writing on a closed fd causes ValueError, not IOError
            print str(e)
            break

convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, args=(A_process.stdout, B_process.stdin))
convert_A_to_B_thread.start()

# Here, watch out for the exact sequence to clean things up
convert_A_to_B_thread.join()
A_process.wait()
B_process.stdin.close()
B_process.wait()
Results:
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.638977 s, 1.6 GB/s
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.635499 s, 1.6 GB/s
real 0m0.678s
user 0m0.657s
sys 0m1.273s
Not bad! It turns out that the ideal read size in this case is roughly 8-16KB; much smaller and much larger sizes are somewhat slower. This is probably related to the 4KB block size we asked dd to use.
Select and non-blocking I/O
When I was looking at this type of problem before, I headed in the direction of using select(), nonblocking I/O, and a single thread. An example of that is in my question here: How to read and write from subprocesses asynchronously? That was for reading from two processes in parallel, which I have extended below to reading from one process and writing to another. The nonblocking writes are limited to PIPE_BUF or less in size, which is 4KB on my system; for simplicity, the reads are also 4KB, although they could be any size. This has a few weird corner cases (and inexplicable hangs, depending on the details), but in the form below it works reliably.
import subprocess, select, fcntl, os, sys

p1 = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)

def make_nonblocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

make_nonblocking(p1.stdout)
make_nonblocking(p2.stdin)

print "PIPE_BUF = %d" % (select.PIPE_BUF)
read_size = select.PIPE_BUF
max_buf_len = 1  # For reasons which I have not debugged completely, this hangs sometimes when set > 1
bufs = []

while True:
    inputready, outputready, exceptready = select.select([ p1.stdout.fileno() ],[ p2.stdin.fileno() ],[])
    for fd in inputready:
        if fd == p1.stdout.fileno():
            if len(bufs) < max_buf_len:
                data = p1.stdout.read(read_size)
                bufs.append(data)
    for fd in outputready:
        if fd == p2.stdin.fileno() and len(bufs) > 0:
            data = bufs.pop(0)
            p2.stdin.write(data)
    p1.poll()
    # If the first process is done and there is nothing more to write out
    if p1.returncode != None and len(bufs) == 0:
        # Again cleanup is tricky. We expect the second process to finish soon after its input is closed
        p2.stdin.close()
        p2.wait()
        p1.wait()
        break
Results:
PIPE_BUF = 4096
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 3.13722 s, 319 MB/s
244133+0 records in
244133+0 records out
999968768 bytes (1.0 GB) copied, 3.13599 s, 319 MB/s
real 0m3.167s
user 0m2.719s
sys 0m2.373s
This is however significantly slower than the version above (even if the read/write size is made 4KB in both for an apples-to-apples comparison). I'm not sure why.
P.S. Late addition: It appears that it is OK to ignore or exceed PIPE_BUF. This causes an IOError exception to be thrown much of the time from p2.stdin.write() (errno=11, temporarily unavailable), presumably when there is enough room in the pipe to write something, but less than the full size we are requesting. The same code above with read_size = 64*1024, and with that exception caught and ignored, runs at 1.4GB/s.
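A sketch of what catching that case might look like in the write branch of the loop above (write_pending is a hypothetical helper; re-queueing the chunk on EAGAIN, rather than simply ignoring the error, is a small variation):
import errno

def write_pending(pipe, data, bufs):
    # Try the nonblocking write; if the pipe is momentarily full (EAGAIN),
    # push the chunk back so the next select() pass retries it.
    try:
        pipe.write(data)
    except IOError as e:
        if e.errno != errno.EAGAIN:
            raise
        bufs.insert(0, data)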
Pipe directly
Just as a baseline, how fast is it to run this using the shell version of pipes (in subprocess)? Let's have a look:
import subprocess
subprocess.call("dd if=/dev/zero bs=4k count=244140 | dd of=/dev/null bs=4k", shell=True)
Results:
244140+0 records in
244140+0 records out
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.425261 s, 2.4 GB/s
999997440 bytes (1.0 GB) copied, 0.423687 s, 2.4 GB/s
real 0m0.466s
user 0m0.300s
sys 0m0.590s
This is notably faster than the threaded python example. However, this is just one copy, while the threaded python version is doing two (into and out of python). Modifying the command to "dd if=/dev/zero bs=4k count=244140 | dd bs=4k | dd of=/dev/null bs=4k" brings the performance to 1.6GB/s, in line with the python example.
How to run a comparison in a complete system
Some additional thoughts on how to run a comparison in a complete system. Again, for simplicity this is just two processes, and both scripts have the exact same convert_A_to_B() function.
Script 1: Pass data in python, as above
A_process = subprocess.Popen(["A", ...
B_process = subprocess.Popen(["B", ...
convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, ...
Script 2: Comparison script, pass data in shell
convert_A_to_B(sys.stdin, sys.stdout)
Run this in the shell with: A | python script_2.py | B
This allows an apples-to-apples comparison in a complete system, without using mock functions/processes.
How does block read size affect the results
For this test, the code from the first (threaded) example above is used, and both dd and the python script are set to use the same block size for reads/writes.
| Block size | Throughput |
|------------|------------|
| 1KB | 249MB/s |
| 2KB | 416MB/s |
| 4KB | 552MB/s |
| 8KB | 1.4GB/s |
| 16KB | 1.8GB/s |
| 32KB | 2.9GB/s |
| 64KB | 3.0GB/s |
| 128KB | 1.0GB/s |
| 256KB | 600MB/s |
In theory there should be better performance with larger buffers (perhaps up to cache effects) but in practice Linux pipes slow down with very large buffers, even when using pure shell pipes.
#4
0
Since you talked about popen() and pthreads in the comments, I guess you are on a POSIX system (maybe Linux). So did you try using subprocess32 instead of the standard subprocess library?
Its use is strongly encouraged by the documentation and may lead to some improvement.
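A common drop-in pattern (assuming subprocess32 has been installed, e.g. with pip):
try:
    import subprocess32 as subprocess  # POSIX-only backport of the Python 3.2 module
except ImportError:
    import subprocess  # fall back to the standard library version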
PS: I believe mixing forks (subprocess) and threads is a bad idea.
PS2: Why does python produceA.py | A | python produceB.py | B | python produceC.py | C not fit your needs? Or its equivalent using subprocess?
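For what it's worth, a rough sketch of that subprocess equivalent, chaining the three binaries so the OS moves the data between them (A, B and C are the placeholder names from the question):
import sys, subprocess

a = subprocess.Popen(['A', '-'], stdin=sys.stdin, stdout=subprocess.PIPE)
b = subprocess.Popen(['B', '-'], stdin=a.stdout, stdout=subprocess.PIPE)
c = subprocess.Popen(['C', '-'], stdin=b.stdout, stdout=sys.stdout)
a.stdout.close()  # let A get SIGPIPE if B exits early
b.stdout.close()  # likewise for B if C exits early
c.wait()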
#5
0
This scenario is particularly well suited for a pipeline, where parallelism is implicitly managed by the OS. Since you are after a one-script solution, here you are:
#! /usr/bin/python2
import sys
import subprocess
import pipes

# Define these as needed
def produceA(input, output):
    output.write(input.read())

def produceB(input, output):
    output.write(input.read())

def produceC(input, output):
    output.write(input.read())

# Magic starts here
COMMAND = "{me} prepare_A | A - | {me} A_to_B | B - | {me} B_to_C | C -"

def bootstrap(input, output):
    """Prepares and runs the pipeline."""
    me = "./{}".format(pipes.quote(__file__))
    subprocess.call(
        COMMAND.format(me=me),
        stdin=input, stdout=output, shell=True, bufsize=-1
    )

if __name__ == '__main__':
    ACTIONS = {
        "prepare_A": produceA,
        "A_to_B": produceB,
        "B_to_C": produceC
    }
    action = ACTIONS[sys.argv[1]] if len(sys.argv) > 1 else bootstrap
    action(sys.stdin, sys.stdout)
This script will set up a pipeline or run one of the produce functions, depending on the command specified.
Make it executable and run it without arguments:
./A_to_C.py < A.txt > C.txt
Note: it seems like you are using Python 2.6, so this solution is for Python 2.x, although it should run fine in Python 3.x, except that the quote function has been moved to shlex since Python 3.3.