flume-拦截器、channel选择器、sink组合sink处理器

时间:2023-01-22 03:30:22

1. Flume Interceptors

Flume有能力修改/删除流程中的events。这是在拦截器(interceptor)的帮助下完成的。拦截器(Interceptors)是实现org.apache.flume.interceptor.Interceptor接口的类。一个interceptor可以根据interceptor的开发者选择的任何标准来修改,甚至放弃events。这个可以通过在配置中指定一系列interceptor生成类名来实现。Interceptors在source配置中被指定作为空白分隔符列表。如果interceptor需要放弃events,它不会在它需要返回的列表中返回该events。如果interceptor放弃全部events,然后它返回一个空列表。简单示例:

flume-拦截器、channel选择器、sink组合sink处理器

注意:该interceptor构建是被传递给type配置属性。interceptors本身是可配置的,并且可以像传递给其他可配置组件一样传递配置值。在上述示例中,events先传递到HostInterceptor,并且events被HostInterceptor返回,然后独自传递到TimestampInterceptor。你可以指定完全限定的类名称或者别名 timestamp。如果你有多个收集器写到同一个HDFS路径,然后你也可以使用HostInterceptor。

1.1 Timestamp Interceptor

该interceptor向event headers插入秒级时间,当event被处理时。该interceptor插入一个带有关键timestamp(或者由header属性指定)的header,其值是相关的timestamp。该interceptors可以保留一个已存在timestamp,如果它已经在配置中预先配置。

flume-拦截器、channel选择器、sink组合sink处理器

agent a1示例:

flume-拦截器、channel选择器、sink组合sink处理器

1.2 Host Interceptor

该interceptor插入运行agent的host的hostname或者IP地址。它根据配置插入带有密钥host或配置密钥(其值为host的hostname或IP地址)的header。

flume-拦截器、channel选择器、sink组合sink处理器

agent a1的示例:

flume-拦截器、channel选择器、sink组合sink处理器

1.3 Static Interceptor

静态interceptor运行用户给所有events添加一个带有静态值的静态header。

flume-拦截器、channel选择器、sink组合sink处理器

agent a1的示例:

flume-拦截器、channel选择器、sink组合sink处理器

1.4 Remove Header Interceptor

该interceptor通过移除一个或多个headers来操作Flume event headers。它可以移除一个静态定义的header,基于规则表达式的headers或者在一个列表中的headers。如果这些没有定义,或者如果没有header匹配到标准,Flume events将不会修改。

注意:如果只有一个header需要移除,通过名字指定它可以提供比其他两种方法更好的性能。

flume-拦截器、channel选择器、sink组合sink处理器

1.5 UUID Interceptor

该interceptor在被拦截的所有事件上设置一个通用唯一的标识符。

flume-拦截器、channel选择器、sink组合sink处理器

1.6 Morphline Interceptor

该interceptor通过morphline配置文件过滤events,该配置文件定义了一条从一个命令到另一个命令管道记录的转换命令链。例如,morphline可以忽略某些events,或者通过基于正则表达式的模式匹配来改变或者插入某些event headers,或者它可以通过Apache Tika自动检测和设置一个MIME类型在被拦截的events上。

flume-拦截器、channel选择器、sink组合sink处理器

简单示例flume .conf文件:

flume-拦截器、channel选择器、sink组合sink处理器

1.7 Search and Replace Interceptor

该interceptor提供了基于Java正则表达式的简单的基于字符串的search-and-replace功能。回溯/组捕获也是可用的。这个interceptor使用与Java Matcher.replaceAll()方法相同的规则。

flume-拦截器、channel选择器、sink组合sink处理器

配置示例:

flume-拦截器、channel选择器、sink组合sink处理器

另一个示例:

flume-拦截器、channel选择器、sink组合sink处理器

1.8 Regex Flitering Interceptor

该拦截器通过将event正文解释为文本并将文本与配置的正则表达式进行匹配来选择性地过滤events。

flume-拦截器、channel选择器、sink组合sink处理器

1.9 Regex Extractor Interceptor

此interceptor使用指定的正则表达式提取正则表达式匹配组,并将匹配组附加为event的headers。

flume-拦截器、channel选择器、sink组合sink处理器

该serializers用于将匹配映射到header名称和格式化的header值;默认的,你只需要指定header名称和默认org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer将会被使用。这个serializer只是将匹配映射到指定的header名称,并传递通过由正则表达式提取的值。

Example 1:

如果Flume event正文包含1:2:3:4foobar5,可以使用下面配置:

flume-拦截器、channel选择器、sink组合sink处理器

提取的event将会包含相同正文,但是以下headers将会附加one=>1,two=>2,three=>3.

Example 2:

如果Flume event正文包含2012-10-18 18:47:57,614 some log line ,可以使用下面的配置:

flume-拦截器、channel选择器、sink组合sink处理器

提取的event将会包含相同的正文,但是以下的headers将会附加timestamp=>1350611220000.

参考资料:

https://flume.apache.org/FlumeUserGuide.html