Logstash利用ruby将有用的日志放到一个ES_INDEX将无用的日志放到另一个ES_INDEX

时间:2021-08-23 11:19:37
input{
    kafka {
        bootstrap_servers => "127.0.0.1:9092"
        client_id => "nginxlog"
        auto_offset_reset => "latest"
        consumer_threads => 5
        decorate_events => true
        topics => ["nginx_log"]
        codec => "json"
        type => "nginx_log"
    }
}
filter{
   mutate {
        gsub => ["message", "\\x22", '"']
        gsub => ["message", "\\x09", '']
    }
    json {
        source => "message"
        remove_field=>["message","beat","@version","@timestamp"]
    }
    if [type] == "nginx_log" {
        ruby {
                code => '
                    -- 获取白名单
                    file = File.open("/usr/local/logstash/config/white.txt", "r")
                    text = file.read
                    file.close
                    -- 判断日志中request_uri属性是否在白名单中 -- 也可直接将不在白名单的日志排除 event.cancel if !text.include?(event.get("request_uri"))
                    if !text.include?(event.get("request_uri")) then
                        -- 如果不存在就增加一个属性es_flag=0表示该日志没用
                        event.set("es_flag","0")
                    else
                        -- 如果不存在就增加一个属性es_flag=1表示该日志有用
                        event.set("es_flag","1")
                    end
                '
            }
        }
    }
}
output {
    if [type] == "nginx_log" {
        -- 判断es_flag=1放到nginx-log-yes索引中
        if [es_flag] =="1" {
            elasticsearch {
                    hosts => "127.0.0.1:9200"
                    index => "nginx-log-yes"
            }
        }
        -- 判断es_flag=0放到nginx-log-no索引中
        else {
            elasticsearch {
                    hosts => "127.0.0.1:9200"
                    index => "nginx-log-no"
            }
        }
    }
}

 

Lostash event API说明

除了基本的get和set外,还提供了丰富的接口。我们能用到的方法包括:
删除事件:cancel
取消删除事件:uncancel
是否删除:cancelled?
是否包含字段:include?
删除字段:remove
事件转字符串:to_s
事件转hash字典(不含metadata字段):to_hash
事件转hash字典(含metadata字段):to_hash_with_metadata
事件转json字符串:to_json
增加tag:tag
取事件时间戳:timestamp

测试配置文件

input{
    stdin{
        codec=>json
    }
}filter{
    ruby{
        code=>'
            event.cancel
            event.set("cancelled",event.cancelled?)
            event.uncancel
            event.set("include",event.include?("hello"))
            event.remove("hello")
            event.set("to_s",event.to_s)
            event.set("to_hash",event.to_hash)
            event.set("to_hash_with_metadata",event.to_hash_with_metadata)
            event.set("to_json",event.to_json)
            event.tag("_test_tag")
            event.set("timestamp",event.timestamp)
        '
    }
}output{
    stdout{
        codec=>rubydebug
    }
}

启动logstash,然后输入如下,查看结果

{"hello":"world"}