<pre name="code" class="html">grok: 解析任意文本并构造它: Grok 是当前最好的方式在logstash 解析蹩脚的非结构化日志数据 到一些结构化的可查询的。 这个工具是完美的对于syslog logs, apache和其他webserver logs,mysqllogs,在一般情况下,任何日志格式 通常对于人是友好的而不是对于电脑 Logstash 有120种模式默认,你可以找到它们在:https://github.com/logstash-plugins/logstash-patterns- core/tree/master/patterns. Grok Basics: Grok 通过结合文本模式来匹配你的日志 语法对于一个grok 是 %{SYNTAX:SEMANTIC} 语法是 模式的名字会匹配你的文本,比如,3.44 会通过NUMBER 模式匹配和55.3.244.1 通过IP模式匹配。 语法是你如何匹配: SEMANTIC (语义)是标识 你给到一块文本被匹配。 比如,3.44 可能是一个一个事件的持续事件,因此你可以简单的调用它。 此外, 一个字符串 55.3.244.1 可能识别客户端发出的请求。 在上述例子中,你的grok filter 可以看起来像这样: %{NUMBER:duration} %{IP:client} 你可以添加一个数据类型转换成你的grok 模式。默认的 所有的语义都保存作为字符串. 如果你希望 转换一个语义的数据类型,比如改变一个字符串为一个整型 然后将其后缀为目标数据类型。 比如 %{NUMBER:num:int} 会转换num语义从一个字符串到一个整型,当前只支持转换是int和float 例子: 这个质疑的语法和语义,我们可以把有用的字段从一个简单的日志像这个虚构的http 请求日志: 55.3.244.1 GET /index.html 15824 0.043 匹配模式:
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} {
"client": [
"55.3.244.1"
],
"method": [
"GET"
],
"request": [
"/index.html"
],
"bytes": [
"15824"
],
"duration": [
"0.043"
]
} 正则表达式: Grok 坐在正则表达式之上,因此很多的正则表达式也是正确的在grok里。 正则表达式库是Oniguruma,你可以看到完整的支持的正则表达式的语言在Oniguruma 网站 自定义 模式: 有时候logstash没有你需要的模式,你有几个选项: 第一,你可以使用Oniguruma 语法用于命名捕获 让你匹配一个文件的片段,保存作为字段 (?<field_name>the pattern here) /******** 55.3.244.1 GET /index.html 15824 0.043 (?<field_name>\S+) 输出:
{
"field_name": [
"55.3.244.1"
]
} (?<field_name>\S+\s+) 输出:多了个空格
{
"field_name": [
"55.3.244.1 "
]
} (?<field_name>\S+\s+\S+) 输出:
{
"field_name": [
"55.3.244.1 GET"
]
} 例如, 后缀日志有一个队列id 是10或者11 个16进制字符,你可以捕获像这样: (?<queue_id>[0-9A-F]{10,11}) d4111111112 表达式:
(?<queue_id>[0-9A-F]{10,11}) 输出:
{
"queue_id": [
"4111111112"
]
} 或者,你也可以创建一个自定义模式的文件: 创建一个目录叫做patterns 里面有个文件叫做extra(文件名不重要,但是名字得对你有意义) 在这个文件中,写pattern 你需要的作为pattern名字,一个空格,然后正则用于哪个模式 例如: 后缀队列id例子: # contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11} Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com> filter {
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
} 上面的会被匹配,结果是下面的字段: timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com> timestamp, logsource, program, 和pid 来自SYSLOGBASE 模式本身定义了一些模式 /*******************
zjtest7-frontend:/usr/local/logstash-2.3.4/config# pwd
/usr/local/logstash-2.3.4/config
zjtest7-frontend:/usr/local/logstash-2.3.4/config# ls -lr patterns/
total 4
-rw-r--r-- 1 root root 32 Aug 30 13:33 postfix zjtest7-frontend:/usr/local/logstash-2.3.4/config/patterns# cat postfix
POSTFIX_QUEUEID [0-9A-F]{10,11} zjtest7-frontend:/usr/local/logstash-2.3.4/config# cat stdin.conf
input {
stdin {
}
} filter {
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
}
output {
stdout {
codec=>rubydebug{}
}
} zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f stdin.conf
Settings: Default pipeline workers: 1
Pipeline main started
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com>
{
"message" => "Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com>",
"@version" => "1",
"@timestamp" => "2016-08-30T05:34:11.849Z",
"host" => "0.0.0.0",
"timestamp" => "Jan 1 06:25:43",
"logsource" => "mailserver14",
"program" => "postfix/cleanup",
"pid" => "21403",
"queue_id" => "BEF25A72965",
"syslog_message" => "message-id=<20130101142543.5828399CCAF@mailserver14.example.com>"
} 简介: 插件支持下面的配置选项: 需要的配置选项: grok { } 细节: add_field 1.值类型是hash 2. 默认值是{} 如果 filter 是成功的,增加任何属性字段到这个事件,Field名字可以动态的和包含event部分使用%{field}. filter {
grok {
add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
patterns_dir => ["./patterns"]
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
} 输出;
zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f stdin.conf
Settings: Default pipeline workers: 1
Pipeline main started
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com>
{
"message" => "Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com>",
"@version" => "1",
"@timestamp" => "2016-08-30T05:44:35.071Z",
"host" => "0.0.0.0",
"timestamp" => "Jan 1 06:25:43",
"logsource" => "mailserver14",
"program" => "postfix/cleanup",
"pid" => "21403",
"queue_id" => "BEF25A72965",
"syslog_message" => "message-id=<20130101142543.5828399CCAF@mailserver14.example.com>",
"foo_%{somefield}" => "Hello world, from 0.0.0.0"
} ##你可以一次增加多个字段: filter {
grok {
add_field => { "foo_%{somefield}" => "Hello world, from %{host}"
"new_field" => "new_static_value"
}
patterns_dir => ["./patterns"]
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
} 输出; zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f stdin.conf
Settings: Default pipeline workers: 1
Pipeline main started
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com>
{
"message" => "Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com>",
"@version" => "1",
"@timestamp" => "2016-08-30T05:46:37.029Z",
"host" => "0.0.0.0",
"timestamp" => "Jan 1 06:25:43",
"logsource" => "mailserver14",
"program" => "postfix/cleanup",
"pid" => "21403",
"queue_id" => "BEF25A72965",
"syslog_message" => "message-id=<20130101142543.5828399CCAF@mailserver14.example.com>",
"foo_%{somefield}" => "Hello world, from 0.0.0.0",
"new_field" => "new_static_value" add_tag 1.值类型是array 2.默认是[] 如果filter 成功,增加任意的tags 到这个事件。Tags 可以动态的包含事件的部分使用%{field} syntax. filter {
grok {
add_tag => [ "foo_%{somefield}" ]
}
}
# You can also add multiple tags at once:
filter {
grok {
add_tag => [ "foo_%{somefield}", "taggedy_tag"]
}
} zjtest7-frontend:/usr/local/logstash-2.3.4/config# ../bin/logstash -f stdin.conf
Settings: Default pipeline workers: 1
Pipeline main started
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com>
{
"message" => "Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message- id=<20130101142543.5828399CCAF@mailserver14.example.com>",
"@version" => "1",
"@timestamp" => "2016-08-30T05:50:18.451Z",
"host" => "0.0.0.0",
"timestamp" => "Jan 1 06:25:43",
"logsource" => "mailserver14",
"program" => "postfix/cleanup",
"pid" => "21403",
"queue_id" => "BEF25A72965",
"syslog_message" => "message-id=<20130101142543.5828399CCAF@mailserver14.example.com>",
"foo_%{somefield}" => "Hello world, from 0.0.0.0",
"new_field" => "new_static_value",
"tags" => [
[0] "foo_%{somefield}"
]
} break_on_match 1.值类型是波尔型 2.默认值是true Break 在第一个匹配,第一次成功匹配通过grok 会导致filter 被完成。如果你需要grok 尝试所有的patterns( 可能解析不同的东西),设置这个为false match: 1.值类型是hash 2.默认是{} filter {
grok { match => { "message" => "Duration: %{NUMBER:duration}" } }
}