1.背景
刚到一家公司需要写一个实时分析tshark捕捉到的数据,tshark一直往文本里面写数据,写一个程序要实时获取到添加的数据并进行分析处理最后入库。此时思绪狂飞,想了一些比较挫的方法。
本人想到的方法:
1.每隔一定时间去查看下文件的mtime,如果有改动则读取数据,并记录读取的行数。下次再去读这个文件的数据则上次记录的行数开始继续读。当文件行数太大的时候这个程序的效率就很慢了,也可以记录上次读取的字节数,然后使用linux下的open系统系统中的seek从指定位置处读取。但是要是用C语言来写的话后面的字符串处理和分析就比较麻烦了果断放弃这个方案。
我们经理的方法:
1.利用linux中的pipe,这个方法很是有用。pipe就是管道,一个输入端,一个输出端。只要有数据输入就立马送到输出端进行处理。此办法效率还不错。最终被设计成这样:
1、tshark捕捉数据重定向到pipe中。
2、一个线程从pipe读取数据处理后放入到队列中。
3、另外一个线程从队中取数据进行最后的 处理和入库。
存在的问题:
此法很好,后来当我处理到一个问题的时候需要对实时对apache的日志分析并入库,此时我就准备直接把经理写好的程序demo拿来改改用。但是发现apache无法往pipe中写,我将apache日志文件删除后,重新建立成pipe格式的文件,然后写程序读这个pipe,发现读不了。
可能是apache不支持往pipe中写日志吧。这下歇菜了。
最终的解决方案:
tail -f + pipe 搞定。我们都知道tail -f 一个文件的时候会实时显示文件新增的内容。然后python中的subprocess.Popen中可以指定输出到PIPE中stdout=subprocess.PIPE,这样的话只要一个while每次去读pipe中数据就OK了。
2.方法
经理的方法(为了保密代码做了一定的处理):
#!/usr/bin/env python2.7
import datetime
import time
from CURD import Field, Model, Database
import threading
import Queue
import os
CONFIG = {
}
class dami_log(Model):
date = Field()
netloc = Field()
ip = Field()
path = Field()
cookie = Field()
mac = Field()
cid = Field()
class dami_case(Model):
id = Field()
case_name = Field()
save_time = Field()
is_current = Field()
def thread_read_pipe(q):
if not os.path.exists(CONFIG['pipe']):
os.mkfifo(CONFIG['pipe'], 0777)
f = file(CONFIG['pipe'], 'r')
rest = ''
while True:
读取数据
dat = f.readline()
数据处理.....
放入队列
q.put(dat)
def thread_dump(q,cid):
while True:
try:
从队列中获取数据
line = q.get()
数据处理
dat = line.split('\t')
if any(map(lambda x:dat[3].endswith(x) or (x+"?" in dat[3]),["js","css","ico","jpg","png","gif"])) or dat[3]=="*" or "192.168.10.1" in dat[1]:
print line
else:
payload = {
'date': dat[0] \
if '.' not in dat[0] \
else datetime.datetime.fromtimestamp(
time.mktime(time.strptime(dat[0].rsplit('.', 1)[0],
'%b %d, %Y %X'))),
'netloc': dat[1],
'ip': dat[2],
'path': dat[3],
'cookie': dat[4],
'mac': dat[5],
'cid':cid
}
dami_log(**payload).save()
except Exception as e:
print e
pass
def _main():
CONFIG['db_user'] = '***'
CONFIG['db_passwd'] = '***'
CONFIG['db'] = '***'
CONFIG['pipe'] = '/var/www/script/tsharklog/log.log'
Database.config(user=CONFIG['db_user'],
passwd=CONFIG['db_passwd'],
db=CONFIG['db'])
cid = dami_case.where(is_current=1).getone()
q = Queue.Queue()
trp = threading.Thread(target=thread_read_pipe, args=(q,))
trp.start()
td = threading.Thread(target=thread_dump, args=(q,cid.id))
td.start()
if __name__ == '__main__':
_main()
我处理apache日志实时入库的方法:
#coding:utf-8#!/usr/bin/pythonimport subprocessimport signalimport timeimport datetimeimport timefrom CURD import Field, Model, Databaseimport osflag=1CONFIG = {}class dami_case(Model): id = Field() case_name = Field() save_time = Field() is_current = Field()class dami_muma(Model): id = Field() source = Field() ip = Field() apkname = Field() downtime = Field() cid = Field()##监控文件的变化,获取文件的内容进行存入库中。##网址分析出对应的域名#这里用了find方法,未找到返回-1,若使用index方法获取索引,如果没有找到会抛出异常#需要自己写异常捕捉,否则程序就中断了def gethostname(data): if "http://" in data: data = data[8:] index = data.find('/') if index == -1: return data else: return data[:index]#hostname转换成对应的名字#在网页模板中转换.#def hosttoname(host):## if "baidu" in host:# return "百度"# if "91" in host:# return "91助手"# ifdef monitorLog(logfile): global flag print "Strating........................." popen = subprocess.Popen('tail -n 1 -f ' + logfile, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) while True: line = popen.stdout.readline().strip() if line: if flag == 1: flag = flag+1 continue line = line.split(" ") #测试部分 ''' print "ip:"+line[0] print "time:"+line[3][1:] print "apkname:"+os.path.basename(line[6]) print "http code:"+line[8] print "url:"+gethostname(line[10]) ''' #过滤条件 if line[6] == "456.apk": continue #开始存库 #获取案件id号 cid = dami_case.where(is_current=1).getone() data={ "ip":line[0], "downtime":line[3][1:], "apkname":os.path.basename(line[6]), "source":gethostname[line[10]], "cid":cid.id } dami_muma(**data).save()if __name__ == '__main__':#数据库配置部分 CONFIG['db_user'] = '****' CONFIG['db_passwd'] = '****' CONFIG['db'] = '****' logfile="/var/log/apache2/muma.log" Database.config(user=CONFIG['db_user'], passwd=CONFIG['db_passwd'], db=CONFIG['db']) monitorLog(logfile)
考虑到我的apache日志并不是很多,所以没有考虑用线程加队列了。直接从pipe中读。如果数据多的话,可以考虑一个线程读pipe把数据放到队列中,另外一个线程读队列处理。如果再多的话,就多加几个线程来处理pipe中的数据,这里只有处理好从队列拿数据的原子性就OK了,保证同一时间几个线程从队列拿的是用一份数据。
3.想法
既然web日志可以达到实时入库,那么就可以利用这个思路,实现WEB日志显示。通过python相关web框架/php相关框架和相关图表的库来实现日志web显示。多台服务器的日志可以通过syslog中的-r选项实现日志统一汇总入库。后期有时间我来实现这个功能。觉得还是有点实用价值的。数据量太大就扛不住了,这也是事实。至少可以分析下最近访问的最多的页面确定热点区域。分析访问网站最多的ip,时间段等等信息。
4.总结
一些写程序一直用os里面的popen,后来发现subprocess这个包用处很大,后期认真看下这里面的一些相关函数,还有一个就是线程这个部分的内容。后期重点看看这方面的内容。
本文出自 “专注linux” 博客,请务必保留此出处http://forlinux.blog.51cto.com/8001278/1478271