(转)Python 日志处理(三) 日志状态码分析、浏览器分析

时间:2021-03-23 09:28:14

原文:https://www.cnblogs.com/i-honey/p/7791564.html

在企业中,从日志中提取数据进行分析,可以帮助企业更加了解用户行为,用户最感兴趣的产品或者内容,分析得到数据后,可以决定企业在今后的走向。

从这些日志数据中,比较重要的有:

1. 用户访问最多的url,即用户在企业网站最感兴趣的产品或者内容

2. 用户群体的的主要线路是什么?移动?联通?电信?

3. 用户访问的高峰期是什么时候?最高PV(访问量)、UV(独立访客)、IP(独立IP)。

4. 各时段状态码数。比如304,表示静态资源在没有发生改变时,服务器要求客户使用了浏览器本地的缓存,可以降低服务器流量负载等。403、404如果异常得出现很多,则要根据访问得url来判断是否有恶意用户在对网站目录进行扫描和探测。400、500等状态码很多的情况就需要运维人员及时分析并排查原因。

5. 客户浏览器的名称、版本。统计出各种浏览器的分布情况,比如:如果手机浏览器、IE 6.0版本浏览器访问记录很多,则大概可以判断出用户群体大概的操作系统是winXP,win7以上版本,或是手机访问。那就需要考虑是否要对特定版本浏览器进行页面优化,或者如果客户是手机浏览器,那是否要压缩网站页面大小,降低流量消耗,亦或是否要对手机端优化,提升用户体验,牢牢得抓住客户。。

这里涉及部分SEO方面知识,仅作了解即可,如果企业真正需要了,再深入学习。

下面的例子对访问状态码和浏览器名称、版本进行了统计,以引出日志分析、数据挖掘的重要性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import datetime
import re
from queue import Queue
import threading
from pathlib import Path
from user_agents import parse
from collections import defaultdict
 
 
# 正则,文件读取,时间窗口,队列,多线程,高阶函数,分发器,嵌套函数
 
logline = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''
 
pattern = '''(?P<remote_addr>[\d\.]{7,}) - - (?:\[(?P<datetime>[^\[\]]+)\]) "(?P<request>[^"]+)" (?P<status>\d+) (?P<size>\d+) "[^"]+" "(?P<useragent>[^"]+)"'''
 
# 数据源处理
ops = {
    'datetime'lambda timestr: datetime.datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S %z"),
    'request'lambda request: dict(zip(('method''url''protocol'), request.split())),
    'status'int,
    'size'int,
    'useragent'lambda useragent: parse(useragent)
}
 
regex = re.compile(pattern)
 
def extract(line):
    matcher = regex.match(line)
    if matcher:
        return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
 
 
def openfile(path:str):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields  # return generator objects,next(load(path))
            else:
                # TODO 不合格数据有哪些
                continue  # TODO 解析失败就抛弃,或者打印日志
 
 
def load(*paths):
    '''装载日志文件或路径'''
    for item in paths:
        = Path(item)
        if not p.exists():
            continue
 
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
        elif p.is_file():
            yield from openfile(str(p))
 
 
def window(src:Queue, handler, width: int, interval: int):
    '''
    窗口函数
    :param src: 数据源,生成器,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒
    :param interval: 处理时间间隔,秒/ 时间偏移量,秒
    :return:
    '''
 
    start = datetime.datetime.strptime('1970/01/01 01:01:01 +0800''%Y/%m/%d %H:%M:%S %z')
    current = datetime.datetime.strptime('1970/01/01 01:01:02 +0800''%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width-interval)
 
    buffer = []  #窗口里的待计算数据
 
    while True:  #while True方式迭代queue
        # 从数据源获取数据
        data = src.get()   # block阻塞的
 
        if data:
            buffer.append(data)
            current = data['datetime']
 
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)      # 如何处理
            print("{}".format(ret))
 
            start = current
 
            buffer = [i for in buffer if i['datetime'] > current - delta]
 
 
def donothing_handler(iterable:list):
    # print(iterable)
    return iterable
 
 
# 状态码时间段百分比分析
def status_handler(iterable:list):
    = {}
    for item in iterable:
        key = item['status']
        if key not in d:
            d[key] = 0
        d[key] += 1
 
    total= sum(d.values())
    return {'{}: {:.2f}%'.format(k,v/total*100for k,v in d.items()}
 
 
# 浏览器分析函数
ua_dict = defaultdict(lambda 0)  # 作用域改为全局之后,字典递增保存所有ua及其版本
def browser_handler(iterable):
    for item in iterable:
        ua = item['useragent']
        key = (ua.browser.family, ua.browser.version_string)
        ua_dict[key] += 1
    return ua_dict
 
 
# 分发器,嵌套函数
def dispatcher(src):
    queues = []  # 队列列表
    threads = []  # 线程管理
 
    def reg(handler, width, interval):
        = Queue()    # 分配队列
        queues.append(q)  # 方便调用
 
        = threading.Thread(target=window,args=(q, handler, width, interval))
        threads.append(t)
 
 
 
    def run():
        for in threads:
            t.start()
 
        for in src:
            for in queues:
                q.put(x)
 
    return reg,run
 
     
reg,run = dispatcher(load('test.log'))
 
# reg注册 窗口
# reg(donothing_handler, 10, 5)    #注册测试
# reg(status_handler, 10, 5)       # 注册状态码处理函数
reg(browser_handler, 6060)       # 注册useragent处理函数,注意时间窗口宽度
 
run()