<二>ELK-6.5.3学习笔记–使用rsyslog传输管理nginx日志

时间:2021-08-15 19:33:28

http://www.eryajf.net/2362.html

转载于
本文预计阅读时间 28 分钟

现在有好几台主机的nginx日志想要监控分析一下,那么,如何让远程主机的日志都乖乖的来到elk主机之上呢,这是一个需要考虑的问题,而这里,我就使用rsyslog来完成。

这种方式貌似针对于远程主机上只有单项日志的情况,就像我们现在做的,只处理nginx的访问日志一般的,如果还有更多的日志需要从远程转发到elk集群中,就需要用其他方式了,或者说我只会配置这么一种,更复杂的,目前还玩不转。后来哈,在当我测试rsyslog转发多个日志的时候,发现,里边配置规则过于复杂,分类方面也非常不给力,就此打住,一旦多个日志,直接用filebeat即可。

现在,废话不多说,直接进入正题。

目前的思路,可以通过下图了解:

<二>ELK-6.5.3学习笔记–使用rsyslog传输管理nginx日志

说明:

  • 图中举例了两台nginx,其实可以有更多台nginx日志可以想后方转发。
  • 双方以rsyslog作为桥梁,从而实现日志的中转运输。其中nginx主机上是发送端,elk主机上是接收端。
  • 针对不同的nginx日志,可以启动不同的logstash实例,然后将日志转发给es。
  • 由es,统一将日志转给kibana。

基本上配置起来也就是按上边的思路走,一步步配置起来就好了。

主机情况:

主机 组件
192.168.100.10 Nginx,Rsyslog
192.168.10.10 Rsyslog,ELK

1,nginx日志json化。

首先来配置nginx端的日志,为了方便后边一系列的操作,需要先把nginx的日志json化,要配置也非常简单,通过编辑nginx的配置文件来实现:

在nginx的配置文件nginx.conf的http区域添加如下内容:

  1. log_format json '{"@timestamp":"$time_iso8601",'
  2. '"host":"$server_addr",'
  3. '"request_method": "$request_method", '
  4. '"clientip":"$remote_addr",'
  5. '"size":$body_bytes_sent,'
  6. '"responsetime":$request_time,'
  7. '"upstreamtime":"$upstream_response_time",'
  8. '"upstreamhost":"$upstream_addr",'
  9. '"http_host":"$host",'
  10. '"url":"$uri",'
  11. '"xff":"$http_x_forwarded_for",'
  12. '"referer":"$http_referer",'
  13. '"agent":"$http_user_agent",'
  14. '"status":"$status"}';

注意,这个地方的配置,其实是非常重要的,里边定义了许多的字段(可能还不明白字段是什么意思,等到后边会慢慢介绍这个东东),这些字段取自于原nginx日志,然后再转给es,最后通过kibana展示出来,从而让分析更加多维,立体,简单说明一下这些参数:

  • timestamp:日志的时间戳。
  • host:nginx所在主机IP。
  • request_method:请求类型(POST,GET)
  • clientip:前端ip。
  • size:请求大小。
  • responsetime:请求响应时间。
  • upstreamtime:后端相应请求时间。
  • upstreamhost:后端服务器IP。
  • http_host:域名。
  • url:请求的接口。
  • xff:用户IP。
  • referer:完整请求地址。
  • agent:客户端类型。
  • status:状态码。

其中的clientip常规情况下就是指请求的来源IP地址,下边的xff也是,原因是我们的环境当中在nginx节点之前,还有一层waf,所以此时代表请求的来源IP地址的,则是xff,这个字段用意,将会在后边说明。

然后在nginx的访问日志配置access_log处更改配置如下:

  1. access_log /var/log/nginx/access.log json;

此处的json是与上边的log_format后边的名称相对应的,保持一致即可。如果有多台,可以将如上配置配置过去,这里我就不一一列举了,就拿这台nginx作为示例了。下边也一样,不再配置多台。

2,发送端配置。

现在开始编辑rsyslog发送端的配置,直接编辑rsyslog的配置文件,这里以其中一台服务器的nginx日志作为示例,通过rsyslog将nginx的访问日志转发到elk主机上去。

