Elasticsearch(十五)elasticsearch用代码从数据库里查询导入到elasticsearch中并查询结果 -- logstash

时间:2021-09-06 08:20:29

在上一节我们导入数据使用的是java代码,其实这个步骤由logstash工具也可以实现哦~

此学习笔记参考官方文档:https://www.elastic.co/guide/en/logstash/current/introduction.html

下载安装

Logstash唯一的依赖的Java运行环境,确保jdk版本足够高,我的本地是1.8
下载和解压出来就能使用了,不需要安装
因为我的elasticsearch是5.3.0我就下了5.3.0版本,不确定有没有什么联系,我是这样下的
不要将Logstash安装到包含冒号(:)字符的目录路径中。
这个直接搜官网即可https://www.elastic.co/downloads/logstash
第一个小例子

简单介绍

我们之前有写过个小例子昂,从数据库里面将数据查出来,然后插入elasticsearch昂,现在有了logstash我们大概可以这样:
Elasticsearch(十五)elasticsearch用代码从数据库里查询导入到elasticsearch中并查询结果 -- logstash

对于logstash来说,数据库就是一个输入流,它可以(可选)将输入流的数据通过fileter弄成某种格式然后传送给输出流(elasticsearch)啦

这次我们大概就用这个方法来将上次的小例子给完善一下

首先我们理解下基本知识
我们可以在我们解压后的目录运行我们的logstash相关的命令
D:\elasticsearch\logstash-5.3.0\logstash-5.3.0\bin>logstash -e 'input { stdin {
} } output { stdout {} }'

(-e表示直接用标准输入流的内容移动到标准输出流啦)
看见他启动完,我们输入一个hello,就会自动跳出来一个输出流哦

Elasticsearch(十五)elasticsearch用代码从数据库里查询导入到elasticsearch中并查询结果 -- logstash

自动会将时间戳和IP地址信息添加到信息中哦

退出命令是ctrl+C啦

使用Logstash解析日志

第一个小例子中,我们成功测试了我们的logstash,下面我们来向下一步迈进吧

配置Filebeat将日志行发送到Logstash

首先我么需要配置Filebeat将日志发送给Logstash。默认的Logstash安装包括Beats input插件。要在您的数据源计算机上安装Filebeat(https://www.elastic.co/downloads/beats/filebeat
我下的是5.3.0 https://www.elastic.co/downloads/past-releases/filebeat-5-3-0
下完了之后参考https://www.elastic.co/guide/en/beats/filebeat/6.1/filebeat-installation.html
我们需要:
1.将zip文件的内容解压缩到C:\Program Files
2.将filebeat--windows目录重命名为Filebeat
3.以管理员身份打开PowerShell提示(右键单击PowerShell图标并选择以管理员身份运行)。如果您正在运行Windows XP,则可能需要下载并安装PowerShell。(顾名思义win7可以跳过3,4两步,直接打开命令行即可)
在PowerShell提示符下,运行以下命令将Filebeat安装为Windows服务
PS> cd’C:\ Program Files \ Filebeat’
PS C:\ Program Files \ Filebeat>。\ install-service-filebeat.ps1

4.如果在系统上禁用脚本执行,则需要为当前会话设置执行策略以允许脚本运行。例如:PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-filebeat.ps1。

安装Filebeat后,您需要配置它。打开filebeat.yml位于Filebeat安装目录中的文件,并用以下行替换内容。确保paths指向logstash-tutorial.log以前下载的示例Apache日志文件 :

所以我们把刚才安装在c:\Program Files下的Filebeat下的filebeat.yml文件可以配置成以下啦:

filebeat.prospectors:
- input_type: log
paths:
- D:\weblogic_directory\user_projects\domains2\base_domain1\log\aoplog*
output.logstash:
hosts: ["localhost:5044"]

paths下是你要发送的log日志文件哟-我这里使用了本地服务器里面的log,什么都可以啦

在filebeat.exe的目录中执行C:\Program Files\Filebeat>filebeat -e -c filebeat.yml -d "publish"

Filebeat将尝试在端口5044上连接。直到Logstash从一个活动的Beats插件开始,在该端口上将不会有任何答案,因此您在该端口上无法连接的任何消息现在都是正常的。
现在命令后是这样的,因为他的输出流是logstash,还没有开启,很正常,我们接下来就回去配置logstash啦

2018/01/17 03:27:03.832313 output.go:109: DBG  output worker: publish 2045 event
s
2018/01/17 03:27:05.875430 single.go:140: ERR Connecting error publishing events
 (retrying): dial tcp [::1]:5044: connectex: No connection could be made because
 the target machine actively refused it.

因为无法连接嘛,发不到logstash上(不要退出这个命令哦)所以我们下一步就是让他连接上哟,现在我们就可以

配置logstash进行文件输入

接下来,创建一个Logstash配置管道,使用Beats输入插件从Beats接收事件。

Logstash的管道配置框架,我觉得类似于HTML通用骨架,是这样滴

input {
}
# filter {
#
# }
output {
}

注意到了吗,filter是可选的哟

我们现在可以配置了
在logstash解压后的bin目录下,新建first-pipeline.conf并将刚才的框架复制进去
【PS:管道配置文件,例first-pipeline.conf,logstash会默认加载.conf结尾的作为管道配置文件哟】

input {
    beats {
        port =>“5044”
    }
}

# filter {
#
# }
output {
    stdout {codec => ruby??debug}
}

Input配置意为
使用Beats输入插件
Output配置意为
将logstash运行时输出打印在stdout(控制台输出流)

验证配置是否正确:

logstash -f first-pipeline.conf --config.test_and_exit

–config.test_and_exit选项解析您的配置文件并报告任何错误。
如果配置文件通过配置测试,请使用以下命令启动Logstash:

logstash -f first-pipeline.conf --config.reload.automatic

如果你的管道工作正常,你应该看到一系列事件写入控制台:

{
    "@timestamp" => 2018-01-17T05:13:05.793Z,
        "offset" => 2363257,
      "@version" => "1",
    "input_type" => "log",
          "beat" => {
        "hostname" => "CC-PC",
            "name" => "CC-PC",
         "version" => "5.3.0"
    },
          "host" => "CC-PC",
        "source" => "D:\\weblogic_directory\\user_projects\\domains2\\base_domai n1\\log\\aoplog.log.5",
       "message" => "[fd9a597f-4368-4122-b3ab-939aefbc2107] [43592299-0e8d-4a73- 8666-4e363e8bd541] [] [com.sinosoft.fixation.intf.service.spring.FixationByRiskP remiumServiceSpringImpl.findFixationByRiskPremium] [????????????] [start]",
          "type" => "log",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ]
}

