A non-blocking read on a subprocess pipe in Python

Date: 2022-10-09 20:26:52

I'm using the subprocess module to start a subprocess and connect to its output stream (stdout). I want to be able to execute non-blocking reads on its stdout. Is there a way to make .readline non-blocking, or to check whether there is data on the stream before I invoke .readline? I'd like this to be portable, or at least work under both Windows and Linux.

Here is how I do it for now (it blocks on the .readline if no data is available):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
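For illustration, here is a minimal, self-contained demonstration of the blocking behaviour; the child command is a stand-in for myprogram.exe:

```python
import subprocess
import sys
import time

# Spawn a child that waits before printing, to show that .readline() blocks.
p = subprocess.Popen(
    [sys.executable, "-c", "import time; time.sleep(1); print('hello')"],
    stdout=subprocess.PIPE,
)

start = time.monotonic()
line = p.stdout.readline()  # blocks until the child writes a full line
elapsed = time.monotonic() - start

print(line, elapsed)  # b'hello\n', after roughly 1 second
p.wait()
```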

25 answers

#1


337  

fcntl, select, asyncproc won't help in this case.

A reliable way to read a stream without blocking regardless of operating system is to use Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty  # python 3.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:
    line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else:
    print(line) # got line: do something with it

#2


65  

I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn't solve the problem because readline() blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can't because readline() is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
    try: input = sys.stdin.readline()
    except: continue
    handleInput(input)

In my opinion this is a bit cleaner than using the select or signal modules to solve this problem but then again it only works on UNIX...

#3


35  

Python 3.4 introduces a new provisional API for asynchronous IO: the asyncio module.

The approach is similar to the twisted-based answer by @Bryan Ward: define a protocol, and its methods are called as soon as data is ready:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

See "Subprocess" in the docs.

There is a high-level interface, asyncio.create_subprocess_exec(), that returns Process objects and allows reading a line asynchronously using the StreamReader.readline() coroutine (with the async/await Python 3.5+ syntax):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() performs the following tasks:

  • start the subprocess and redirect its stdout to a pipe
  • read a line from the subprocess' stdout asynchronously
  • kill the subprocess
  • wait for it to exit

Each step could be limited by timeout seconds if necessary.
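For instance, a single line read could be bounded with asyncio.wait_for(). This is a sketch, not part of the answer; `readline_with_timeout` is an illustrative helper name:

```python
import asyncio

async def readline_with_timeout(stream, timeout=5.0):
    """Bound a single readline() with a timeout; return None on expiry.

    A sketch only: `stream` is anything with an awaitable readline(),
    such as process.stdout from asyncio.create_subprocess_exec().
    """
    try:
        return await asyncio.wait_for(stream.readline(), timeout)
    except asyncio.TimeoutError:
        return None
```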

#4


19  

Try the asyncproc module. For example:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

The module takes care of all the threading as suggested by S.Lott.

#5


16  

You can do this really easily in Twisted. Depending upon your existing code base, this might not be that easy to use, but if you are building a twisted application, then things like this become almost trivial. You create a ProcessProtocol class, and override the outReceived() method. Twisted (depending upon the reactor used) is usually just a big select() loop with callbacks installed to handle data from different file descriptors (often network sockets). So the outReceived() method is simply installing a callback for handling data coming from STDOUT. A simple example demonstrating this behavior is as follows:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

The Twisted documentation has some good information on this.

If you build your entire application around Twisted, it makes asynchronous communication with other processes, local or remote, really elegant like this. On the other hand, if your program isn't built on top of Twisted, this isn't really going to be that helpful. Hopefully this can be helpful to other readers, even if it isn't applicable for your particular application.

#6


15  

Use select & read(1).

import select
import subprocess     # no new requirements beyond the stdlib

def readAllSoFar(proc, retVal=''):
    # drain whatever is currently available on proc.stdout, one byte at a time
    while select.select([proc.stdout], [], [], 0)[0] != []:
        retVal += proc.stdout.read(1)
    return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

For readline()-like behaviour:

lines = ['']
while not p.poll():
    lines = readAllSoFar(p, lines[-1]).split('\n')
    for line in lines[:-1]:
        print(line)
lines = readAllSoFar(p, lines[-1]).split('\n')
for line in lines[:-1]:
    print(line)

#7


8  

One solution is to have another process perform the read of the process, or to run the read in a thread with a timeout.

Here's the threaded version of a timeout function:

http://code.activestate.com/recipes/473878/

However, do you need to read the stdout as it's coming in? Another solution may be to dump the output to a file and wait for the process to finish using p.wait().

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

#8


7  

Disclaimer: this works only for tornado

You can do this by setting the fd to be nonblocking and then use ioloop to register callbacks. I have packaged this in an egg called tornado_subprocess and you can install it via PyPI:

easy_install tornado_subprocess

now you can do something like this:

import tornado_subprocess
import tornado.ioloop

def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

You can also use it with a RequestHandler:

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

#9


6  

Existing solutions did not work for me (details below). What finally worked was to implement readline using read(1) (based on this answer). The latter does not block:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Why existing solutions did not work:

  1. Solutions that require readline (including the Queue-based ones) always block. It is difficult (impossible?) to kill the thread that executes readline. It only gets killed when the process that created it finishes, but not when the output-producing process is killed.
  2. Mixing low-level fcntl with high-level readline calls may not work properly, as anonnn has pointed out.
  3. Using select.poll() is neat, but doesn't work on Windows according to the Python docs.
  4. Using third-party libraries seems overkill for this task and adds additional dependencies.

#10


3  

I ran into this problem when trying to read some subprocess.Popen stdout. Here is my non-blocking read solution:

import fcntl
import os

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except Exception:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

#11


3  

Here is my code, used to catch all output from the subprocess ASAP, including partial lines. It pumps stdout and stderr at the same time, in almost the correct order.

Tested and working correctly with Python 2.7 on Linux and Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

#12


2  

Adding this answer here since it provides ability to set non-blocking pipes on Windows and Unix.

All the ctypes details are thanks to @techtonik's answer.

There is a slightly modified version to be used both on Unix and Windows systems.

  • Python 3 compatible (only a minor change needed).
  • Includes a POSIX version, and defines the exception to use for either.

This way you can use the same function and exception for Unix and Windows code.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://*.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

To avoid reading incomplete data, I ended up writing my own readline generator (which returns the byte string for each line).

It's a generator, so you can, for example...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothing's left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case where reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

#13


2  

This version of non-blocking read doesn't require special modules and will work out-of-the-box on the majority of Linux distros.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break
            sys.stdout.write(ch.decode(errors="replace"))
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, use_async=True):  # note: "async" is a reserved word in Python 3.7+
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if use_async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

#14


1  

The select module helps you determine where the next useful input is.

However, you're almost always happier with separate threads. One does a blocking read on stdin; another does whatever it is you don't want blocked.
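As a rough sketch of the select idea (the helper name `readline_if_ready` is illustrative, and this is POSIX-only since select() on Windows accepts only sockets):

```python
import select

def readline_if_ready(pipe, timeout=0.0):
    """Return one line if data is waiting on the pipe, else None.

    POSIX-only sketch. Note that select() reporting the fd as readable
    only guarantees *some* data is available; readline() can still block
    briefly until a full line arrives.
    """
    ready, _, _ = select.select([pipe], [], [], timeout)
    if ready:
        return pipe.readline()
    return None
```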

#15


0  

I have created a library based on J. F. Sebastian's solution. You can use it.

https://github.com/cenkalti/what

#16


0  

Working from J.F. Sebastian's answer and several other sources, I've put together a simple subprocess manager. It provides the requested non-blocking reading, as well as running several processes in parallel. It doesn't use any OS-specific calls (that I'm aware of) and thus should work anywhere.

It's available from pypi, so just pip install shelljob. Refer to the project page for examples and full docs.

#17


0  

EDIT: This implementation still blocks. Use J.F.Sebastian's answer instead.

I tried the top answer, but the additional risk and maintenance of thread code was worrisome.

Looking through the io module (and being limited to 2.6), I found BufferedReader. This is my threadless, non-blocking solution.

import io
import time
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
    while p.poll() is None:
        time.sleep(SLEEP_DELAY)
        while '\n' in buffer.peek(buffer.buffer_size):
            line = buffer.readline()
            # do stuff with the line

    # Handle any remaining output after the process has ended
    while buffer.peek():
        line = buffer.readline()
        # do stuff with the line

#18


0  

I recently stumbled upon the same problem: I needed to read one line at a time from a stream (a tail run in a subprocess) in non-blocking mode. I wanted to avoid the usual problems: burning CPU, reading the stream one byte at a time (like readline does), etc.

Here is my implementation: https://gist.github.com/grubberr/5501e1a9760c3eab5e0a. It doesn't support Windows (it uses poll) and doesn't handle EOF, but it works well for me.
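A minimal poll()-based sketch of the same idea (not the gist's code; `iter_available` is an illustrative name, and unlike the gist this version does stop at EOF):

```python
import os
import select

def iter_available(fd, timeout_ms=100):
    """Yield chunks of bytes from fd as they become available.

    POSIX-only sketch using select.poll(); stops when os.read()
    returns b'' (EOF, i.e. the writer closed the pipe).
    """
    poller = select.poll()
    poller.register(fd, select.POLLIN)
    while True:
        if not poller.poll(timeout_ms):
            continue  # no data yet; a real caller could do other work here
        chunk = os.read(fd, 4096)
        if not chunk:  # EOF
            return
        yield chunk
```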

#19


0  

Why bother with threads and queues? Unlike readline(), BufferedReader.read1() won't block waiting for \r\n; it returns ASAP if any output is coming in.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

#20


0  

In my case I needed a logging module that catches the output from the background applications and augments it (adding time-stamps, colors, etc.).

I ended up with a background thread that does the actual I/O. The following code is for POSIX platforms only. I stripped the non-essential parts.

If someone is going to use this beast for long runs, consider managing the open descriptors. In my case it was not a big problem.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

#21


0  

This is an example of running an interactive command in a subprocess, with stdout made interactive by using a pseudo-terminal. You can refer to: https://*.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

#22


0  

My problem is a bit different, as I wanted to collect both stdout and stderr from a running process, but ultimately the same, since I wanted to render the output in a widget as it's generated.

I did not want to resort to many of the proposed workarounds using Queues or additional Threads as they should not be necessary to perform such a common task as running another script and collecting its output.

After reading the proposed solutions and python docs I resolved my issue with the implementation below. Yes it only works for POSIX as I'm using the select function call.

I agree that the docs are confusing and the implementation is awkward for such a common scripting task. I believe that older versions of python have different defaults for Popen and different explanations so that created a lot of confusion. This seems to work well for both Python 2.7.12 and 3.5.2.

The key was to set bufsize=1 for line buffering, and then universal_newlines=True to process the stream as a text file instead of as binary, which seems to become the default when setting bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG and VERBOSE are simply macros that print output to the terminal.

ERROR、DEBUG 和 VERBOSE 只是将输出打印到终端的宏。

This solution is, IMHO, 99.99% effective. It still uses the blocking readline function, so we assume the subprocess is well behaved and outputs complete lines.

这个解决方案是IMHO 99.99%有效的,因为它仍然使用阻塞的readline函数,所以我们假设子过程很好并且输出完整的行。

I welcome feedback to improve the solution as I am still new to Python.

我欢迎反馈来改进解决方案,因为我仍然是Python的新手。

#23


0  

This solution uses the select module to "read any available data" from an IO stream. This function blocks initially until data is available, but then reads only the data that is available and doesn't block further.

该解决方案使用select模块从IO流“读取任何可用数据”。这个函数在数据可用之前先阻塞,然后只读取可用的数据,并且不会进一步阻塞。

Given the fact that it uses the select module, this only works on Unix.

考虑到它使用了select模块,这只能在Unix上运行。

The code is fully PEP8-compliant.

代码完全符合pep8。

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available; all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

#24


0  

I also faced the problem described by Jesse and solved it by using "select", as Bradley, Andy and others did, but in a blocking mode to avoid a busy loop. It uses a dummy pipe as a fake stdin. The select call blocks and waits for either stdin or the pipe to be ready. When a key is pressed, stdin unblocks the select and the key value can be retrieved with read(1). When a different thread writes to the pipe, the pipe unblocks the select, and this can be taken as an indication that the need for stdin is over. Here is some reference code:

我还遇到了Jesse描述的问题,用“select”来解决它,就像Bradley, Andy和其他人一样,但是在一个阻塞模式下避免了一个繁忙的循环。它使用假的管子作为假的stdin。选择块并等待stdin或管道准备就绪。当按下一个键时,stdin会解锁select,并且可以通过read(1)检索键值。当一个不同的线程写入到管道中时,管道将打开select,它可以被视为对stdin的需求结束的指示。这里有一些参考代码:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    pass  # ... do your stuff with the key value

# Faked keystroke
else:
    pass  # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

#25


-2  

Here is a module that supports non-blocking reads and background writes in python:

这是一个支持非阻塞读和背景写在python中的模块:

https://pypi.python.org/pypi/python-nonblock

https://pypi.python.org/pypi/python-nonblock

Provides a function,

提供了一个函数,

nonblock_read which will read data from the stream, if available, otherwise return an empty string (or None if the stream is closed on the other side and all possible data has been read)

非block_read,它将从流中读取数据,如果可用,则返回空字符串(如果流在另一侧关闭,且所有可能的数据都已读取),则返回空字符串

You may also consider the python-subprocess2 module,

您还可以考虑python-subprocess2模块,

https://pypi.python.org/pypi/python-subprocess2

https://pypi.python.org/pypi/python-subprocess2

which extends the subprocess module: an additional method, runInBackground, is added to the object returned by subprocess.Popen. It starts a thread and returns an object which is automatically populated as stuff is written to stdout/stderr, without blocking your main thread.

它扩展了 subprocess 模块：在 subprocess.Popen 返回的对象上添加了一个附加方法 runInBackground。它会启动一个线程并返回一个对象，当有内容写入 stdout/stderr 时该对象会被自动填充，而不会阻塞主线程。

Enjoy!

享受吧!
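For reference, the core behavior of such a nonblock_read can be approximated with the standard library alone (POSIX only; this is my sketch, not the library's implementation):

```python
import os
import select

def nonblock_read(stream):
    """Approximation (POSIX only): return whatever bytes are available right
    now, b'' if nothing is, or None once the stream is closed and drained."""
    fd = stream.fileno()
    data = b''
    # select with a zero timeout only reports data that is already there
    while select.select([fd], [], [], 0)[0]:
        chunk = os.read(fd, 4096)
        if not chunk:                # EOF: writer closed, everything read
            return data if data else None
        data += chunk
    return data
```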

#1


337  

fcntl, select, asyncproc won't help in this case.

在这种情况下,fcntl、select和asyncproc将不起作用。

A reliable way to read a stream without blocking regardless of operating system is to use Queue.get_nowait():

无论操作系统如何，在不阻塞的情况下可靠地读取流的方法是使用 Queue.get_nowait()：

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty  # python 3.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    pass # ... do something with line

#2


65  

I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn't solve the problem because readline() blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can't because readline() is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:

我经常遇到类似的问题;我编写的Python程序经常需要能够执行一些基本功能,同时接受来自命令行(stdin)的用户输入。简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为readline()块并没有超时。如果主要功能已经完成,并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能,因为readline()仍然阻塞在其他线程中等待一行。我发现这个问题的一个解决方案是使用fcntl模块使stdin成为非阻塞文件:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

In my opinion this is a bit cleaner than using the select or signal modules to solve this problem but then again it only works on UNIX...

在我看来,这比使用select或signal模块来解决这个问题要干净一些,但它只适用于UNIX……

#3


35  

Python 3.4 introduces new provisional API for asynchronous IO -- asyncio module.

Python 3.4引入了异步IO的新的临时API——asyncio模块。

The approach is similar to twisted-based answer by @Bryan Ward -- define a protocol and its methods are called as soon as data is ready:

该方法类似于由@Bryan Ward提供的基于twisted的答案——定义一个协议,一旦数据准备就绪,就会调用它的方法:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

See "Subprocess" in the docs.

请参阅文档中的“子流程”。

There is a high-level interface asyncio.create_subprocess_exec() that returns Process objects that allows to read a line asynchroniosly using StreamReader.readline() coroutine (with async/await Python 3.5+ syntax):

这里有一个高级接口asyncio.create_subprocess_exec(),它返回进程对象,它允许使用StreamReader.readline() coroutine(与异步/等待Python 3.5+语法)来读取一行异步操作。

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() performs the following tasks:

readline_and_kill()执行以下任务:

  • start subprocess, redirect its stdout to a pipe
  • 启动子进程,将其stdout重定向到管道。
  • read a line from subprocess' stdout asynchronously
  • 从子进程异步读取一行。
  • kill subprocess
  • 杀子流程
  • wait for it to exit
  • 等待它退出。

Each step could be limited by timeout seconds if necessary.

如果需要,每一步都可能受到超时时间的限制。
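For example, a per-step timeout can be added by wrapping the awaitable in asyncio.wait_for() (a sketch; the helper name is mine):

```python
import asyncio

async def readline_with_timeout(stream, timeout=5.0):
    # Bound a single step with wait_for(); return None instead of raising
    try:
        return await asyncio.wait_for(stream.readline(), timeout)
    except asyncio.TimeoutError:
        return None
```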

#4


19  

Try the asyncproc module. For example:

尝试asyncproc模块。例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

The module takes care of all the threading as suggested by S.Lott.

该模块按照S.Lott的建议处理所有的线程。

#5


16  

You can do this really easily in Twisted. Depending upon your existing code base, this might not be that easy to use, but if you are building a twisted application, then things like this become almost trivial. You create a ProcessProtocol class, and override the outReceived() method. Twisted (depending upon the reactor used) is usually just a big select() loop with callbacks installed to handle data from different file descriptors (often network sockets). So the outReceived() method is simply installing a callback for handling data coming from STDOUT. A simple example demonstrating this behavior is as follows:

你可以很容易地在Twisted中做到这一点。根据您现有的代码库,这可能不是那么容易使用,但是如果您正在构建一个twisted应用程序,那么这样的事情就变得微不足道了。您创建一个ProcessProtocol类,并覆盖outReceived()方法。Twisted(取决于所使用的反应器)通常只是一个大的select()循环,它包含了从不同的文件描述符(通常是网络套接字)处理数据的回调。因此,outReceived()方法只是安装一个回调以处理来自STDOUT的数据。演示此行为的一个简单示例如下:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

The Twisted documentation has some good information on this.

Twisted文档对此有一些很好的信息。

If you build your entire application around Twisted, it makes asynchronous communication with other processes, local or remote, really elegant like this. On the other hand, if your program isn't built on top of Twisted, this isn't really going to be that helpful. Hopefully this can be helpful to other readers, even if it isn't applicable for your particular application.

如果您围绕Twisted构建整个应用程序,它将与其他进程(本地或远程)进行异步通信,非常优雅。另一方面,如果你的程序不是建立在Twisted之上的,这也不会有什么帮助。希望这能对其他读者有所帮助,即使它不适用于您的特定应用程序。

#6


15  

Use select & read(1).

使用 select 和 read(1)。

import select
import subprocess

def readAllSoFar(proc, retVal=''):
  while (select.select([proc.stdout],[],[],0)[0] != []):
    retVal += proc.stdout.read(1)
  return retVal

p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while p.poll() is None:
  print (readAllSoFar(p))

For readline()-like:

类似 readline() 的用法：

lines = ['']
while p.poll() is None:
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print lines[a]
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print lines[a]

#7


8  

One solution is to have another process perform the read of the process's output, or to run the read in a thread with a timeout.

一种解决方案是让另一个进程执行您对进程的读取,或者使用超时创建进程的线程。

Here's the threaded version of a timeout function:

这是一个超时函数的线程化版本:

http://code.activestate.com/recipes/473878/

http://code.activestate.com/recipes/473878/
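The recipe linked above boils down to joining a worker thread with a timeout; a minimal sketch of the idea (function names are mine, not the recipe's):

```python
import threading

def run_with_timeout(func, args=(), timeout=5.0):
    """Run func(*args) in a thread; give up waiting after `timeout` seconds."""
    result = {}

    def target():
        result['value'] = func(*args)

    t = threading.Thread(target=target, daemon=True)
    t.start()
    t.join(timeout)
    if t.is_alive():
        return None  # timed out; the daemon thread keeps running in background
    return result.get('value')
```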

However, do you need to read the stdout as it's coming in? Another solution may be to dump the output to a file and wait for the process to finish using p.wait().

但是,你是否需要阅读stdout,因为它进来了?另一个解决方案可能是将输出转储到文件中,并等待进程使用p.wait()完成。

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

#8


7  

Disclaimer: this works only for tornado

免责声明：这只适用于 tornado。

You can do this by setting the fd to be nonblocking and then use ioloop to register callbacks. I have packaged this in an egg called tornado_subprocess and you can install it via PyPI:

您可以通过把 fd 设置为非阻塞，然后使用 ioloop 注册回调来实现。我已经把它打包成一个名为 tornado_subprocess 的 egg，你可以通过 PyPI 安装它：

easy_install tornado_subprocess

now you can do something like this:

现在你可以这样做:

import tornado_subprocess
import tornado.ioloop

def print_res( status, stdout, stderr ):
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

you can also use it with a RequestHandler

你也可以把它和 RequestHandler 一起使用。

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

#9


6  

Existing solutions did not work for me (details below). What finally worked was to implement readline using read(1) (based on this answer). The latter does not block:

现有的解决方案对我不起作用(详情如下)。最后的工作是使用read(1)(基于此答案)实现readline。后者不阻碍:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Why existing solutions did not work:

为什么现有的解决方案不起作用:

  1. Solutions that require readline (including the Queue based ones) always block. It is difficult (impossible?) to kill the thread that executes readline. It only gets killed when the process that created it finishes, but not when the output-producing process is killed.
  1. 需要readline(包括基于队列的)的解决方案总是阻塞。要杀死执行readline的线程是困难的(不可能的)。它只会在创建它的过程结束时被杀死，而不是在输出过程被杀死的时候。
  2. Mixing low-level fcntl with high-level readline calls may not work properly as anonnn has pointed out.
  2. 将低级的fcntl与高级的readline调用混合可能不能正常工作，正如anonnn指出的那样。
  3. Using select.poll() is neat, but doesn't work on Windows according to python docs.
  3. 使用select.poll()是很整洁的，但是根据python文档，它不能在Windows上工作。
  4. Using third-party libraries seems overkill for this task and adds additional dependencies.
  4. 对于此任务，使用第三方库似乎是多余的，并添加了额外的依赖项。

#10


3  

I hit this problem needing to read output from a subprocess.Popen stdout. Here is my non-blocking read solution:

我添加这个问题来读取一些子过程。Popen stdout。这是我的非阻塞阅读解决方案:

import fcntl
import os

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

#11


3  

Here is my code, used to catch every output from the subprocess ASAP, including partial lines. It pumps stdout and stderr at the same time, in almost correct order.

这里是我的代码,用于捕获包括部分行在内的每个子进程的输出。它同时泵出,stdout和stderr几乎是正确的。

Tested and working correctly on Python 2.7, on Linux and Windows.

在Python 2.7 linux & windows上测试和正确工作。

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

#12


2  

Adding this answer here since it provides ability to set non-blocking pipes on Windows and Unix.

在这里添加这个答案,因为它提供了在Windows和Unix上设置非阻塞管道的能力。

All the ctypes details are thanks to @techtonik's answer.

所有ctypes的细节都要感谢@techtonik的回答。

There is a slightly modified version to be used both on Unix and Windows systems.

在Unix和Windows系统中都有一个稍微修改过的版本。

  • Python3 compatible (only minor change needed).
  • 与Python3兼容(只需要少量修改)。
  • Includes posix version, and defines exception to use for either.
  • 包括posix版本,并定义了使用的例外。

This way you can use the same function and exception for Unix and Windows code.

这样,您就可以对Unix和Windows代码使用相同的函数和异常。

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://*.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

To avoid reading incomplete data, I ended up writing my own readline generator (which returns the byte string for each line).

为了避免读取不完整的数据,我最后编写了自己的readline生成器(它返回每一行的字节字符串)。

It's a generator, so you can, for example...

它是一个发电机,所以你可以…

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

#13


2  

This version of non-blocking read doesn't require special modules and will work out-of-the-box on majority of Linux distros.

该版本的非阻塞读取不需要特殊的模块,并且将在大多数Linux发行版上进行开箱即用。

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, do_async=True):
    # merge stderr and stdout; note: "async" is a reserved word in Python 3.7+
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if do_async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

#14


1  

The select module helps you determine where the next useful input is.

选择模块帮助您确定下一个有用的输入在哪里。

However, you're almost always happier with separate threads. One does a blocking read on stdin; the other runs whatever it is you don't want blocked.

但是,对于单独的线程,您几乎总是更高兴。一个是阻止读stdin,另一个是在你不想被阻塞的地方做。
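A minimal sketch of that two-thread layout, reading a child process's stdout here instead of stdin (`printf` is just a stand-in command):

```python
import queue
import subprocess
import threading

proc = subprocess.Popen(["printf", "a\nb\n"], stdout=subprocess.PIPE,
                        universal_newlines=True)
q = queue.Queue()

def reader():
    # the blocking readline loop lives here, off the main thread
    for line in proc.stdout:
        q.put(line)

t = threading.Thread(target=reader, daemon=True)
t.start()

# the main thread stays free; for this demo we just wait and drain the queue
t.join()
proc.wait()
collected = [q.get_nowait().strip() for _ in range(q.qsize())]
print(collected)
```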

#15


0  

I have created a library based on J. F. Sebastian's solution. You can use it.

我已经建立了一个基于 J. F. Sebastian 的解决方案的库。你可以使用它。

https://github.com/cenkalti/what

https://github.com/cenkalti/what

#16


0  

Working from J.F. Sebastian's answer, and several other sources, I've put together a simple subprocess manager. It provides the requested non-blocking read, as well as running several processes in parallel. It doesn't use any OS-specific call (that I'm aware of) and thus should work anywhere.

从J.F. Sebastian的回答,以及其他几个来源,我整理了一个简单的子流程管理器。它提供了非阻塞读取的请求,以及并行运行多个进程。它不使用任何特定于os的调用(我知道),因此应该在任何地方工作。

It's available from pypi, so just pip install shelljob. Refer to the project page for examples and full docs.

它可以从 PyPI 获得，只需 pip install shelljob 即可。示例和完整文档请参考项目页面。

#17


0  

EDIT: This implementation still blocks. Use J.F. Sebastian's answer instead.

编辑：这个实现仍然会阻塞。请改用 J.F. Sebastian 的回答。

I tried the top answer, but the additional risk and maintenance of thread code was worrisome.

我尝试了顶部的答案,但是额外的风险和维护线程代码是令人担忧的。

Looking through the io module (and being limited to 2.6), I found BufferedReader. This is my threadless, non-blocking solution.

查看io模块(并且限制为2.6),我找到了BufferedReader。这是我的无线程的非阻塞解决方案。

import io
import time
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
    while p.poll() is None:
        time.sleep(SLEEP_DELAY)
        while '\n' in buffer.peek():
            line = buffer.readline()
            # do stuff with the line

    # Handle any remaining output after the process has ended
    while buffer.peek():
        line = buffer.readline()
        # do stuff with the line

#18


0  

I recently stumbled upon the same problem. I needed to read one line at a time from a stream (a tail running in a subprocess) in non-blocking mode, and I wanted to avoid these problems: burning CPU, reading the stream one byte at a time (like readline does), etc.

我最近遇到了同样的问题,我需要在非阻塞模式下从流(子进程的尾部运行)中读取一行代码,我希望避免下一个问题:不要烧cpu,不要读一个字节的流(像readline做的那样),等等。

Here is my implementation: https://gist.github.com/grubberr/5501e1a9760c3eab5e0a. It doesn't support Windows (it uses poll) and doesn't handle EOF, but it works well for me.

这里是我的实现https://gist.github.com/grubberr/5501e1a9760c3eab5e0a它不支持windows (poll),不要处理EOF,但是它对我很有效。
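A comparable sketch with the standard library alone (POSIX only; the names are mine, not the gist's): it polls with a timeout, so it neither burns CPU nor reads byte-by-byte, and yields None whenever no complete line is ready yet.

```python
import os
import select

def iter_lines_nonblocking(fd, timeout_ms=100):
    """Yield complete lines from fd as they arrive; yield None on each poll
    timeout so the caller can do other work in between (POSIX only)."""
    poller = select.poll()
    poller.register(fd, select.POLLIN)
    buf = b''
    while True:
        if not poller.poll(timeout_ms):
            yield None               # nothing yet; caller regains control
            continue
        chunk = os.read(fd, 4096)    # read in chunks, not byte-by-byte
        if not chunk:                # EOF
            if buf:
                yield buf
            return
        buf += chunk
        while b'\n' in buf:
            line, buf = buf.split(b'\n', 1)
            yield line
```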

#19


0  

Why bother with threads and queues? Unlike readline(), BufferedReader.read1() won't block waiting for \r\n; it returns ASAP if there is any output coming in.

为什么困扰线程队列?不像readline(), BufferedReader.read1()不会阻塞等待\r\n,如果有任何输出,它会返回ASAP。

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

#20


0  

In my case I needed a logging module that catches the output from the background applications and augments it(adding time-stamps, colors, etc.).

在我的例子中,我需要一个日志模块,它捕获后台应用程序的输出,并增加它(添加时间戳、颜色等)。

I ended up with a background thread that does the actual I/O. Following code is only for POSIX platforms. I stripped non-essential parts.

最后,我使用了一个实际的I/O的后台线程。以下代码仅适用于POSIX平台。我剥夺了不必要的部分。

If someone is going to use this beast for long runs consider managing open descriptors. In my case it was not a big problem.

如果有人要用这只野兽长期运行,考虑管理打开的描述符。在我看来,这不是一个大问题。

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError as exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

#21


0  

This is an example of running an interactive command in a subprocess, with an interactive stdout using a pseudo-terminal. You can refer to: https://*.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() to make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

#22


0  

My problem is a bit different, as I wanted to collect both stdout and stderr from a running process, but ultimately the same, since I wanted to render the output in a widget as it's generated.

I did not want to resort to many of the proposed workarounds using queues or additional threads, as they should not be necessary for such a common task as running another script and collecting its output.

After reading the proposed solutions and python docs I resolved my issue with the implementation below. Yes it only works for POSIX as I'm using the select function call.

I agree that the docs are confusing and the implementation is awkward for such a common scripting task. I believe that older versions of Python had different defaults for Popen and different explanations, which created a lot of confusion. This seems to work well for both Python 2.7.12 and 3.5.2.

The key was to set bufsize=1 for line buffering and then universal_newlines=True to process it as a text file instead of the binary mode that seems to become the default when setting bufsize=1.

import subprocess
from PyQt4.QtCore import QThread, SIGNAL  # PyQt4 assumed, given the old-style signals below

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG and VERBOSE are simply macros that print output to the terminal.

This solution is IMHO 99.99% effective, as it still uses the blocking readline function, so we assume the subprocess is nice and outputs complete lines.

I welcome feedback to improve the solution as I am still new to Python.

#23


0  

This solution uses the select module to "read any available data" from an IO stream. This function blocks initially until data is available, but then reads only the data that is available and doesn't block further.

Given the fact that it uses the select module, this only works on Unix.

The code is fully PEP8-compliant.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available; all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
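A brief self-contained usage sketch (POSIX only; the helper below is a condensed copy of the function above, and the demo closes the pipe's write end first so internal stream buffering is not an issue):

```python
import os
import select


def read_available(stream, max_bytes=None):
    # Condensed from the full listing above: block for the first
    # byte, then keep reading only while 'select' reports more data.
    if not select.select([stream], [], [])[0]:
        return ""
    buf = ""
    while select.select([stream], [], [], 0)[0]:
        ch = stream.read(1)
        if ch == "":  # end of stream
            break
        buf += ch
        if max_bytes is not None and len(buf) >= max_bytes:
            break
    return buf


# Demonstrate on an os.pipe: write, close the writer, then read.
r, w = os.pipe()
reader = os.fdopen(r)
os.write(w, b"hello\n")
os.close(w)
print(read_available(reader))  # drains everything up to EOF
```

Once the stream is at EOF, further calls return an empty string immediately instead of blocking.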

#24


0  

I also faced the problem described by Jesse and solved it by using "select" as Bradley, Andy and others did, but in a blocking mode to avoid a busy loop. It uses a dummy pipe as a fake stdin. The select blocks and waits for either stdin or the pipe to be ready. When a key is pressed, stdin unblocks the select and the key value can be retrieved with read(1). When a different thread writes to the pipe, the pipe unblocks the select, which can be taken as an indication that the need for stdin is over. Here is some reference code:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    pass  # ... do your stuff with the key value

# Faked keystroke
else:
    pass  # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

#25


-2  

Here is a module that supports non-blocking reads and background writes in python:

https://pypi.python.org/pypi/python-nonblock

Provides a function,

nonblock_read, which will read data from the stream if available, and otherwise return an empty string (or None if the stream is closed on the other side and all possible data has been read).
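For illustration, here is a rough stdlib-only sketch of that contract (POSIX only; the function name and signature below are mine, not the module's actual API — see the package docs for the real interface):

```python
import os
import select


def read_if_available(stream, max_bytes=4096):
    """Best-effort non-blocking read, mirroring the contract described
    above: returns b'' when nothing is waiting, and None once the other
    side has closed the stream and all data has been read."""
    fd = stream.fileno()
    readable, _, _ = select.select([fd], [], [], 0)  # poll, never block
    if not readable:
        return b""
    data = os.read(fd, max_bytes)
    return data if data else None  # empty read after readiness means EOF
```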


You may also consider the python-subprocess2 module,

https://pypi.python.org/pypi/python-subprocess2

which extends the subprocess module: the object returned from "subprocess.Popen" gains an additional method, runInBackground. This starts a thread and returns an object which is automatically populated as output is written to stdout/stderr, without blocking your main thread.
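The same idea can be sketched with the stdlib alone (this is my approximation of the pattern, not python-subprocess2's actual API):

```python
import subprocess
import threading


class BackgroundOutput:
    """Drain a child's stdout in a daemon thread so the main thread
    can inspect `lines` at any time without blocking."""

    def __init__(self, popen):
        self.lines = []  # filled in as the child writes
        self._thread = threading.Thread(target=self._drain, args=(popen,))
        self._thread.daemon = True
        self._thread.start()

    def _drain(self, popen):
        for line in popen.stdout:  # readline blocks only in this thread
            self.lines.append(line)

    def wait(self):
        self._thread.join()


p = subprocess.Popen(["echo", "hello"], stdout=subprocess.PIPE)
bg = BackgroundOutput(p)
# ... the main thread stays free; peek at bg.lines whenever convenient
bg.wait()
p.wait()
```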


Enjoy!
