【python】使用unix管道pipe处理stdout实时数据

时间:2022-01-05 02:32:11

现在有个实时抓包处理的程序,大概的流程是 使用tshark抓包->实时上传,如果写log的话是可以的,但是log文件切割需要定时执行。 由于log中有些内容需要实时处理,延迟时间会导致数据误差,所以想到用类似unix管道的方式,实时处理掉标准输出的内容处理,类似生产消费者模式。

场景解说

简单的流程就是

程序输出内容到stdout--> python pipe--> python 处理程序读取管道内的输出内容,然后后续处理

正常情况下可以用生产者消费者模式或者队列来搞定,但是呢这里最开始的程序是第三方的,所以只能从stdout开始处理,也算是有啥牌打啥牌吧。

案例模拟

一个打印方法来模拟stdout输出程序,一个是python管道的处理程序。

echo.py 1秒内随机事件输出一行,一共10行

#coding:utf-8
#author:orangleliu
#title: echo.py

import time
import sys
import random

ii = 1
while ii < 10:
    delay = random.randint(0,100)/100.0 #1秒内的随机时间
    sys.stdout.write("Talking every %s seconds, blabbed %i times\n" % (delay, ii))
    #如果没有flush 后面的程序无法读取的
    sys.stdout.flush()
    ii += 1
    time.sleep(delay)

handler.py 2秒读取一次标准输出,然后处理,比输出程序慢

#coding:utf-8
#orangleliu
#title: handler.py

from subprocess import Popen, PIPE
from os import kill
import signal
import time

talkpipe = Popen(['python', 'echo.py'],
    shell=False, stdout=PIPE)
try:
    while True:
        line = talkpipe.stdout.readline()
        if line:
            print "SERVER HEARD", line.strip()
        else:
            print "no data"
        time.sleep(2)

except KeyboardInterrupt:
    print "Killing child..."
    kill(talkpipe.pid, signal.SIGTERM)

结果总结

time python handler.py
SERVER HEARD Talking every 0.35 seconds, blabbed 1 times
SERVER HEARD Talking every 0.87 seconds, blabbed 2 times
SERVER HEARD Talking every 0.31 seconds, blabbed 3 times
SERVER HEARD Talking every 0.05 seconds, blabbed 4 times
SERVER HEARD Talking every 0.9 seconds, blabbed 5 times
SERVER HEARD Talking every 0.74 seconds, blabbed 6 times
SERVER HEARD Talking every 0.33 seconds, blabbed 7 times
SERVER HEARD Talking every 0.63 seconds, blabbed 8 times
SERVER HEARD Talking every 0.47 seconds, blabbed 9 times
no data
no data
no data
no data
no data
^CKilling child...
python handler.py  0.03s user 0.02s system 0% cpu 27.522 total

echo.py一共是10次打印,每次输出都是在1秒以内,handler.py 2秒处理一次,可以正常的把数据处理,没有数据的时候也会取数据,根据自己的逻辑停止处理,或者是不做处理,等待下次数据到来都可以。基本上能够达成最初的设想。