当然,logstash也有自己的input插件可以将File当做输入流,可以自己试一下哟。
现在你已经可以用filebeat将日志文件发送到Logstash啦,而且logstash将日志文件 (input)打印在了控制台上(output),但是会发现日志消息的格式并不理想,如果希望特定的命名字段,可以使用grok过滤器插件

Grok Filter Plugin解析Web日志

该grok过滤器插件是几个插件,默认情况下在Logstash可用之一。
该grok过滤器插件,使您能够将非结构化日志数据分析到结构化的东西和可查询。

该行开头的IP地址很容易识别,括号中的时间戳也是如此。要解析数据,可以使用%{COMBINEDAPACHELOG}Grok模式,该模式使用以下模式从Apache日志中构建行:

Information Field Name
IP Address clientip
User ID ident
User Authentication auth
timestamp timestamp
HTTP Verb verb
Request body request
HTTP Version httpversion
HTTP Status Code response
Bytes served bytes
Referrer URL referrer
User agent agent

我们只需要在first-pipeline.conf文件中将filter配置为

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}

例如这样配置:(match意味匹配规则:message的匹配格式)

grok {
        match => { "message"=>"%{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" (%{HOSTNAME:domain}|-) %{NUMBER:response} (?:%{NUMBER:bytes}|-) (%{QS:referrer}) %{QS:agent} \"(%{WORD:x_forword}|-)\" (%{URIHOST:upstream_host}|-) (%{NUMBER:upstream_response}|-) (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{USERNAME:upstream_response_time}) > (%{USERNAME:response_time})"
        }
        #匹配模式 message是每段读进来的日志,IP、HTTPDATE、WORD、NOTSPACE、NUMBER都是patterns/grok-patterns中定义好的正则格式名称,对照上面的日志进行编写,冒号,(?:%{USER:ident}|-)这种形式是条件判断,相当于程序里面的二目运算。如果有双引号""或者[]号,需要在前面加\进行转义。
    }

由于您启用了自动配置重新加载,因此您不必重新启动Logstash即可获取更改。但是,您需要强制Filebeat从头开始读取日志文件。为此,进入Filebeat正在运行的终端窗口,然后按Ctrl + C关闭Filebeat。然后删除Filebeat注册表文件。例如,运行:

rm data/registry

接下来,使用以下命令重新启动Filebeat:

filebeat -e -c filebeat.yml -d "publish"

看看你的打印的日志信息是不是不一样了呢(message中)

将数据编入Elasticsearch

当当当,到了最后我们的终极目标的地方了哟

现在网络日志被分解成特定的字段,Logstash管道可以将数据索引到Elasticsearch集群中。编辑该first-pipeline.conf文件并用output以下文本替换整个部分:

output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}

然后我们打开elasticsearch服务,再重新执行filebeat -e -c filebeat.yml –d “publish”

最后去elasticsearch中去看看!
Elasticsearch(十五)elasticsearch用代码从数据库里查询导入到elasticsearch中并查询结果 -- logstash

成功导入咯!

