活动封禁刷票ip
一、问题引入
话说,我们一搞活动,就会招来大量过来刷票的人。活动大概是这样的:答题之后抽奖,奖品有可能是代金券或者是电话费充值。然后刷票会导致活动入口打开超慢,正常想参加活动的人会受到极大的影响,极具影响用户体验。服务器通过nginx日志会看到同一个时间点里面有大量的ip过来访问。
然后领导坐不住了,希望我能通过一些手段去处理这个事。
二、问题探索
一开始跟前老大讨论这个事的时候,他告诉我nginx自身有个请求限流模块:ngx_http_limit_req_module,能针对同一个ip的连接数,限制并发数目。我发现,实际场景用起来并不是特别好用,因为我分析过nginx日志,它是同一个时刻,是大量的ip过来的,最多拖慢它过来的访问速度;但是领导的意思是直接封禁它,加入黑名单呢。
于是没办法啦,只能通过nginx日志去封禁这些ip了。领导说有3个参数是可变的,叫我把脚本写灵活点:
(1)ip访问次数:某个ip访问次数达到多少次
(2)访问地址url:针对某个访问的url
(3)统计时间的频率:某个时间频率(比如30秒,1分钟,x分钟)
三、问题解决
用了两个方法:(1)直接纯代码分析nginx日志过滤 (2)利用elk统计分析
大致讲下思路,然后贴代码,早点洗洗睡(今天头胀得厉害,太不舒服了,感觉是积劳成疾 = =)
方法一:直接纯代码分析nginx日志过滤
统计时间的频率我弄死在1分钟里面了,所以不存在第3个参数。
1、筛选1分钟内的nginx日志
2、对访问的某个url统计ip访问次数(记得去重)
3、把黑名单写入nginx配置文件,封禁
4、输出ip黑名单,发到钉钉群
特别提醒下,要是对nginx日志grep一个日志里面不存在的时间点,是返回不了数据的。比如运行命令:
cat access.log |sed -n \'/2020-08-12T10:15:00+08:00/, /2020-08-12T10:25:00+08:00/p\' , 如果这两个时间点刚好是没有日志的,会返回空。即使这个时间段里面有日志。
1 #!/bin/bash 2 3 ### 封禁每分钟超过30次的ip 4 ## authored by ljy 5 ## nginx日志文件目录 6 log_dir=/home/ljy/app/nginx/logs 7 log_file=${log_dir}/活动日志.access.log 8 9 10 ## 刷票ip处理目录 11 ip_dir=/home/ljy/scripts/block_ip 12 ip_date=${ip_dir}/ip_date.txt 13 ip_list=${ip_dir}/ip_list.txt 14 deny_list=${ip_dir}/deny_ip.txt 15 16 ## 白名单文件(不加入到nginx黑名单的) 17 white_list=${ip_dir}/white_ip.txt 18 19 ################ 参数1: ip访问的次数 20 times=30 21 22 ################ 参数2: 访问的地址 23 url=\'/xxx/redirect.html\' 24 25 26 ################ 参数3: 统计频率(人为定义秒数为60秒) 27 SECOND_TO_GREP=60 28 29 30 #日志文件打开行数(tail打开最近1w条) 31 MAX_TAIL_LINE=10000 32 #这段是前老大教的,大家要好好体会。就是把60秒内的日志都筛选出来 33 hms=`date -d "1 second ago" +"%Y-%m-%dT%H:%M:%S"` 34 35 for ((i=1;i<=${SECOND_TO_GREP};i++)) ; 36 do 37 hms="${hms}|`date -d "${i} second ago" +"%Y-%m-%dT%H:%M:%S"`" 38 done 39 40 echo $hms 41 echo "##################" 42 43 44 ######## 1、筛选1分钟内的日志(SECOND_TO_GREP秒前的日志) 45 tail -${MAX_TAIL_LINE} ${log_file} | sed -rn "/${hms}/,$ p" > ${ip_date} 46 47 ##nginx路径 48 nginx_dir=/home/ljy/app/nginx 49 nginxconf_dir=${nginx_dir}/conf 50 51 52 ######## 2、对访问的某个url统计ip访问次数(已去重) 53 ### 列表: 次数+ip 54 >$deny_list 55 56 #echo $url 57 cat $ip_date | grep $url | cut -d \'"\' -f 12 | sort -nr |uniq -c |sort -nr | awk \'{print $0}\' > $ip_list 58 59 cat $ip_list | while read line 60 do 61 iptimes=`echo $line | awk \'{print $1}\'` 62 if [ $iptimes -ge $times ]; then 63 ip=`echo $line | awk \'{print $2}\'` 64 echo "ip 为 $ip " 65 ##白名单不加入禁止ip上 66 if cat $white_list | grep "$ip" > /dev/null 67 then 68 echo "是白名单ip,不禁止" 69 else 70 #如果没写入过nginx配置文件中 71 if cat $nginxconf_dir/extra/配置文件.conf | grep "$ip" > /dev/null 72 # if cat /home/ljy/scripts/block_ip/配置文件.conf | grep "$ip" > /dev/null 73 then 74 echo "已存在该ip,不写入文件" 75 else 76 echo "deny $ip;" >> $deny_list 77 fi 78 fi 79 fi 80 done 81 82 cd $ip_dir 83 ######## 3、把黑名单写入nginx配置文件,封禁 84 sed -i \'40 r \'${deny_list}\'\' $nginxconf_dir/extra/配置文件.conf 85 86 87 ######## 4、nginx重新加载,黑名单封禁生效 88 ${nginx_dir}/sbin/nginx -t 89 ${nginx_dir}/sbin/nginx -s reload 90 91 92 ######## 5、输出ip黑名单,发到钉钉群 93 denytailnum=`cat $nginxconf_dir/extra/配置文件.conf |grep -n deny |tail -n 1 | awk -F \':\' \'{print $1}\'` 94 denyheadnum=`cat $nginxconf_dir/extra/配置文件.conf |grep -n deny |head -n 1 | awk -F \':\' \'{print $1}\'` 95 96 97 ### 如果nginx文件有deny ip 相同行,就不要再重复添加 98 if [ $denytailnum ]; then 99 cat $nginxconf_dir/extra/配置文件.conf | sed -n ${denyheadnum},${denytailnum}p > tmpfile 100 sed \'s/;//g\' tmpfile | awk \'{print $2}\' > ${ip_dir}/warnip.txt 101 fi 102 103 104 #发黑名单ip,每小时的05分发一次 105 min=`date +%M` 106 if [ $min == \'5\' ]; then 107 python warn.py 108 else 109 echo "不是每小时的05分,不发告警" 110 fi
上面之所以用到一个python脚本去读ip黑名单文件,是因为curl钉钉告警的一条命令不支持直接读取文件内容。脚本思路从nginx配置文件拿到deny 首尾那一段的行数(分别是:denytailnum 和 denyheadnum),然后把这些ip写入到文件,python告警读这个文件:warnip.txt
warn.py是用来发钉钉告警的,就是给领导看,哪些ip被加入黑名单了
1 # -*- coding: utf-8 -*- 2 # @Time : 2020/9/18 10:26 3 # @Author : ljy 4 # @File : warn.py 5 6 import time 7 import os 8 import urllib2 9 import json 10 import pymysql 11 import datetime 12 import sys 13 import requests 14 reload(sys) 15 sys.setdefaultencoding(\'utf-8\') 16 17 size = os.path.getsize(\'/home/ljy/scripts/block_ip/warnip.txt\') 18 #1、文件非空输出deny的ip 19 if size != 0: 20 content="以下ip被加入黑名单: \n" 21 with open("/home/ljy/scripts/block_ip/warnip.txt") as f: 22 content += f.read() 23 f.close() 24 ##机器人url 25 url = \'https://oapi.dingtalk.com/robot/send?access_token=xxxxx\' 26 data = {"msgtype": "text", "text": {"content": content}, "at": {"atMobiles": [\'本人电话号码\'], "isAtAll": "false"}} 27 headers = {\'Content-Type\': \'application/json\'} 28 request = urllib2.Request(url=url, headers=headers, data=json.dumps(data)) 29 re = urllib2.urlopen(request, timeout=10) 30 re_data = re.read()
效果图:
后来又叫我写多个解禁ip的脚本,就是传入哪些ip,自动放出来:
read -p "请输入要解禁的ip(逗号分隔): " input_ip echo $input_ip | xargs -d, -n2 -I {} echo {} | sed \'/^$/d\' > out_ip cat out_ip | while read line do ip=$line if cat $nginxconf_dir/extra/acttest.conf | grep "$ip" > /dev/null then echo "存在该ip,解封:$ip" ##找到ip那行,删除 sed -i \'/\'${ip}\'/\'d $nginxconf_dir/extra/配置文件.conf else echo "不存在该ip,不需要解封" fi done
方法二:利用elk统计分析
首先要用filebeat+logstash+elasticsearch+kibana,把nginx日志导到kibana展示,然后运行这条查询语句就可以了
timestamp:指定某个查询时间范围内
min_doc_count:指定ip次数为2次(field 指定了查询的字段为ip,记得预先设定下nginx日志格式为json输出)
GET /logstash-nginx-xxxx/_search { "size":0, "aggs":{ "count_second":{ "date_histogram": { "field": "@timestamp", "interval": "second", #时间范围按秒统计 "time_zone":"+08:00" }, "aggs":{ "ipv":{ "terms": { "field": "clientip.keyword", "size":1000, "order": { "_count": "desc" }, "min_doc_count": 2 } } } } }, "query":{ "bool": { "must": [ { "match_phrase": { "url": "匹配某个url地址"}} ], "filter": { "range": { "@timestamp": { "time_zone":"+08:00", "gte": "2020-08-10T12:00:00.000", "lte": "2020-08-10T12:10:00.000" } } } } } }