需求:input 为 JSON,output 为 ES,需使用 filter 提取 JSON 中的某个字段,并对其执行加法、减法、乘法等算术运算
思路:mutate过滤器+ruby过滤器实现
避坑:不同版本的 ES 及 Logstash 支持的配置项有差异,请根据实际使用的版本参考官网对应版本的 API 文档
配置:
输入数据示例(input):
{"timestamp": 1538554028,"rect_id": 200,"serial_no":"OSDC5W9O","location_id":"214_332_123","device_item_id": 113,"device_id": 13,"tenant_id": 2324,"mac_address": "sd3lk2l4l2b4","slave_id": "004","iot_id":"8bf6f72267d951cb87ffd72f982959e1","metric": {"device_status": 0,"g_sensor": 0,"weight": 420,"humidity": 50,"env_temp": 50,"pig_temp": 45,"sow_action": 3,"boar_action": 3,"distance_1": 120,"distance_2": 100,"distance_vertical": 80,"NH3": 0.05,"CO2": 0.1,"illumination": 8.1}}
输出结果示例(output,stdout 打印的事件):
{
"timestamp" => 1538582828,
"rect_id" => 200,
"device_id" => 13,
"@version" => "1",
"tenant_id" => 2324,
"device_item_id" => 113,
"@timestamp" => 2018-10-05T00:45:39.857Z,
"iot_id" => "8bf6f72267d951cb87ffd72f982959e1",
"type" => "logstash-kafka",
"metric" => {
"device_status" => 0,
"g_sensor" => 0,
"humidity" => 50,
"distance_1" => 120,
"weight" => 420,
"pig_temp" => 45,
"sow_action" => 3,
"boar_action" => 3,
"distance_vertical" => 80,
"CO2" => 0.1,
"illumination" => 8.1,
"distance_2" => 100,
"env_temp" => 50,
"NH3" => 0.05
},
"serial_no" => "OSDC5W9O",
"mac_address" => "sd3lk2l4l2b4",
"location_id" => "214_332_123",
"timestamp_origin" => 1538554028,
"slave_id" => "004"
}
# Consume JSON messages from Kafka and tag every event with
# type => "logstash-kafka" so the filter and output stages can route on [type].
input {
  kafka {
    # Comma-separated "host:port" list given as a single string (the option is
    # a string, not an array). Replace with your own Kafka cluster addresses.
    bootstrap_servers => "xxxxxx:9092,xxxxxx:9092"
    client_id => "logstash-172.19.100.180"
    group_id => "logstash-dev"
    # Where to start when the group has no committed offset.
    # NOTE(review): the original note mentioned "smallest", the legacy name for
    # reading from the beginning — confirm accepted values for your plugin version.
    auto_offset_reset => "latest"
    consumer_threads => 5
    # Ask the plugin to attach Kafka metadata to each event.
    decorate_events => true
    topics => ["zds-iot-topic"]
    # Marker field checked by the conditionals in the filter and output sections.
    type => "logstash-kafka"
    # Parse each message payload as JSON into event fields.
    codec => "json"
  }
}
# Shift the epoch-seconds "timestamp" field forward by 8 hours while keeping
# the original value in "timestamp_origin".
filter {
  # Only touch events tagged by the Kafka input (type => "logstash-kafka").
  if [type] == "logstash-kafka" {
    # Preserve the untouched value before it is overwritten below.
    mutate {
      copy => { "timestamp" => "timestamp_origin" }
    }
    # Add 8 hours (28800 seconds). Skip events that carry no timestamp:
    # nil.to_i is 0, which would otherwise store a bogus value of 28800.
    ruby {
      code => "ts = event.get('timestamp'); event.set('timestamp', ts.to_i + 28800) unless ts.nil?"
    }
  }
}

# Route events to Elasticsearch by [type]; every event is also printed to stdout.
output {
  # NOTE(review): no input visible in this file sets type "java_log";
  # presumably it is produced by an input defined elsewhere — confirm.
  if [type] == "java_log" {
    elasticsearch {
      hosts => ["xx.xx.xx.xx:9200"]
      index => "javaapp_log_index"
    }
  }
  if [type] == "logstash-kafka" {
    elasticsearch {
      hosts => ["xx.xx.xx.xx:9200"]
      index => "iot_data"
      # NOTE(review): document_type is deprecated in newer versions of the
      # elasticsearch output plugin / Elasticsearch — confirm for your version.
      document_type => "sensor"
    }
  }
  # Echo every event to the console for debugging.
  stdout { }
}