说明有二:

  • 默认我这里使用的是CentOS7的主机,已经安装了rsyslog-8.x的服务。
  • 还有主机是CentOS-6.x的,可能rsyslog版本很低,这个解决非常简单,通过如下两条命令安装即可。
  1. wget http://rpms.adiscon.com/v8-stable/rsyslog.repo -O /etc/yum.repos.d/rsyslog.repo
  2. yum -y install rsyslog
  3. rsyslogd -v

接下来列出配置内容:

  1. [root@localhost curl]$egrep -v "^#|^$" /etc/rsyslog.conf
  2. $ModLoad imfile
  3. $InputFilePollInterval 3
  4. $WorkDirectory /var/spool/rsyslog
  5. $PrivDropToGroup adm
  6. $InputFileName /var/log/nginx/access.log
  7. $InputFileTag nginx-access:
  8. $InputFileStateFile stat-nginx-access
  9. $InputFileSeverity info
  10. $InputFilePersistStateInterval 25000
  11. $InputRunFileMonitor
  12. $template BiglogFormatNginx,"%msg%\n"
  13. if $programname == 'nginx-access' then @192.168.10.10:514;BiglogFormatNginx
  • InputFileName:定义日志文件位置,此处貌似不能使用*.log来匹配。
  • InputFileTag:定义一个tag,方便下边调用。
  • template:定义日志格式的模板。
  • 最后一句:将日志转发给192.168.10.10UDP(一个@表示UDP,两个@表示TCP)端口514,并应用模板BiglogFormatNginx

3,接收端配置。