当然logstash的输入流端有很多,比如File,redis,filebeat等等,输出流也有很多elasticsearch,file等其他,具体请参见官方文档

下面我们就要来学习怎么把昨天的数据库中的数据用logstash工具导入到elasticsearch中啦

从数据库中导入到elasticsearch

首先我们需要logstash的另一个插件

logstash-input-jdbc插件

ps:怎么知道自己有没有logstash的插件呢
在(D:\elasticsearch\logstash-5.3.0\logstash-5.3.0\bin)bin目录中有一个logstash-plugin的命令

在此目录下我们执行logstash-plugin list
就可以看到logstash目前默认安装的所有插件啦

我们看到logstash5.3.0默认安装好了哦
Elasticsearch(十五)elasticsearch用代码从数据库里查询导入到elasticsearch中并查询结果 -- logstash

不过如果需要自己安装的话,那自行搜索即可,也可直接参考https://github.com/logstash-plugins

【以下参考https://www.cnblogs.com/zhaijunming5/p/6478940.html
插件获取地址:
https://github.com/logstash-plugins

在线安装:
/plugin install logstash-input-jdbc

升级插件:
/plugin update logstash-input-jdbc

卸载插件:
/plugin uninstall logstash-input-jdbc

logstash配置文件

然后我们就可以直接将数据库当做输入流了,顺便我们吧输出流一起配置了
informix-pipeline.conf建立如下

input {
  jdbc {
    #驱动:这个插件不包含JDBC驱动程序库。所需的jdbc驱动程序库必须使用jdbc_driver_library配置选项显式传递给插件 。
    jdbc_driver_library => "D:\elasticsearch\logstash-5.3.0\logstash-5.3.0\bin\informix\ifxjdbc.jar"
    #连接驱动程序类,必需参数
    jdbc_driver_class => "com.informix.jdbc.IfxDriver"
    #连接url,必需参数
    jdbc_connection_string => "jdbc:informix-sqli://xxxxx:xxx/xxxxx:informixserver=xxx;NEWCODESET=gb18030,8859-1,819,Big5;IFX_USE_STRENC=true;"
    #连接用户名,必需参数
    jdbc_user => "xxx"
    #连接密码,必需参数
    jdbc_password => "xxx"
    #连接密码文件名,可以将密码放入文件中
    #jdbc_password_filepath => ""

    #jdbc获取的大小,如果不提供,将使用驱动程序的默认值,数字型
    #jdbc_fetch_size => 100

    #连接池配置。使用前验证连接。默认false
    #jdbc_validate_connection => false
    #连接池配置。验证连接的频率(以秒为单位),默认3600
    #jdbc_validation_timeout => 3600

    #来自这个插件的输入可以按照特定的时间表来定期运行。这个调度语法由rufus-scheduler提供支持。
    #可参考https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings获得具体格式如何定义
    #不设定即为只执行一次
    #schedule => "* * * * *"

    #需要一个sql语句,可以有statement以字符串形式传递,也可以使用statement_filepath将sql放入sql文件中的形式传递
    #statement => "select * from PrpFriskPremrateTable where comcode = :favorite_artist" 
    #假若statement内有参数传递
    #parameters => { "favorite_artist" => "32000000" }
    statement_filepath => "D:\elasticsearch\logstash-5.3.0\logstash-5.3.0\bin\informix\search.sql"
    #记录SQL查询的日志级别,接受的值是常见的值,致命错误,警告,信息和调试。默认值是info。
    #sql_log_level => "info"
  }
}
#filter {
#}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ] #elasticsearch的地址,或者cluster=>"ClusterName" 
        #想要执行的操作,默认是"index"表示创建一个新的索引
        #action => "index"
        #protocol =>"http" #恩,hots设置了这就最好别设置,会报错
        #要写入事件的索引,默认值是 "logstash-%{+YYYY.MM.dd}"
        index=>"cqp2-riskpremrate-%{+YYYY.MM.dd}"    #在elasticsearch里创建索引,索引名称设置,type就是document_type的值,test_output-nginx-2017-02-28
        #索引的文档ID。用于使用相同的ID覆盖Elasticsearch中的现有条目。
        #document_id => "cqp2" # elasticsearch中的_id
        #document_type=>"riskpremrate" # elasticsearch中的_type

        #其他设置参考官方文档https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
    }
}

注意将驱动和sql文件放在文件路径中能让logstash访问哦

开启elasticsearch服务后,开启logstash发布informix管道

D:\elasticsearch\logstash-5.3.0\logstash-5.3.0\bin>logstash -f informix-pipeline
.conf --config.reload.automatic

如果启动有报错信息针对改一下配置文件即可。

成功启动后可以看到elasticsearch中,数据导入了哟~

Elasticsearch(十五)elasticsearch用代码从数据库里查询导入到elasticsearch中并查询结果 -- logstash

到这里结束啦,我只是看了最基本的操作,如需更多功能,请看官方文档哟~
谢谢~