需求场景
当使用elasticsearch进行日志数据可视化的时候,往往会遇到需要IP地址无法human-reading的情况。这时,我们需要将IP地址进行一定的格式转换,将其转换为主机名(hostname)或者系统名(application/service name)。
以WAF的日志为例,里面的dst_ip记录了被攻击的主机ip,scr_ip记录了发起攻击的系统的ip:
可视化后,如果以dst_ip进行聚合,我们无法清楚的看到是哪个系统遭受了攻击
因此,我们需要将IP到系统名进行一个映射。实际的生产工作中,我们一定还会在其他的地方遇到类似的需求。
解决方案
如果只有一两个值的情况,我们可以用script field和plainless的方式来解决。但如果我们有大量的记录,每次都通过脚本的方式进行转换会为集群带来大量的负担,并且会需要更长的处理时间,因此,我们需要在数据写入的时候就进行IP地址的转换。
logstash上提供了一个dns_filter_plugin,可以进行dns的查找。其中有几个重要的参数:
-
resolve
, 是指将A记录(主机名),CNAME记录(服务名)等域名转换为IP地址 -
reverse
,是指反向映射,将IP地址映射为域名 -
hostsfile
, 包含映射关系的hosts文件 -
action
, 需要执行的操作,提供的操作类型是replace和append。replace是现场替换,直接将field里的value进行替换。append是附加,会将field里的value转换为数组,并且附加到数组的末尾。
测试示例
我们可以快速的通过logstash的其他plugin,一起来验证dns是否可以完成我们的需求。
- input: 使用generate plugin
- filter:使用dns plugin
- output: 使用stdout plugin
新建一个conf文件:
input{
generator {
message => "192.168.135.105"
count => 1
}
}
filter {
dns {
action => "replace"
reverse => [ "message" ] ## 我们使用reverse,是因为我们需要根据IP,知道系统
hostsfile => ["/Users/Lex/Applications/myhosts"]
}
}
output{
stdout{
codec => json
}
}
先来一个简单的myhosts:
192.168.135.105 前置应用2
注意,这里的域名是中文的,因此这并非一个看起来合法的hosts文件,但是无所谓,我们要做的只是一个mapping。
运行一下logstash,我们来看看输出:
[2019-04-04T08:42:41,044][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
{"host":"MacBook-Pro.local","@timestamp":"2019-04-04T00:42:40.991Z","@version":"1","sequence":0,"message":"前置应用2"}[2019-04-04T08:42:41,488][INFO ][logstash.pipeline ] Pipeline terminated {"pipeline.id"=>"main"}
message
的值已经被转换为了前置应用2
。
我们添加更多的值来试试,将input改为下表:
input{
generator {
#message => "192.168.135.105"
lines => [
"192.168.200.22",
"192.168.200.23",
"192.168.200.24"
]
count => 1
}
}
对应的hosts文件改为:
192.168.135.105 前置应用2
192.168.200.22 客服系统
192.168.200.23 客服系统
192.168.200.24 客服系统
192.168.200.25 客服系统
测试一下:
{"@timestamp":"2019-04-04T00:59:12.424Z","sequence":0,"message":"客服系统","@version":"1","host":"MacBook-Pro.local"}{"@timestamp":"2019-04-04T00:59:12.453Z","sequence":0,"message":"客服系统","@version":"1","host":"MacBook-Pro.local"}{"@timestamp":"2019-04-04T00:59:12.455Z","sequence":0,"message":"客服系统","@version":"1","host":"MacBook-Pro.local"}[2019-04-04T08:59:12,904][INFO ][logstash.pipeline ] Pipeline terminated {"pipeline.id"=>"main"}
没问题。
我们把原始的ip地址也保留一下:
filter {
mutate {
add_field => { "service" => "%{message}" }
}
dns {
action => "replace"
reverse => [ "service" ] ## 我们使用reverse,是因为我们需要根据IP,知道系统
hostsfile => ["/Users/Lex/Applications/myhosts"]
}
}
这时输出为:
[2019-04-04T09:04:32,714][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
{"@version":"1","message":"192.168.200.24","sequence":0,"@timestamp":"2019-04-04T01:04:32.711Z","service":"客服系统","host":"MacBook-Pro.local"}{"@version":"1","message":"192.168.200.23","sequence":0,"@timestamp":"2019-04-04T01:04:32.710Z","service":"客服系统","host":"MacBook-Pro.local"}{"@version":"1","message":"192.168.200.22","sequence":0,"@timestamp":"2019-04-04T01:04:32.598Z","service":"客服系统","host":"MacBook-Pro.local"}[2019-04-04T09:04:33,094][INFO ][logstash.pipeline ] Pipeline terminated {"pipeline.id"=>"main"}
我们将系统名保存到了service
中。
性能测试与调优
从代码上看,因为logstash每个worker每次处理一条event,在进入pipeline之后,每个包含映射逻辑的event都需要线程去加载一个文件,并且在这个文件中检索某个IP地址或域名,当我们有大量的吞吐时,可能会影响到实时性,看看文档上是怎么说的:
This filter, like all filters, only processes 1 event at a time, so the use of this plugin can significantly slow down your pipeline’s throughput if you have a high latency network. By way of example, if each DNS lookup takes 2 milliseconds, the maximum throughput you can achieve with a single filter worker is 500 events per second (1000 milliseconds / 2 milliseconds).
(显然,别人文档上是说当you have a high latency network时,才会影响吞吐,因为我们用的是hosts文件,而不是DNS server,索引不会受网络的影响,但总之,我们得测试一下)
关于性能的优化,这里的可选项是缓存,对应的参数是:
-
failed_cache_size
- Value type is number
- Default value is 0
- cache size for failed requests
-
failed_cache_ttl
- Value type is number
- Default value is 5
- how long to cache failed requests (in seconds)
-
hit_cache_size
- Value type is number
- Default value is 0
- set the size of cache for successful requests
-
hit_cache_ttl
- Value type is number
- Default value is 60
- how long to cache successful requests (in seconds)
我们都先用默认值,我们添加100个IP地址,循环发送5000次:
real 0m33.916s
user 2m14.692s
sys 0m7.794s
real 0m34.766s
user 2m15.708s
sys 0m7.777s
real 0m34.563s
user 2m14.599s
sys 0m7.626s
可能是因为我的电脑比较快,24秒处理50万条,当然,这里是多线程。因为没有其他cpu+memory的基线数据,所以只能以自己的mac作为基线了。
然后我修改了hit_cache_size
参数,设置为100(因为我只有100个IP)。发现基本上最终的参数没有什么变化,在循环次数少的(循环100和1000)次的情况下,使用的时间还稍微多了点。
因此,在简单的测试了几分钟后,结论是:
没有遇到明显的性能问题的情况下,无需设置cache相关的参数。因为没有读源码,可能cache是默认打开了的