接收端在elk主机之上,这里同样配置rsyslog作为接收端,然后将接收到的日志存在elk本地。配置内容如下:

  1. [root@localhost ~]$egrep -v "^#|^$" /etc/rsyslog.conf
  2. $template myformat,"%rawmsg%\n"
  3. $ActionFileDefaultTemplate myformat
  4. $ModLoad imuxsock # provides support for local system logging (e.g. via logger command)
  5. $ModLoad imjournal # provides access to the systemd journal
  6. $ModLoad immark # provides --MARK-- message capability
  7. $ModLoad imudp
  8. $UDPServerRun 514
  9. $ModLoad imtcp
  10. $InputTCPServerRun 514
  11. $WorkDirectory /var/lib/rsyslog
  12. $AllowedSender tcp, 192.168.10.0/24,192.168.100.0/24
  13. $template Remote,"/logs/logstashfile/%fromhost-ip%/%fromhost-ip%_%$YEAR%-%$MONTH%-%$DAY%.log"
  14. :fromhost-ip, !isequal, "127.0.0.1" ?Remote
  15. $IncludeConfig /etc/rsyslog.d/*.conf
  16. $OmitLocalLogging on
  17. $IMJournalStateFile imjournal.state

说明:

  • 1,在本机开启upd,tcp端口514接收日志。
  • 2,接收的范围,是IP定义的,多个网段的话中间用逗号隔开。
  • 3,接收到的日志存放模板,存在本地的/logs目录下,这里注意需要创建对应目录。
  • 4,不杰接受来自elk本机的日志。

然后在两边都启动rsyslog服务。

  1. systemctl restart rsyslog
  2. systemctl enable rsyslog
  3. systemctl status rsyslog

等待一会儿,就能够在elk主机上看到日志目录生成了。

  1. [root@elk logstashfile]$pwd
  2. /logs/logstashfile
  3. [root@elk logstashfile]$ls
  4. 192.168.100.10

然后进去看一下日志,发现就是远程主机当中已经json化的nginx访问日志。

4,配置logstash。

接着就该考虑如何将远程转过来的nginx日志,呈现在elk集群当中,这里使用logstash进行日志的转发,配置如下:

  1. cat > /etc/logstash/conf.d/nginx.conf << EOF
  2. input {
  3. file {
  4. type => "nginx-access"
  5. path => "/logs/logstashfile/192.168.100.10/192.168.100.10_*.log"
  6. codec => "json"
  7. }
  8. }
  9. filter {
  10. geoip {
  11. source => "xff"
  12. fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"]
  13. remove_field => ["[geoip][latitude]", "[geoip][longitude]"]
  14. }
  15. json {
  16. source => "message"
  17. target => "jsoncontent"
  18. }
  19. }
  20. output {
  21. if [type] == "nginx-access" {
  22. elasticsearch {
  23. hosts => ["127.0.0.1:9200"]
  24. manage_template => true
  25. index => "nginx-access-%{+YYYY-MM}"
  26. }
  27. }
  28. }
  29. EOF
  • 1,input定义日志的名称(type),位置(path),格式。可以定义多个。这里直接读取通过rsyslog放置到本地的日志。
  • 2,filter定义一些关键字段的过滤。通过geoip来实现ip的地理位置标识,注意这个地方我的source是xff,上边我已经解释过这个原因,如果你的是常规情况,那么应该更改成clientip。
  • 3,output定义标准输出,到本机的9200端口,最后的index是索引名称,可以自定义,如果是不同主机上的日志,可以在这个地方根据其ip不同来进行区分。

注意:此处所配置的geoip不需要像网上许多教程说的,还要再引用对应的数据库,因为目前最新版本的elk已经配备了相应的配置,这点,在等会儿启动的日志当中可以见分晓。

input {
kafka {
type =>"nginxlog"
topics =>["nginxlog"]
bootstrap_servers=> "172.18.45.79:9092,172.18.45.78:9092,172.18.45.80:9092"
group_id =>"nginxlog"
auto_offset_reset=> latest
codec =>"json"
}
kafka {
type => "kafka-logs"
bootstrap_servers => "172.18.45.79:9092,172.18.45.78:9092,172.18.45.80:9092"
group_id => "logstash"
auto_offset_reset => "earliest"
topics => "kafka_run_log"
consumer_threads => 5
decorate_events => true
}
}

filter {
if [type] == "nginxlog"{
grok {
match => {"message" => "%{COMBINEDAPACHELOG}" }
remove_field =>"message"
}
date {
match => ["timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
}
geoip {
source =>"clientip"
target =>"geoip"
database =>"/usr/local/logstash-6.4.0/config/GeoLite2-City_20190423/GeoLite2-City.mmdb"
add_field => ["[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => ["[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => ["[geoip][coordinates]", "float"]
}
useragent {
source =>"agent"
target =>"userAgent"
}
}
if [type] == "kafka-logs"{
date {
match => ["timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
}
}
}
output {
if [type] == 'nginxlog' {
elasticsearch {
hosts =>["172.18.45.79:9200","172.18.45.78:9200","172.18.45.80:9200"]
index =>"logstash-nginxlog-%{+YYYY.MM.dd}"
}

}
if [type] == 'kafka-logs' {
elasticsearch {
hosts =>["172.18.45.79:9200","172.18.45.78:9200","172.18.45.80:9200"]
index =>"logstash-nginxlog-%{+YYYY.MM.dd}"
}

}

}

默认有一个data目录,为了方便以后多实例管理,所以单独创建一个,另外启动日志也可以输出一个,从而让运维更容易,同样也创建对应的目录。

  1. mkdir /usr/share/logstash/data1
  2. mkdir /logs/logstash_nohup

启动实例。

  1. nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx.conf --path.data=/usr/share/logstash/data1 &> /logs/logstash_nohup/nginx.out &

然后监听一下转发状况:

  1. [root@localhost ~]$tail -fn 100 /logs/logstash_nohup/nginx.out
  2. WARNING可以忽略,因为我们启动的时候已经指定配置文件了。: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
  3. Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
  4. [WARN ] 2018-12-18 17:49:40.285 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
  5. [INFO ] 2018-12-18 17:49:40.300 [LogStash::Runner] runner - Starting Logstash {"logstash.version"=>"6.5.3"}
  6. [INFO ] 2018-12-18 17:49:43.330 [Converge PipelineAction::Create<main>] pipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
  7. [INFO ] 2018-12-18 17:49:43.733 [[main]-pipeline-manager] elasticsearch - Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
  8. [WARN ] 2018-12-18 17:49:45.024 [[main]-pipeline-manager] elasticsearch - Restored connection to ES instance {:url=>"http://127.0.0.1:9200/"}
  9. [INFO ] 2018-12-18 17:49:46.660 [[main]-pipeline-manager] elasticsearch - ES Output version determined {:es_version=>6}
  10. [WARN ] 2018-12-18 17:49:46.663 [[main]-pipeline-manager] elasticsearch - Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
  11. [INFO ] 2018-12-18 17:49:46.687 [[main]-pipeline-manager] elasticsearch - New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//127.0.0.1:9200"]}
  12. [INFO ] 2018-12-18 17:49:46.701 [Ruby-0-Thread-5: :1] elasticsearch - Using mapping template from {:path=>nil}
  13. [INFO ] 2018-12-18 17:49:46.711 [[main]-pipeline-manager] geoip - Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
  14. [INFO ] 2018-12-18 17:49:46.723 [Ruby-0-Thread-5: :1] elasticsearch - Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
  15. [INFO ] 2018-12-18 17:49:46.989 [[main]>worker15] beats - Beats inputs: Starting input listener {:address=>"0.0.0.0:5054"}
  16. [INFO ] 2018-12-18 17:49:47.006 [Converge PipelineAction::Create<main>] pipeline - Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x31362ee1 run>"}
  17. [INFO ] 2018-12-18 17:49:47.058 [Ruby-0-Thread-1: /usr/share/logstash/lib/bootstrap/environment.rb:6] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
  18. [INFO ] 2018-12-18 17:49:47.061 [[main]<beats] Server - Starting server on port: 5054
  19. [INFO ] 2018-12-18 17:49:47.290 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9610}

说明:

从这段日志当中,可以看出不少的信息,所以特别的拎出来了。

  • 1,第一个WARNING可以忽略,因为我们启动的时候已经指定配置文件了。
  • 2,启动之后开始连接配置当中指定的127.0.0.1:9200。
  • 3,接着会看到一个输出:
    1. geoip - Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
      这条输出的意思就是刚刚我提到的那个问题,系统其实已经有了对应的库,进行加载,当然,有可能这个库的新鲜度未必能够保证。需要注意的是,GeoLite数据库由每月的第一个星期二的MaxMind更新,如果需要最新的,可以及时进行更新下载。
  • 4,接着可以看到es加载了初始定义好了的模板进行数据处理。
  • 5,直到一系列初始化完成,然后正式处理对应的日志到。

过一会儿,会看到有大量日志输出,说明日志已经成功转发给es,此时也可以去kibana当中创建一个对应的索引了。

5,简单使用kibana。

现在来访问kibana做一些简单查看:

<二>ELK-6.5.3学习笔记–使用rsyslog传输管理nginx日志

  • 1,点击管理,进入到管理界面。
  • 2,点击索引管理,可以看到es已经创建的索引。
  • 3,点击索引模式,将es的索引引入到kibana当中。

现在我们点击第三步,去创建一下索引:

<二>ELK-6.5.3学习笔记–使用rsyslog传输管理nginx日志

点击进来之后,就能看到刚刚我们定义的索引了,现在我创建一个以nginx-access*命名的索引(这个地方所列出来的索引,都是在logstash的配置当中定义了的索引),点击下一步。

<二>ELK-6.5.3学习笔记–使用rsyslog传输管理nginx日志

选择索引的模式,这里一般都使用timestamp,即以时间作为标准。然后点击创建即可。

<二>ELK-6.5.3学习笔记–使用rsyslog传输管理nginx日志

创建成功之后,能够看到一些我们前边定义好了的字段,这些字段也将成为我们日后分析日志的重要依据,可能你现在对字段是什么,还不能十分理解,不用着急,慢慢往后看,咱们一起边做边理解。

现在可以去看一下,前台日志展示页面是否成功了:

<二>ELK-6.5.3学习笔记–使用rsyslog传输管理nginx日志

可以看到日志已经呈现在眼前了,到这里,可以说,基本上elk的整个流程就算是走通了。

貌似一般情况下,网上教程都是到这个地方就结束了,然而,事情难道不是刚刚开始么。

接着,我们先来看一下我们前边多次提到的字段是否如我们所愿的展示了,查看的方法也很简单,随便点击其中一条日志,查看其json的格式:

<二>ELK-6.5.3学习笔记–使用rsyslog传输管理nginx日志

这样一展示之后,就能看出,清晰多了,我们所想要的,都生成了,稍候就能够通过这些数据,来进行画图等操作