Logstash: split events from an XML file into multiple documents, keeping information from the root tag

Time: 2022-01-16 08:45:21

My problem: I have XML files containing events that I want to parse with Logstash so that I can query them in Kibana afterwards. I want to keep all the information from the ROOT tag in each event.

The input looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<ROOT number="34">
  <EVENTLIST>
    <EVENT name="hey"/>
    <EVENT name="you"/>
  </EVENTLIST>
</ROOT>

What I want is two documents like this:

{
  "number": "34",
  "name": "hey"
}
{
  "number": "34",
  "name": "you"
}

Logstash conf:

input {
  stdin { }
}
filter {
  xml {
    store_xml => "false"
    source => "message"
    target => "EVENT"
    xpath => [
      "/ROOT/@number","number",
      "/ROOT/EVENTLIST/EVENT/@name","name"
    ]
  }
}
output { elasticsearch { host => localhost } stdout { codec => rubydebug } }

This didn't work. What I get:

{
  "number": ["34"],
  "name": ["hey", "you"]
}

I followed the solution in this post: https://serverfault.com/questions/615196/logstash-parsing-xml-document-containing-multiple-log-entries

But my problem remains: I lose the information from the root tag.

One solution could be to use a ruby filter to handle this, but I don't know Ruby. Another would be to write some Java code that converts the XML into JSON before sending it to Elasticsearch.

Any ideas on how to handle this, or do I have to learn Ruby?

2 solutions

#1

If your structure is as simple as you show, you can use a memorize plugin that I wrote.

Your configuration would look something like this:

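filter {
  # ROOT line: capture the attributes to carry over to each event.
  # The pattern expects both number and number2 on the ROOT element.
  if ([message] =~ /<ROOT/) {
    grok {
      match => [ "message",
        'number="(?<number>\d+)" number2="(?<number1>\d+)"'
      ]
    }
  } else if ([message] =~ /<EVENT /) {
    # EVENT line: capture the event's own attribute.
    grok {
      match => [ "message", 'name="(?<name>[^"]+)"']
    }
  }
  # Remember the ROOT values and add them to later events that lack them.
  memorize {
    fields => ["number","number1"]
  }
  # Keep only the EVENT lines and drop the raw XML text from them.
  if ([message] !~ /<EVENT /) {
    drop {}
  } else {
    mutate { remove_field => ["message"] }
  }
}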

My example looks for multiple attributes in the ROOT element, based on your comments below. Here is the version of the plugin that supports memorizing multiple fields:

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "set"
#
# This filter looks for the configured fields in each event and records
# their last value. If a field is not present in an event, its last
# recorded value is added to that event.
#
# The config looks like this:
#
#     filter {
#       memorize {
#         fields => ["time"]
#         default => { "time" => "00:00:00.000" }
#       }
#     }
#
# `fields` is an array of the field NAMES that you want to memorize.
# `default` is an optional map of field names to the values to use when
# the field isn't present and has no memorized value.

class LogStash::Filters::Memorize < LogStash::Filters::Base

  config_name "memorize"
  milestone 2

  # An array of the field names to memorize
  config :fields, :validate => :array, :required => true
  # A map of default values to use if a field has not been seen before we need it
  config :default, :validate => :hash, :required => false

  # The stream identity is how the filter determines which stream an
  # event belongs to. See the multiline plugin if you want more details on how
  # this might work
  config :stream_identity, :validate => :string, :default => "%{host}.%{path}.%{type}"

  public
  def initialize(config = {})
    super

    @threadsafe = false

    # This filter needs to keep state.
    @memorized = Hash.new
  end # def initialize

  public
  def register
    # nothing needed
  end # def register

  public
  def filter(event)
    return unless filter?(event)

    any = false
    @fields.each do |field|
      # Note: the key is the stream_identity string exactly as configured.
      if event[field].nil?
        # Field missing: fall back to the memorized value, then the default.
        map = @memorized[@stream_identity]
        val = map.nil? ? nil : map[field]
        if val.nil?
          val = @default.nil? ? nil : @default[field]
        end
        if !val.nil?
          event[field] = val
          any = true
        end
      else
        # Field present: record its value for later events.
        map = @memorized[@stream_identity]
        if map.nil?
          map = @memorized[@stream_identity] = Hash.new
        end
        map[field] = event[field]
      end # if
    end # @fields.each

    # Apply add_field, add_tag, etc. once, if any field was filled in.
    filter_matched(event) if any
  end # def filter
end

For Logstash 1.5 and later, this plugin can be installed with the following command (in newer Logstash releases the script is named bin/logstash-plugin rather than bin/plugin):

bin/plugin install logstash-filter-memorize
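
To make the idea concrete, here is a minimal plain-Ruby sketch of what the grok + memorize + drop configuration above does to the sample input from the question, one line at a time. It is not the plugin itself: `memorize_events` is a hypothetical helper written only for illustration, and it assumes a single `number` attribute on ROOT, as in the question's sample.

```ruby
# Plain-Ruby sketch of the grok + memorize + drop pipeline, outside Logstash.
# `memorize_events` is a hypothetical helper, not part of any plugin.
def memorize_events(lines, fields = ["number"])
  memorized = {} # last seen value of each memorized field
  events = []
  lines.each do |line|
    event = {}
    if line =~ /<ROOT/
      # Equivalent of the first grok: pull the attribute off the ROOT line.
      m = line.match(/number="(?<number>\d+)"/)
      event["number"] = m[:number] if m
    elsif line =~ /<EVENT /
      # Equivalent of the second grok: pull the attribute off an EVENT line.
      m = line.match(/name="(?<name>[^"]+)"/)
      event["name"] = m[:name] if m
    end
    # Equivalent of memorize: remember present fields, fill in missing ones.
    fields.each do |f|
      if event[f].nil?
        event[f] = memorized[f] unless memorized[f].nil?
      else
        memorized[f] = event[f]
      end
    end
    # Equivalent of the final conditional: keep only the EVENT lines.
    events << event if line =~ /<EVENT /
  end
  events
end

xml = <<~XML
  <?xml version="1.0" encoding="UTF-8"?>
  <ROOT number="34">
    <EVENTLIST>
      <EVENT name="hey"/>
      <EVENT name="you"/>
    </EVENTLIST>
  </ROOT>
XML

memorize_events(xml.lines).each { |event| p event }
```

On the sample input this yields two hashes, each holding the event's name plus the number "34" taken from the ROOT line. Like the Logstash configuration, it relies on every element sitting on its own line.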

#2

Try this filter:

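filter {
  # Parse the XML into the xml_content field.
  # This assumes the whole document arrives as one event in "message".
  xml {
    source => "message"
    target => "xml_content"
  }
  # Emit one event per EVENTLIST, then one per EVENT.
  split {
    field => "[xml_content][EVENTLIST]"
  }
  split {
    field => "[xml_content][EVENTLIST][EVENT]"
  }
  # Copy the root attribute and the event attribute to top-level fields.
  mutate {
    add_field => {
      "number" => "%{[xml_content][number]}"
      "name" => "%{[xml_content][EVENTLIST][EVENT][name]}"
    }
    remove_field => ['xml_content', 'message', 'path']
  }
}
output {
  stdout {
    codec => rubydebug
  }
}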

It returns these events:

{
        "number" => "34",
    "@timestamp" => 2016-12-23T12:01:17.888Z,
      "@version" => "1",
          "host" => "xubuntu",
          "name" => "hey"
}
{
        "number" => "34",
    "@timestamp" => 2016-12-23T12:01:17.888Z,
      "@version" => "1",
          "host" => "xubuntu",
          "name" => "you"
}
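
The same fan-out can also be sketched outside Logstash, for instance if you would rather pre-process the files into JSON before indexing, as the question suggests. Below is a minimal Ruby sketch using the REXML library; `split_events` is a hypothetical helper name, and the `EVENTLIST/EVENT` path is taken from the question's sample, so treat it as an illustration rather than a drop-in replacement for the filter above.

```ruby
require "rexml/document"
require "json"

# Hypothetical helper: returns one hash per EVENT element, each carrying
# every attribute of the enclosing ROOT element.
def split_events(xml)
  root = REXML::Document.new(xml).root

  # Collect the ROOT attributes once; they are copied into every event.
  root_fields = {}
  root.attributes.each { |name, value| root_fields[name] = value }

  events = []
  REXML::XPath.each(root, "EVENTLIST/EVENT") do |event|
    fields = root_fields.dup
    # An EVENT attribute wins if it has the same name as a ROOT attribute.
    event.attributes.each { |name, value| fields[name] = value }
    events << fields
  end
  events
end

sample = <<~XML
  <?xml version="1.0" encoding="UTF-8"?>
  <ROOT number="34">
    <EVENTLIST>
      <EVENT name="hey"/>
      <EVENT name="you"/>
    </EVENTLIST>
  </ROOT>
XML

# Print one JSON document per EVENT.
split_events(sample).each { |fields| puts JSON.generate(fields) }
```

Unlike the line-based approach in the first answer, this parses the document as a whole, so it does not depend on how the XML is laid out across lines.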
