实战ELK(5) Logstash 入门

时间:2021-05-11 07:44:57

实战ELK(5) Logstash 入门

Logstash 是一个开源的数据收集引擎,它具有备实时数据传输能力。它可以统一过滤来自不同源的数据,并按照开发者的制定的规范输出到目的地。

一、原理

Logstash 通过管道进行运作,管道有两个必需的元素,输入和输出,还有一个可选的元素,过滤器。

输入插件从数据源获取数据,过滤器插件根据用户指定的数据格式修改数据,输出插件则将数据写入到目的地。如下图:

实战ELK(5) Logstash 入门

输入:采集各种样式、大小和来源的数据

数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

实战ELK(5) Logstash 入门

过滤器:实时解析和转换数据

数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。

Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:

  • 利用 Grok 从非结构化数据中派生出结构
  • 从 IP 地址破译出地理坐标
  • 将 PII 数据匿名化,完全排除敏感字段
  • 整体处理不受数据源、格式或架构的影响

实战ELK(5) Logstash 入门

输出:选择你的存储,导出你的数据

尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。

Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。

实战ELK(5) Logstash 入门

二、安装

2.1、下载

Logstash 依赖 JDK1.8 ,因此在安装之前请确保机器已经安装和配置好 JDK1.8。

Logstash 下载:https://artifacts.elastic.co/downloads/logstash/logstash-6.5.1.rpm

2.2、安装

yum install logstash-6.5..rpm

查看启动文件来了解下文件目录

vim /etc/systemd/system/logstash.service

实战ELK(5) Logstash 入门

常用的目录:

/etc/default/logstash
/etc/sysconfig/logstash
/usr/share/logstash/bin/logstash
/etc/logstash

2.3、设置执行权限

  • 方法1,修改
logstash.service
vim /etc/systemd/system/logstash.service
User=root #直接以root账户允许
Group=root
  • 方法2
chown -R logstash:logstash /var/lib/logstash
chown -R logstash:logstash /var/log/logstash

2.4 、开机启动 

systemctl start logstash.service
systemctl enable logstash.service
systemctl status logstash.service

2.5、我们先来一个简单的案例:

cd /usr/share/logstash
bin/logstash -e 'input { stdin { } } output { stdout {} }'

可能会有点慢

实战ELK(5) Logstash 入门

这时候输入个Hello World

实战ELK(5) Logstash 入门

在生产环境中,Logstash 的管道要复杂很多,可能需要配置多个输入、过滤器和输出插件。

因此,需要一个配置文件管理输入、过滤器和输出相关的配置。配置文件内容格式如下:

# 输入
input {
...
} # 过滤器
filter {
...
} # 输出
output {
...
}

根据自己的需求在对应的位置配置 输入插件过滤器插件输出插件 和 编码解码插件 即可。

三、插件用法

Logstash 每读取一次数据的行为叫做事件。

在 Logstach 目录中创建一个配置文件,名为 logstash.conf(名字任意)。

cd /usr/share/logstash

3.1 输入插件

输入插件允许一个特定的事件源可以读取到 Logstash 管道中,配置在 input {} 中,且可以设置多个。

修改配置文件:

input {
# 从文件读取日志信息
file {
path => "/var/log/elasticsearch/"
type => "elasticsearch"
start_position => "beginning"
}
} # filter {
#
# } output {
# 标准输出
stdout { codec => rubydebug }
}

其中,messages 为系统日志。

保存文件。键入:

bin/logstash -f logstash.conf

3.2 输出插件

输出插件将事件数据发送到特定的目的地,配置在 output {} 中,且可以设置多个。

修改配置文件:

input {
# 从文件读取日志信息
file {
path => "/var/log/error.log"
type => "error"
start_position => "beginning"
} } # filter {
#
# } output {
# 输出到 elasticsearch
elasticsearch {
hosts => ["192.168.50.70:9200"]
index => "error-%{+YYYY.MM.dd}"
}
}

保存文件。键入:

bin/logstash -f logstash.conf

3.3 编码解码插件

编码解码插件本质是一种流过滤器,配合输入插件或输出插件使用。

从上图中,我们发现一个问题:Java 异常日志被拆分成单行事件记录到 Elasticsearch 中,这不符合开发者或运维人员的查看习惯。因此,我们需要对日志信息进行编码将多行事件转成单行事件记录起来。

我们需要配置 Multiline codec 插件,这个插件可以将多行日志信息合并成一行,作为一个事件处理。

Logstash 默认没有安装该插件,需要开发者自行安装。键入:

bin/logstash-plugin install logstash-codec-multiline

修改配置文件:

input {
# 从文件读取日志信息
file {
path => "/var/log/error.log"
type => "error"
start_position => "beginning"
# 使用 multiline 插件
codec => multiline {
# 通过正则表达式匹配,具体配置根据自身实际情况而定
pattern => "^\d"
negate => true
what => "previous"
}
} } # filter {
#
# } output {
# 输出到 elasticsearch
elasticsearch {
hosts => ["192.168.2.41:9200"]
index => "error-%{+YYYY.MM.dd}"
}
}

保存文件。键入:

bin/logstash -f logstash.conf

4.4 过滤器插件

过滤器插件位于 Logstash 管道的中间位置,对事件执行过滤处理,配置在 filter {},且可以配置多个。

本次测试使用 grok 插件演示,grok 插件用于过滤杂乱的内容,将其结构化,增加可读性。

安装:

bin/logstash-plugin install logstash-filter-grok

修改配置文件:

input {
stdin {}
} filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER
:duration}" }
}
} output {
stdout {
codec => "rubydebug"
}
}

保存文件。键入:

bin/logstash -f logstash.conf

启动成功后,我们输入:

55.3.244.1 GET /index.html  0.043

控制台返回:

[root@localhost logstash-5.6.]# bin/logstash -f logstash.conf
Sending Logstash's logs to /root/logstash-5.6.3/logs which is now configured via log4j2.properties
[--30T08::,][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.3/modules/fb_apache/configuration"}
[--30T08::,][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.3/modules/netflow/configuration"}
[--30T08::,][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>, "pipeline.batch.size"=>, "pipeline.batch.delay"=>, "pipeline.max_inflight"=>}
The stdin plugin is now waiting for input:
[--30T08::,][INFO ][logstash.pipeline ] Pipeline main started
[--30T08::,][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>}
55.3.244.1 GET /index.html 0.043
{
"duration" => "0.043",
"request" => "/index.html",
"@timestamp" => --30T15::.912Z,
"method" => "GET",
"bytes" => "",
"@version" => "",
"host" => "localhost.localdomain",
"client" => "55.3.244.1",
"message" => "55.3.244.1 GET /index.html 15824 0.043"
}

输入的内容被匹配到相应的名字中。