logstash配置白名单决定去哪个index

时间:2022-08-06 14:34:35
input {
kafka {
bootstrap_servers => "127.0.0.1:9092"
client_id => "log"
auto_offset_reset => "latest"
consumer_threads =>
decorate_events => true
topics => ["nginx_access_log_test"]
codec => "json"
type => "nginx_log"
}
} filter {
mutate {
gsub => ["message", "\\x22", '"']
gsub => ["message", "\\x09", '']
}
json {
source => "message"
remove_field=>["message","beat","@version"]
} ruby {
code => '
file = File.open("/usr/local/logstash/config/white.txt", "r")
text = file.read
file.close if !text.include?(event.get("request_uri")) then
event.set("es_flag","")
else
event.set("es_flag","")
end
'
}
} output {
if [es_flag] ==""
{
elasticsearch
{
hosts => "127.0.0.1:9200"
index => "nginx_access_log_test"
} }
else
{
elasticsearch
{
hosts => "127.0.0.1:9200"
index => "nginx_access_log_test2"
} }
stdout {
codec => rubydebug
}
}

复杂点的生产环境配置:

input{
kafka {
bootstrap_servers => "127.0.0.1:9092"
client_id => "nginxaccesslog"
auto_offset_reset => "latest"
consumer_threads =>
decorate_events => true
topics => ["nginx_access_log"]
codec => "json"
type => "nginx_log"
}
kafka {
bootstrap_servers => "127.0.0.1:9092"
client_id => "database"
auto_offset_reset => "latest"
consumer_threads =>
decorate_events => true
topics => ["dsideal_db"]
codec => "json"
type => "dsideal_db"
}
kafka {
bootstrap_servers => "127.0.0.1:9092"
client_id => "devops_real"
auto_offset_reset => "latest"
consumer_threads =>
decorate_events => true
topics => ["devopsrealinfo"]
codec => "json"
type => "devopsrealinfo"
}
kafka {
bootstrap_servers => "127.0.0.1:9092"
client_id => "devops_base"
auto_offset_reset => "latest"
consumer_threads =>
decorate_events => true
topics => ["devopsbaseinfo"]
codec => "json"
type => "devopsbaseinfo"
}
}
filter{
mutate {
gsub => ["message", "\\x22", '"']
gsub => ["message", "\\x09", '']
}
json {
source => "message"
remove_field=>["message","beat","@version","@timestamp"]
}
if [type] == "nginx_log" {
ruby {
code => '
file = File.open("/usr/local/logstash/config/white.txt", "r")
text = file.read
file.close if !text.include?(event.get("request_uri")) then
event.set("es_flag","")
else
event.set("es_flag","")
end '
}
} } output {
if [type] == "nginx_log" {
if [es_flag] =="" {
elasticsearch {
hosts => "127.0.0.1:9200"
index => "nginx-access-log"
}
}
else {
elasticsearch {
hosts => "127.0.0.1:9200"
index => "nginx-access-log-other"
}
}
}
if [type] == "dsideal_db" {
elasticsearch {
hosts => "127.0.0.1:9200"
index => "%{table_name}"
document_id => "%{id}"
}
}
if [type] == "devopsbaseinfo" {
elasticsearch {
hosts => "127.0.0.1:9200"
index => "devopsbaseinfo"
document_id => "%{id}"
}
}
if [type] == "devopsrealinfo" {
elasticsearch {
hosts => "127.0.0.1:9200"
index => "devopsrealinfo"
}
}
}