Python第七课----正则和日志分析

时间:2021-11-27 20:04:05
一、正则表达式:
1、分类:    1、BRE基本正则,grep、sed,vi等软件支持,vim有扩展  2、ERE扩展正则,egrep、grep-E,sed-r等   3、PCRE最重要的,高级语言中的
2、基本语法:
  1、元字符 metacharacter
.
匹配除了换行符外任意一字符,迭代所有
.
[abc]
字符“集合”,只能从中拿出一个,然后一个个匹配一个位置
plain,匹配出a
[^abc]
字符集合,表示一个字符位置,匹配非abc
plain,匹配出plin
[a-z]
小写字母集合,一个
[^a-z]
非小写字母,一个
\b
匹配单词的边界
\B
不匹配单词的边界
\d
[0-9]匹配以为数字
\D
[^0-9]匹配一位非数字
\s
匹配一位空白字符,包括换行符,制表符,空格[\f\r\n\t\v]
\S
匹配一位非空白字符
\w
匹配[a-zA-Z0-9_],包括中文的字,多个匹配
\W
匹配\w之外的字符


2、单行模式:    .可以匹配所有字符,包括换行符  ^表示整个字符的开头,$表示整个字符的结尾
3、多行模式:  .可以匹配除了换行符之外的字符  ^表示行首,行尾$  ^表示整个字符串的开始,$表示整个字符串的结尾.  开始指的的\n后紧接着的下一个字符,结束是指的/n前的字符  注意:注意字符串中看不见的换行符\r\n会影响e$的测试,e$只能匹配e\n
4、举例  very very very happy  harry key
5、转义  特殊含义的符号,想使用本意,使用\,如果是\本身,则使用\\  \r、\n还是转义后代表回车换行
    6、重复

