在上一节我们导入数据使用的是java代码,其实这个步骤由logstash工具也可以实现哦~
此学习笔记参考官方文档:https://www.elastic.co/guide/en/logstash/current/introduction.html
下载安装
Logstash唯一的依赖的Java运行环境,确保jdk版本足够高,我的本地是1.8
下载和解压出来就能使用了,不需要安装
因为我的elasticsearch是5.3.0我就下了5.3.0版本,不确定有没有什么联系,我是这样下的
不要将Logstash安装到包含冒号(:)字符的目录路径中。
这个直接搜官网即可https://www.elastic.co/downloads/logstash
第一个小例子
简单介绍
我们之前有写过个小例子昂,从数据库里面将数据查出来,然后插入elasticsearch昂,现在有了logstash我们大概可以这样:
对于logstash来说,数据库就是一个输入流,它可以(可选)将输入流的数据通过fileter弄成某种格式然后传送给输出流(elasticsearch)啦
这次我们大概就用这个方法来将上次的小例子给完善一下
首先我们理解下基本知识
我们可以在我们解压后的目录运行我们的logstash相关的命令 D:\elasticsearch\logstash-5.3.0\logstash-5.3.0\bin>logstash -e 'input { stdin {
} } output { stdout {} }'
(-e表示直接用标准输入流的内容移动到标准输出流啦)
看见他启动完,我们输入一个hello,就会自动跳出来一个输出流哦
自动会将时间戳和IP地址信息添加到信息中哦
退出命令是ctrl+C啦
使用Logstash解析日志
第一个小例子中,我们成功测试了我们的logstash,下面我们来向下一步迈进吧
配置Filebeat将日志行发送到Logstash
首先我么需要配置Filebeat将日志发送给Logstash。默认的Logstash安装包括Beats input插件。要在您的数据源计算机上安装Filebeat(https://www.elastic.co/downloads/beats/filebeat)
我下的是5.3.0 https://www.elastic.co/downloads/past-releases/filebeat-5-3-0
下完了之后参考https://www.elastic.co/guide/en/beats/filebeat/6.1/filebeat-installation.html
我们需要:
1.将zip文件的内容解压缩到C:\Program Files
2.将filebeat--windows目录重命名为Filebeat
3.以管理员身份打开PowerShell提示(右键单击PowerShell图标并选择以管理员身份运行)。如果您正在运行Windows XP,则可能需要下载并安装PowerShell。(顾名思义win7可以跳过3,4两步,直接打开命令行即可)
在PowerShell提示符下,运行以下命令将Filebeat安装为Windows服务
PS> cd’C:\ Program Files \ Filebeat’
PS C:\ Program Files \ Filebeat>。\ install-service-filebeat.ps1
4.如果在系统上禁用脚本执行,则需要为当前会话设置执行策略以允许脚本运行。例如:PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-filebeat.ps1。
安装Filebeat后,您需要配置它。打开filebeat.yml位于Filebeat安装目录中的文件,并用以下行替换内容。确保paths指向logstash-tutorial.log以前下载的示例Apache日志文件 :
所以我们把刚才安装在c:\Program Files下的Filebeat下的filebeat.yml文件可以配置成以下啦:
filebeat.prospectors:
- input_type: log
paths:
- D:\weblogic_directory\user_projects\domains2\base_domain1\log\aoplog*
output.logstash:
hosts: ["localhost:5044"]
paths下是你要发送的log日志文件哟-我这里使用了本地服务器里面的log,什么都可以啦
在filebeat.exe的目录中执行C:\Program Files\Filebeat>filebeat -e -c filebeat.yml -d "publish"
Filebeat将尝试在端口5044上连接。直到Logstash从一个活动的Beats插件开始,在该端口上将不会有任何答案,因此您在该端口上无法连接的任何消息现在都是正常的。
现在命令后是这样的,因为他的输出流是logstash,还没有开启,很正常,我们接下来就回去配置logstash啦
2018/01/17 03:27:03.832313 output.go:109: DBG output worker: publish 2045 event
s
2018/01/17 03:27:05.875430 single.go:140: ERR Connecting error publishing events
(retrying): dial tcp [::1]:5044: connectex: No connection could be made because
the target machine actively refused it.
因为无法连接嘛,发不到logstash上(不要退出这个命令哦)所以我们下一步就是让他连接上哟,现在我们就可以
配置logstash进行文件输入
接下来,创建一个Logstash配置管道,使用Beats输入插件从Beats接收事件。
Logstash的管道配置框架,我觉得类似于HTML通用骨架,是这样滴
input {
}
# filter {
#
# }
output {
}
注意到了吗,filter是可选的哟
我们现在可以配置了
在logstash解压后的bin目录下,新建first-pipeline.conf并将刚才的框架复制进去
【PS:管道配置文件,例first-pipeline.conf,logstash会默认加载.conf结尾的作为管道配置文件哟】
input {
beats {
port =>“5044”
}
}
# filter {
#
# }
output {
stdout {codec => ruby??debug}
}
Input配置意为
使用Beats输入插件
Output配置意为
将logstash运行时输出打印在stdout(控制台输出流)
验证配置是否正确:
logstash -f first-pipeline.conf --config.test_and_exit
–config.test_and_exit选项解析您的配置文件并报告任何错误。
如果配置文件通过配置测试,请使用以下命令启动Logstash:
logstash -f first-pipeline.conf --config.reload.automatic
如果你的管道工作正常,你应该看到一系列事件写入控制台:
{
"@timestamp" => 2018-01-17T05:13:05.793Z,
"offset" => 2363257,
"@version" => "1",
"input_type" => "log",
"beat" => {
"hostname" => "CC-PC",
"name" => "CC-PC",
"version" => "5.3.0"
},
"host" => "CC-PC",
"source" => "D:\\weblogic_directory\\user_projects\\domains2\\base_domai n1\\log\\aoplog.log.5",
"message" => "[fd9a597f-4368-4122-b3ab-939aefbc2107] [43592299-0e8d-4a73- 8666-4e363e8bd541] [] [com.sinosoft.fixation.intf.service.spring.FixationByRiskP remiumServiceSpringImpl.findFixationByRiskPremium] [????????????] [start]",
"type" => "log",
"tags" => [
[0] "beats_input_codec_plain_applied"
]
}
当然,logstash也有自己的input插件可以将File当做输入流,可以自己试一下哟。
现在你已经可以用filebeat将日志文件发送到Logstash啦,而且logstash将日志文件 (input)打印在了控制台上(output),但是会发现日志消息的格式并不理想,如果希望特定的命名字段,可以使用grok过滤器插件
Grok Filter Plugin解析Web日志
该grok过滤器插件是几个插件,默认情况下在Logstash可用之一。
该grok过滤器插件,使您能够将非结构化日志数据分析到结构化的东西和可查询。
该行开头的IP地址很容易识别,括号中的时间戳也是如此。要解析数据,可以使用%{COMBINEDAPACHELOG}Grok模式,该模式使用以下模式从Apache日志中构建行:
Information | Field Name |
---|---|
IP Address | clientip |
User ID | ident |
User Authentication | auth |
timestamp | timestamp |
HTTP Verb | verb |
Request body | request |
HTTP Version | httpversion |
HTTP Status Code | response |
Bytes served | bytes |
Referrer URL | referrer |
User agent | agent |
我们只需要在first-pipeline.conf文件中将filter配置为
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
例如这样配置:(match意味匹配规则:message的匹配格式)
grok {
match => { "message"=>"%{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" (%{HOSTNAME:domain}|-) %{NUMBER:response} (?:%{NUMBER:bytes}|-) (%{QS:referrer}) %{QS:agent} \"(%{WORD:x_forword}|-)\" (%{URIHOST:upstream_host}|-) (%{NUMBER:upstream_response}|-) (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{USERNAME:upstream_response_time}) > (%{USERNAME:response_time})"
}
#匹配模式 message是每段读进来的日志,IP、HTTPDATE、WORD、NOTSPACE、NUMBER都是patterns/grok-patterns中定义好的正则格式名称,对照上面的日志进行编写,冒号,(?:%{USER:ident}|-)这种形式是条件判断,相当于程序里面的二目运算。如果有双引号""或者[]号,需要在前面加\进行转义。
}
由于您启用了自动配置重新加载,因此您不必重新启动Logstash即可获取更改。但是,您需要强制Filebeat从头开始读取日志文件。为此,进入Filebeat正在运行的终端窗口,然后按Ctrl + C关闭Filebeat。然后删除Filebeat注册表文件。例如,运行:
rm data/registry
接下来,使用以下命令重新启动Filebeat:
filebeat -e -c filebeat.yml -d "publish"
看看你的打印的日志信息是不是不一样了呢(message中)
将数据编入Elasticsearch
当当当,到了最后我们的终极目标的地方了哟
现在网络日志被分解成特定的字段,Logstash管道可以将数据索引到Elasticsearch集群中。编辑该first-pipeline.conf文件并用output以下文本替换整个部分:
output {
elasticsearch {
hosts => [ "localhost:9200" ]
}
}
然后我们打开elasticsearch服务,再重新执行filebeat -e -c filebeat.yml –d “publish”
最后去elasticsearch中去看看!
成功导入咯!
当然logstash的输入流端有很多,比如File,redis,filebeat等等,输出流也有很多elasticsearch,file等其他,具体请参见官方文档
下面我们就要来学习怎么把昨天的数据库中的数据用logstash工具导入到elasticsearch中啦
从数据库中导入到elasticsearch
首先我们需要logstash的另一个插件
logstash-input-jdbc插件
ps:怎么知道自己有没有logstash的插件呢
在(D:\elasticsearch\logstash-5.3.0\logstash-5.3.0\bin)bin目录中有一个logstash-plugin的命令
在此目录下我们执行logstash-plugin list
就可以看到logstash目前默认安装的所有插件啦
我们看到logstash5.3.0默认安装好了哦
不过如果需要自己安装的话,那自行搜索即可,也可直接参考https://github.com/logstash-plugins
【以下参考https://www.cnblogs.com/zhaijunming5/p/6478940.html
插件获取地址:
https://github.com/logstash-plugins
在线安装:
/plugin install logstash-input-jdbc
升级插件:
/plugin update logstash-input-jdbc
卸载插件:
/plugin uninstall logstash-input-jdbc
】
logstash配置文件
然后我们就可以直接将数据库当做输入流了,顺便我们吧输出流一起配置了
informix-pipeline.conf建立如下
input {
jdbc {
#驱动:这个插件不包含JDBC驱动程序库。所需的jdbc驱动程序库必须使用jdbc_driver_library配置选项显式传递给插件 。
jdbc_driver_library => "D:\elasticsearch\logstash-5.3.0\logstash-5.3.0\bin\informix\ifxjdbc.jar"
#连接驱动程序类,必需参数
jdbc_driver_class => "com.informix.jdbc.IfxDriver"
#连接url,必需参数
jdbc_connection_string => "jdbc:informix-sqli://xxxxx:xxx/xxxxx:informixserver=xxx;NEWCODESET=gb18030,8859-1,819,Big5;IFX_USE_STRENC=true;"
#连接用户名,必需参数
jdbc_user => "xxx"
#连接密码,必需参数
jdbc_password => "xxx"
#连接密码文件名,可以将密码放入文件中
#jdbc_password_filepath => ""
#jdbc获取的大小,如果不提供,将使用驱动程序的默认值,数字型
#jdbc_fetch_size => 100
#连接池配置。使用前验证连接。默认false
#jdbc_validate_connection => false
#连接池配置。验证连接的频率(以秒为单位),默认3600
#jdbc_validation_timeout => 3600
#来自这个插件的输入可以按照特定的时间表来定期运行。这个调度语法由rufus-scheduler提供支持。
#可参考https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings获得具体格式如何定义
#不设定即为只执行一次
#schedule => "* * * * *"
#需要一个sql语句,可以有statement以字符串形式传递,也可以使用statement_filepath将sql放入sql文件中的形式传递
#statement => "select * from PrpFriskPremrateTable where comcode = :favorite_artist"
#假若statement内有参数传递
#parameters => { "favorite_artist" => "32000000" }
statement_filepath => "D:\elasticsearch\logstash-5.3.0\logstash-5.3.0\bin\informix\search.sql"
#记录SQL查询的日志级别,接受的值是常见的值,致命错误,警告,信息和调试。默认值是info。
#sql_log_level => "info"
}
}
#filter {
#}
output {
elasticsearch {
hosts => [ "localhost:9200" ] #elasticsearch的地址,或者cluster=>"ClusterName"
#想要执行的操作,默认是"index"表示创建一个新的索引
#action => "index"
#protocol =>"http" #恩,hots设置了这就最好别设置,会报错
#要写入事件的索引,默认值是 "logstash-%{+YYYY.MM.dd}"
index=>"cqp2-riskpremrate-%{+YYYY.MM.dd}" #在elasticsearch里创建索引,索引名称设置,type就是document_type的值,test_output-nginx-2017-02-28
#索引的文档ID。用于使用相同的ID覆盖Elasticsearch中的现有条目。
#document_id => "cqp2" # elasticsearch中的_id
#document_type=>"riskpremrate" # elasticsearch中的_type
#其他设置参考官方文档https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
}
}
注意将驱动和sql文件放在文件路径中能让logstash访问哦
开启elasticsearch服务后,开启logstash发布informix管道
D:\elasticsearch\logstash-5.3.0\logstash-5.3.0\bin>logstash -f informix-pipeline
.conf --config.reload.automatic
如果启动有报错信息针对改一下配置文件即可。
成功启动后可以看到elasticsearch中,数据导入了哟~
到这里结束啦,我只是看了最基本的操作,如需更多功能,请看官方文档哟~
谢谢~