转自 http://blog.csdn.net/wp500/article/details/41040213
原文地址:http://logstash.net/docs/1.4.2/tutorials/getting-started-with-logstash
简介
Logstash是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。怎么样听起来挺厉害的吧?
在一个典型的使用场景下(ELK):用Elasticsearch作为后台数据的存储,kibana用来前端的报表展示。Logstash在其过程中担任搬运工的角色,它为数据存储,报表查询和日志解析创建了一个功能强大的管道链。Logstash提供了多种多样的 input,filters,codecs和output组件,让使用者轻松实现强大的功能。好了让我们开始吧
依赖条件:JAVA
- java -version
- java version "1.7.0_45"
- Java(TM) SE Runtime Environment (build 1.7.0_45-b18)
- Java HotSpot(TM) 64-Bit Server VM (build 24.45-b08, mixed mode)
启动和运行Logstash的两条命令示例
- curl -O https://download.elasticsearch.org/logstash/logstash/logstash-1.4.2.tar.gz
现在你应该有了一个叫logstash-1.4.2.tar.gz的文件了。 我们把它解压一下
- tar zxvf logstash-1.4.2.tar.gz
- cd logstash-1.4.2
现在我们来运行一下:
- bin/logstash -e 'input { stdin { } } output { stdout {} }'
我们现在可以在命令行下输入一些字符,然后我们将看到logstash的输出内容:
- hello world
- 2013-11-21T01:22:14.405+0000 0.0.0.0 hello world
Ok,还挺有意思的吧... 以上例子我们在运行logstash中,定义了一个叫"stdin"的input还有一个"stdout"的output,无论我们输入什么字符,Logstash都会按照某种格式来返回我们输入的字符。这里注意我们在命令行中使用了-e参数,该参数允许Logstash直接通过命令行接受设置。这点尤其快速的帮助我们反复的测试配置是否正确而不用写配置文件。
- bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'
我们再输入一些字符,这次我们输入"goodnight moon":
- goodnight moon
- {
- "message" => "goodnight moon",
- "@timestamp" => "2013-11-20T23:48:05.335Z",
- "@version" => "1",
- "host" => "my-laptop"
- }
以上示例通过重新设置了叫"stdout"的output(添加了"codec"参数),我们就可以改变Logstash的输出表现。类似的我们可以通过在你的配置文件中添加或者修改inputs、outputs、filters,就可以使随意的格式化日志数据成为可能,从而订制更合理的存储格式为查询提供便利。
使用Elasticsearch存储日志
- curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.1.1.tar.gz
- tar zxvf elasticsearch-1.1.1.tar.gz
- cd elasticsearch-1.1.1/
- ./bin/elasticsearch
- bin/logstash -e 'input { stdin { } } output { elasticsearch { host => localhost } }'
随意的输入一些字符,Logstash会像之前一样处理日志(不过这次我们将不会看到任何的输出,因为我们没有设置stdout作为output选项)
- you know, for logs
我们可以使用curl命令发送请求来查看ES是否接收到了数据:
- curl 'http://localhost:9200/_search?pretty'
返回内容如下:
- {
- "took" : 2,
- "timed_out" : false,
- "_shards" : {
- "total" : 5,
- "successful" : 5,
- "failed" : 0
- },
- "hits" : {
- "total" : 1,
- "max_score" : 1.0,
- "hits" : [ {
- "_index" : "logstash-2013.11.21",
- "_type" : "logs",
- "_id" : "2ijaoKqARqGvbMgP3BspJA",
- "_score" : 1.0, "_source" : {"message":"you know, for logs","@timestamp":"2013-11-21T18:45:09.862Z","@version":"1","host":"my-laptop"}
- } ]
- }
- }
恭喜,至此你已经成功利用Elasticsearch和Logstash来收集日志数据了。
Elasticsearch 插件(题外话)
- bin/plugin -install lmenezes/elasticsearch-kopf
接下来访问 http://localhost:9200/_plugin/kopf 来浏览保存在Elasticsearch中的数据,设置及映射!
多重输出
- bin/logstash -e 'input { stdin { } } output { elasticsearch { host => localhost } stdout { } }'
当我们输入了一些词组之后,这些输入的内容回回显到我们的终端,同时还会保存到Elasticsearch!(可以使用curl和kopf插件来验证)。
默认配置 - 按照每日日期建立索引
接下来
事件的生命周期
- file:从文件系统中读取一个文件,很像UNIX命令 "tail -0a"
- syslog:监听514端口,按照RFC3164标准解析日志数据
- redis:从redis服务器读取数据,支持channel(发布订阅)和list模式。redis一般在Logstash消费集群中作为"broker"角色,保存events队列共Logstash消费。
- lumberjack:使用lumberjack协议来接收数据,目前已经改为 logstash-forwarder。
Filters
- grok:解析无规则的文字并转化为有结构的格式。Grok 是目前最好的方式来将无结构的数据转换为有结构可查询的数据。有120多种匹配规则,会有一种满足你的需要。
- mutate:mutate filter 允许改变输入的文档,你可以从命名,删除,移动或者修改字段在处理事件的过程中。
- drop:丢弃一部分events不进行处理,例如:debug events。
- clone:拷贝 event,这个过程中也可以添加或移除字段。
- geoip:添加地理信息(为前台kibana图形化展示使用)
Outputs
- elasticsearch:如果你计划将高效的保存数据,并且能够方便和简单的进行查询...Elasticsearch是一个好的方式。是的,此处有做广告的嫌疑,呵呵。
- file:将event数据保存到文件中。
- graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展示的组件。http://graphite.wikidot.com/。
- statsd:statsd是一个统计服务,比如技术和时间统计,通过udp通讯,聚合一个或者多个后台服务,如果你已经开始使用statsd,该选项对你应该很有用。
Codecs
- json:使用json格式对数据进行编码/解码
- multiline:将汇多个事件中数据汇总为一个单一的行。比如:java异常信息和堆栈信息
获取完整的配置信息,请参考Logstash文档中 "plugin configuration"部分。
更多有趣Logstash内容
使用配置文件
- input { stdin { } }
- output {
- elasticsearch { host => localhost }
- stdout { codec => rubydebug }
- }
接下来,执行命令:
- bin/logstash -f logstash-simple.conf
我们看到logstash按照你刚刚创建的配置文件来运行例子,这样更加的方便。注意,我们使用-f参数来从文件获取而代替之前使用-e参数从命令行中获取配置。以上演示非常简单的例子,当然解析来我们继续写一些复杂一些的例子。
过滤器
filters是一个行处理机制将提供的为格式化的数据整理成你需要的数据,让我们看看下面的一个例子,叫grok filter的过滤器。
- input { stdin { } }
- filter {
- grok {
- match => { "message" => "%{COMBINEDAPACHELOG}" }
- }
- date {
- match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
- }
- }
- output {
- elasticsearch { host => localhost }
- stdout { codec => rubydebug }
- }
执行Logstash按照如下参数:
- bin/logstash -f logstash-filter.conf
- 127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
你将看到类似如下内容的反馈信息:
- {
- "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
- "@timestamp" => "2013-12-11T08:01:45.000Z",
- "@version" => "1",
- "host" => "cadenza",
- "clientip" => "127.0.0.1",
- "ident" => "-",
- "auth" => "-",
- "timestamp" => "11/Dec/2013:00:01:45 -0800",
- "verb" => "GET",
- "request" => "/xampp/status.php",
- "httpversion" => "1.1",
- "response" => "200",
- "bytes" => "3891",
- "referrer" => "\"http://cadenza/xampp/navi.php\"",
- "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""
- }
正像你看到的那样,Logstash(使用了grok过滤器)能够将一行的日志数据(Apache的"combined log"格式)分割设置为不同的数据字段。这一点对于日后解析和查询我们自己的日志数据非常有用。比如:HTTP的返回状态码,IP地址相关等等,非常的容易。很少有匹配规则没有被grok包含,所以如果你正尝试的解析一些常见的日志格式,或许已经有人为了做了这样的工作。如果查看详细匹配规则,参考logstash grok patterns。
实用的例子
Apache 日志(从文件获取)
- input {
- file {
- path => "/tmp/access_log"
- start_position => beginning
- }
- }
- filter {
- if [path] =~ "access" {
- mutate { replace => { "type" => "apache_access" } }
- grok {
- match => { "message" => "%{COMBINEDAPACHELOG}" }
- }
- }
- date {
- match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
- }
- }
- output {
- elasticsearch {
- host => localhost
- }
- stdout { codec => rubydebug }
- }
接下来,我们按照上面的配置创建一个文件(在例子中是"/tmp/access.log"),可以将下面日志信息作为文件内容(也可以用你自己的webserver产生的日志):
- 71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
- 134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
- 98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
现在使用-f参数来执行一下上面的例子:
- bin/logstash -f logstash-apache.conf
- input {
- file {
- path => "/tmp/*_log"
- ...
现在你可以看到logstash处理了error日志和access日志。然而,如果你检查了你的数据(也许用elasticsearch-kopf),你将发现access_log日志被分成不同的字段,但是error_log确没有这样。这是因为我们使用了“grok”filter并仅仅配置匹配combinedapachelog日志格式,这样满足条件的日志就会自动的被分割成不同的字段。我们可以通过控制日志按照它自己的某种格式来解析日志,不是很好的吗?对吧。
条件判断
- input {
- file {
- path => "/tmp/*_log"
- }
- }
- filter {
- if [path] =~ "access" {
- mutate { replace => { type => "apache_access" } }
- grok {
- match => { "message" => "%{COMBINEDAPACHELOG}" }
- }
- date {
- match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
- }
- } else if [path] =~ "error" {
- mutate { replace => { type => "apache_error" } }
- } else {
- mutate { replace => { type => "random_logs" } }
- }
- }
- output {
- elasticsearch { host => localhost }
- stdout { codec => rubydebug }
- }
Syslog
- input {
- tcp {
- port => 5000
- type => syslog
- }
- udp {
- port => 5000
- type => syslog
- }
- }
- filter {
- if [type] == "syslog" {
- grok {
- match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:
)?: %{GREEDYDATA:syslog_message}" }
- add_field => [ "received_at", "%{@timestamp}" ]
- add_field => [ "received_from", "%{host}" ]
- }
- syslog_pri { }
- date {
- match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
- }
- }
- }
- output {
- elasticsearch { host => localhost }
- stdout { codec => rubydebug }
- }
- bin/logstash -f logstash-syslog.conf
- telnet localhost 5000
- Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
- Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
- Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
- Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.
之后你可以在你之前运行Logstash的窗口中看到输出结果,信息被处理和解析!
- {
- "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
- "@timestamp" => "2013-12-23T22:30:01.000Z",
- "@version" => "1",
- "type" => "syslog",
- "host" => "0:0:0:0:0:0:0:1:52617",
- "syslog_timestamp" => "Dec 23 14:30:01",
- "syslog_hostname" => "louis",
- "syslog_program" => "CRON",
- "syslog_pid" => "619",
- "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
- "received_at" => "2013-12-23 22:49:22 UTC",
- "received_from" => "0:0:0:0:0:0:0:1:52617",
- "syslog_severity_code" => 5,
- "syslog_facility_code" => 1,
- "syslog_facility" => "user-level",
- "syslog_severity" => "notice"
- }
恭喜各位,看到这里你已经成为一个合格的Logstash用户了。你将可以轻松的配置,运行Logstash,还可以发送event给Logstash,但是这个过程随着使用还会有很多值得深挖的地方。