*
表示前面的正则表达式重复0次或多次
e\w*,单词中有e后面非空白字符
+
表示前面的正则表达式重复至少一次
e\w+,单词中e,后面至少一个非空
表示前面的正则表达式0次或1次
e\w?,单词中e,后面0个或1个非空
{n}
重复固定的n次
e\w{5},单词e,后面5个非空
{n,}
重复至少n次
e\w{1,},单词e,后面至少1个非空
{n,m}
重复n到m次
e\w{1,10},单词e后面至少1至多10非空
7、手机号:\d{11}\d{3,4}-\d{7,8}
代码
说明
举例
x|y
匹配x或者y
wood,foodw|food或(w|f)ood
(pattern)
小括号指定一个分组,改变优先级,引入一个分组,1开始
\b(a|b)\w+
\数字
匹配对应的分组
(very) \1,捕获的是very very
(?:pattern)
?:不要分组
(?<name>exp)(?'name'exp)
分组捕获,但是可以通过name来访问Python语法必须是(?P<name>exp)
(?=exp)
断言exp一定在匹配的右边出现,也就是说断言后面一定要出现exp
f(?=oo)f后一定要出现oo
(?<=exp)
断言exp一定出现在匹配的前面出现,一定有个exp前缀
(?<=f)ood,ood前面一定有个f
(?!exp)
断言exp一定不会出现在右侧,也就是后面一定不是exp
foo(?!d)foo后面一定不是d
(?<!exp)
断言exp一定不会出现在左侧,也就是前面一定不是exp
(?<!f)ood,ood前面一定不是f
(?#comment)
注释
9、贪婪与非贪婪    1、默认贪婪模式,尽可能的匹配更长的字符串  2、非贪婪模式,在重复的符号后面,加上一个?问好,就尽量的少匹配了
*?
匹配任意次,但尽可能的少重复
f\w?k-----fk
+?
匹配至少一次,但尽可能的少重复
f\w+?k这个没有
??
匹配0次或1次,但尽可能的少重复
f\w??k  匹配0次或一次的?,这个是fk
{n,}?
匹配至少n次,但尽可能的少重复
f\w{1,}?k   得到fook
{n,m}?
匹配至少n次,至多m次,但尽可能的少重复
f.{1,111}?k,f.{1,17}?k,f.{1,}?k,f.{1,3}?k
very very very very       ---->v.*y,v.*?y
10、引擎选项:
Python
IgnoreCase
匹配时忽略大小写
re.Ire.IGNORECASE
SingleLine
单行模式,.可以匹配所有,包括\n
re.Sre.DOTALL
Multiline
多行模式^行首$行尾
re.Mre.MULTILINE
IgnorePatternWhitespace
忽略表达式中的空白字符,然后要使用空白,需要转义
re.Xre.VERBOSE
11、匹配0-999的数字:    ^([1-9]\d\d?|\d)(?!w+)\r$  ^([1-9]\d\d?|\d)\r$
12、IP地址  (?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)
二、方法re模块
1、编译   re.compile(pattern,flags=0)设定flag--编译模式,返回正则表达式对象regex   pattern是正则表达式字符串('\d+')这种,编译后的结果被保存
2、单次匹配    re.match(pattern,string,flags=0)        regex.match(string[.pos[,endpos]])   这两个返回结果一样,而且都是从头开始,如果不设置pos的话
s = '1234abc'Matchregex = re.compile('\d+')       matcher = re.match('\d+',s)     match只找一次print(matcher)matcher = regex.match(s)        匹配从头开始,开始没有就是Noneprint(matcher)  regex = re.compile('[ab]')matcher = regex.match(s,4)        可以设定已知的pos和endposprint(matcher)Searchmatcher = re.search('[ab]',s)       search是全局搜索,不过只找一次print(matcher)matcher = regex.search(s)           也可以输入pos,减少时间print(matcher)Fullmatch    正则表达式必须能取到全部才可以regex = re.compile('[ab]')matcher = re.fullmatch('[ab]',s)   这样为None,因为全长需要\w+print(matcher)matcher = regex.fullmatch(s)print(matcher)
3、全部匹配  re.findall(pattern,string,flags=0)       找到全部  regex.findall(string[,pos[,endpos]])
s = 'bottle\nbag\nbig\nable'regex = re.compile('b')result = regex.findall(s)   找到四个print(result)result = regex.finditer(s)  是个迭代器  next()print(result)    s = 'bottle\nbag\nbig\nable'regex = re.compile('^b\w+',re.M)result = regex.findall(s)print(result)
4、匹配替换    re.sub(pattern,replacement,string,count=0,flags=0)   regex.sub(replacement,string,count=0)  使用pattern对string匹配,对匹配项repl   re.sub(pattern,replacement,string,count=0,flags=0)   regex.sub(replacement,string,count=0)
    s = 'bottle\nbag\nbig\nable'  regex = re.compile('b\wg')  result = regex.sub('magedu',s)  print(result)      # 全部替换  result = regex.subn('magedu',s,1)  print(result)        # 替换1次  s = '''01 bottle  02 bag  03 big1  100 able'''  regex = re.compile('\s+|(?<!\w)\d+')    # big1的问题   # regex = re.compile('\s\d+\s+')  这个需要在第一行加空格  result = regex.split(s)  print(result)
5、分组    1、使用()的pattern捕获的数据放到了组group中  2、使用了分组后,在match对象中,可以看到分组  3、group(N)方式返回对应的分组,1-N是对应的分组,0返回整个匹配的字符串  4、如果是用了name,可以使用group(‘name’)的方式取分组  5、group()返回所有分组  6、使用groupdict()返回所有命名的分组
   s = '''01 bottle  02 bag  03 big12  100 a0ble'''    regex = re.compile('(b)(\w)(g)')  result = regex.finditer(s)  for x in result:  print(x.groups())  print(x.group(1))  print(x.group(2))  print('~~~~~~~~~')
6、匹配邮箱地址:^\w[\w\.-]*@\w[\w\.-]*\.[a-z]{2,3}
7、匹配html标记内的内容(取反的思想)  <a herf="www.magedu.com"; traget='_blank'>马哥<br>教育</a><font>abc</font>  <a[^<>]*>(.+)</a>取马哥教育  <a[^<>]*herf=([^<>]+)>
8、身份证:^\d{17}[xX\d]|^\d{15}$
三、日志分析项目:
import randomimport datetimeimport timefrom queue import Queueimport threadingimport refrom pathlib import Pathfrom user_agents import parse# 数据源ops = {    'datetime':lambda timestr:datetime.datetime.strptime(timestr,'%d/%b/%Y:%H:%M:%S %z'),    'status':int,    'length':int,    'request':lambda request:dict(zip(('method','url','protocol'),request.spilt())),    'useragent':lambda useragent:parse(useragent)}pattern = '''(?P<remote>[\d\.]{7,15}) - - \[(?P<datetime>[^\[\]]+)\] \"(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" \(?P<status>\d+) (?P<size>\d+) "[^"]+" "(?P<useragent>[^"]+)"'''regex = re.compile(pattern)def extract(line):    '''返回字段的字典,返回None则表示匹配失败'''    matcher = regex.match(line)    info = None    if matcher:        info = {k:ops.get(k,lambda x:x)(v) for k,v in matcher.groupdict().items()}    return infodef openfile(path:str):    with open(path) as f:        for line in f:            fields = extract(line)            if fields:                yield fields            else:                continuedef load(*paths):    '''装载日志文件'''    for item in paths:        p = Path(item)        if not p.exists():            continue        if p.is_dir():            for file in p.iterdir():                if file.is_file():                    yield from openfile(str(file))        elif p.is_file():            yield from openfile(str(p))# def source():                #   演示代码#     while True:#         yield {'value':random.randint(1,100),'datetime':datetime.datetime.now()}#         time.sleep(1)def window(src:Queue,handler,width:int,interval:int):    # 窗口函数    '''    窗口函数    :param src:数据源,生成器,用来拿数据    :param handler: 数据处理函数    :param width: 时间窗口宽度,秒    :param interval: 处理时间间隔,秒    '''    start = datetime.datetime.strptime('20170101 00:00:00 +0800','%Y%m%d %H:%M:%S %z')  # 设定一个开始时间    current = datetime.datetime.strptime('20170101 01:00:00 +0800','%Y%m%d %H:%M:%S %z') # 设定一个现在的时间    buffer = [] # 窗口中的待计算数据    delta = datetime.timedelta(seconds=width-interval)    while True:        # 从数据源获取数据        data = src.get()         # 从格式化后的日志中获取数据,src是数据源,yield数据        if data: # 存入临时缓冲等待计算            buffer.append(data)            # 收集窗口大小的日志数量            current = data['datetime']     # 修改现在的时间为日志内的时间        if (current - start).total_seconds() >= interval:       # delta和时间间隔相同时,取出窗口数据            ret = handler(buffer)                       #            print("{}".format(ret))            start = current            # 重叠方案            buffer = [x for x in buffer if x['datetime'] > current - delta]# # 随机数测试处理函数# def handler(iterable):#     vals = [x['value'] for x in iterable]#     return sum(vals) / len(vals)# # 测试# def donothing_handler(iterable):#     return iterable# 状态码占比def status_handler(iterable):    # 一批时间窗口内的数据    status = {}    for item in iterable:        key = item['status']        if key not in status.keys():            status[key] = 0        status[key] += 1    total = sum(status.values())    return {k:v/total*100 for k,v in status.items()}# 浏览器分析def browser_handler(iterable):    browsers = {}    for item in iterable:        ua = item['useragent']        key = (ua.browser.family,ua.browser.version_string)        browsers[key] = browsers.get(key,0) + 1        return browsersdef dispatcher(src):    handlers = []    queues = []    def reg(handler,width,interval):        '''        注册窗口处理函数        :param handler:注册的数据处理函数        :param width: 时间窗口宽度        :param interval: 时间间隔        '''        q = Queue()        queues.append(q)        h = threading.Thread(target=window,args=(q,handler,width,interval))        handlers.append(h)    def run():        for t in handlers:             t.start() # 启动线程        for item in src:            for q in queues:                q.put(item)    return reg,runif __name__ == "__main__":    import sys    path = 'D:/test.log'    reg,run = dispatcher(load(path))    reg(status_handler,10,5) # 注册    reg(browser_handler,5,5)    run()