详解用ELK来分析Nginx服务器日志的方法

时间:2022-03-19 13:41:45

所有ELK的安装包都可以去官网下载,虽然速度稍慢,但还可以接受,官网地址:https://www.elastic.co/

logstash

在Logstash1.5.1版本,pattern的目录已经发生改变,存储在/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-0.1.10/目录下,但是好在配置引用的时候是可以对patterns的目录进行配置的,所以本人在Logstash的根目录下新建了一个patterns目录。而配置目录在1.5.1版本中也不存在了,如果是rpm包安装的,可以在/etc/logstash/conf.d/下面进行配置,但个人测试多次,这样启动经常性的失败,目前还没有去分析原因(个人不推荐使用RPM包安装)。所以大家可以采用Nohup或者screen的方式进行启动
专属nginx的pattern配置:

 

复制代码 代码如下:


NGINXACCESS %{IP:client} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" %{HOST:domain} %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:useragent} "(%{IP:x_forwarder_for}|-)"

 

 

 

由于是测试环境,我这里使用logstash读取nginx日志文件的方式来获取nginx的日志,并且仅读取了nginx的access log,对于error log没有关心。

使用的logstash版本为2.2.0,在log stash程序目录下创建conf文件夹,用于存放解析日志的配置文件,并在其中创建文件test.conf,文件内容如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
input {
  file {
    path => ["/var/log/nginx/access.log"]
  }
}
filter {
  grok {
    match => {
      "message" => "%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<http_referer>\S+)\" \"(?<http_user_agent>\S+)\" \"(?<http_x_forwarded_for>\S+)\""
    }
  
}
output {
  elasticsearch {
    hosts => ["10.103.17.4:9200"]
    index => "logstash-nginx-test-%{+YYYY.MM.dd}"
    workers => 1
    flush_size => 1
    idle_flush_time => 1
    template_overwrite => true
  }
  stdout{codec => rubydebug}
}

需要说明的是,filter字段中的grok部分,由于nginx的日志是格式化的,logstash解析日志的思路为通过正则表达式来匹配日志,并将字段保存到相应的变量中。logstash中使用grok插件来解析日志,grok中message部分为对应的grok语法,并不完全等价于正则表达式的语法,在其中增加了变量信息。

具体grok语法不作过多介绍,可以通过logstash的官方文档中来了解。但grok语法中的变量类型如IPORHOST并未找到具体的文档,只能通过在logstash的安装目录下通过grep -nr "IPORHOST" .来搜索具体的含义。

配置文件中的stdout部分用于打印grok解析结果的信息,在调试阶段一定要打开。

可以通过这里来验证grok表达式的语法是否正确,编写grok表达式的时候可以在这里编写和测试。

对于elasticsearch部分不做过多介绍,网上容易找到资料。


elk收集分析nginx access日志

使用redis的push和pop做队列,然后有个logstash_indexer来从队列中pop数据分析插入elasticsearch。这样做的好处是可扩展,logstash_agent只需要收集log进入队列即可,比较可能会有瓶颈的log分析使用logstash_indexer来做,而这个logstash_indexer又是可以水平扩展的,我可以在单独的机器上跑多个indexer来进行日志分析存储。

好了,现在进一步配置了。

nginx中的日志存储格式

nginx由于有get请求,也有post请求,get请求的参数是会直接显示在日志的url中的,但是post请求的参数呢,却不会在access日志中体现出来。那么我想要post的参数也进行存储纪录下来。就需要自己定义一个log格式了。

 

复制代码 代码如下:

 

log_format logstash '$http_host $server_addr $remote_addr [$time_local] "$request" $request_body $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_time $upstream_response_time';

 


这里的requestbody里面存放的就是POST请求的body了,然后GET请求的参数在requestbody里面存放的就是POST请求的body了,然后GET请求的参数在request里面。具体怎么分析,我们在indexer中再想。

 

这里的server_addr存放的是当前web机器的IP,存这个IP是为了分析日志的时候可以分析日志的原始来源。

下面是一个GET请求的例子:

 

复制代码 代码如下:

 

 

api.yejianfeng.com 10.171.xx.xx 100.97.xx.xx [10/Jun/2015:10:53:24 +0800] "GET /api1.2/qa/getquestionlist/?limit=10&source=ios&token=12343425324&type=1&uid=304116&ver=1.2.379 HTTP/1.0" - 200 2950 "-" "TheMaster/1.2.379 (iPhone; iOS 8.3; Scale/2.00)" 0.656 0.654

 


下面是一个POST请求的例子:

 

 

复制代码 代码如下:

 

 

api.yejianfeng.com 10.171.xx.xx 100.97.xx.xx [10/Jun/2015:10:53:24 +0800] "POST /api1.2/user/mechanicupdate/ HTTP/1.0" start_time=1276099200&lng=110.985723&source=android&uid=328910&lat=35.039471&city=140800 200 754 "-" "-" 0.161 0.159

 


顺便说下,这里知识在nginx.conf中定义了一个日志格式,还要记得在具体的服务中加入日志存储。比如

 

?
1
2
3
listen    80;
server_name api.yejianfeng.com;
access_log /mnt/logs/api.yejianfeng.com.logstash.log logstash;

log_agent的配置

这个配置就是往redis队列中塞入日志就行。output的位置设置为redis就行。

input {
        file {
                type => "nginx_access"
                path => ["/mnt/logs/api.yejianfeng.com.logstash.log"]
        }
}
output {
        redis {
                host => "10.173.xx.xx"
                port => 8001
                password => pass
                data_type => "list"
                key => "logstash:redis"
        }
}
log_indexer的配置

log_indexer的配置就比较麻烦了,需要配置的有三个部分

input: 负责从redis中获取日志数据
filter: 负责对日志数据进行分析和结构化
output: 负责将结构化的数据存储进入elasticsearch
input部分

?
1
2
3
4
5
6
7
8
9
input {
    redis {
        host => "10.173.xx.xx"
        port => 8001
        password => pass
        data_type => "list"
        key => "logstash:redis"
    }
}

其中的redis配置当然要和agent的一致了。

filter部分
解析文本可以使用grokgrok debug进行分析,参照着之前的log格式,需要一个个进行日志分析比对。这个grok语法写的还是比较复杂的,还好有在线grok比对工具可以使用。比对前面的GET和POST的日志格式,修改出来的grok语句如下:

 

复制代码 代码如下:

 

%{IPORHOST:http_host} %{IPORHOST:server_ip} %{IPORHOST:client_ip} \[%{HTTPDATE:timestamp}\] \"%{WORD:http_verb} (?:%{PATH:baseurl}\?%{NOTSPACE:params}(?: HTTP/%{NUMBER:http_version})?|%{DATA:raw_http_request})\" (%{NOTSPACE:params})?|- %{NUMBER:http_status_code} (?:%{NUMBER:bytes_read}|-) %{QS:referrer} %{QS:agent} %{NUMBER:time_duration:float} %{NUMBER:time_backend_response:float}

 


这里使用了一点小技巧,params的使用,为了让GET和POST的参数都反映在一个参数上,在对应的GET和POST的参数的地方,都设计使用params这个参数进行对应。

 

好了,现在params中是请求的参数。比如source=ios&uid=123。但是呢,最后做统计的时候,我希望得出的是“所有source值为ios的调用”,那么就需要对参数进行结构化了。而且我们还希望如果接口中新加入了一个参数,不用修改logstash_indexer就可以直接使用,方法就是使用kv,kv能实现对一个字符串的结构进行k=v格式的拆分。其中的参数prefix可以为这个key在统计的时候增加一个前缀,include_keys可以设置有哪些key包含在其中,exclude_keys可以设置要排除哪些key。

?
1
2
3
4
5
kv {
  prefix => "params."
  field_split => "&"
  source => "params"
}

好了,现在还有一个问题,如果请求中有中文,那么日志中的中文是被urlencode之后存储的。我们具体分析的时候,比如有个接口是/api/search?keyword=我们,需要统计的是keyword被查询的热门顺序,那么就需要解码了。logstash牛逼的也有urldecode命令,urldecode可以设置对某个字段,也可以设置对所有字段进行解码。

?
1
2
3
urldecode {
  all_fields => true
}

看起来没事了,但是实际上在运行的时候,你会发现一个问题,就是存储到elasticsearch中的timestamp和请求日志中的请求时间不一样。原因是es中的请求日志使用的是日志结构存放进入es的时间,而不是timestamp的时间,这里想要吧es中的时间和请求日志中的时间统一怎么办呢?使用date命令。具体设置如下:

?
1
2
3
4
date {
    locale => "en"
    match => ["timestamp" , "dd/MMM/YYYY:HH:mm:ss Z"]
}

具体的logstash_indexer中的全部配置如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
filter {
    grok {
      match => [
            "message", "%{IPORHOST:http_host} %{IPORHOST:server_ip} %{IPORHOST:client_ip} \[%{HTTPDATE:timestamp}\] \"%{WORD:http_verb} (?:%{PATH:baseurl}\?%{NOTSPACE:params}(?: HTTP/%{NUMBER:http_version})?|%{DATA:raw_http_request})\" (%{NOTSPACE:params})?|- %{NUMBER:http_status_code} (?:%{NUMBER:bytes_read}|-) %{QS:referrer} %{QS:agent} %{NUMBER:time_duration:float} %{NUMBER:time_backend_response:float}"
      ]
    }
    kv {
      prefix => "params."
      field_split => "&"
      source => "params"
    }
    urldecode {
         all_fields => true
    }
    date {
        locale => "en"
        match => ["timestamp" , "dd/MMM/YYYY:HH:mm:ss Z"]
    }
 
}

output部分
这里就是很简单往es中发送数据

?
1
2
3
4
5
6
7
8
9
10
output {
    elasticsearch {
        embedded => false
        protocol => "http"
        host => "localhost"
        port => "9200"
        user => "yejianfeng"
        password => "yejianfeng"
    }
}

这里有个user和password,其实elasticsearch加上shield就可以强制使用用户名密码登录了。这里的output就是配置这个使用的。

查询elasticsearch

比如上面的例子,我要查询某段时间的params.source(其实是source参数,但是前面的params是前缀)调用情况

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$url = 'http://xx.xx.xx.xx:9200/logstash-*/_search';
$filter = '
{
 "query": {
    "range" : {
      "@timestamp" : {
        "gt" : 123213213213,
        "lt" : 123213213213
      }
    }
  },
  "aggs" : {
    "group_by_source" : {"terms" : {"field" : "params.source"}}
  },
  "size": 0
}';