flume内置的ChannelSelector有两种,分别是Replicating和Multiplexing。
Replicating类型的ChannelSelector会针对每一个Event,拷贝到所有的Channel中,这是默认的ChannelSelector。
replicating类型的ChannelSelector例子如下
a1.sources = r1
a1.channels = c1 c2 # 如果有100个Event,那么c1和c2中都会有这100个事件 a1.channels.c1.type = memory
a1.channels.c1.capacity =
a1.channels.c1.transactionCapacity = a1.channels.c2.type = memory
a1.channels.c2.capacity =
a1.channels.c2.transactionCapacity =
Multiplexing类型的ChannelSelector会根据Event中Header中的某个属性决定分发到哪个Channel。
multiplexing类型的ChannelSelector例子如下:
a1.sources = r1 a1.sources.source1.selector.type = multiplexing
a1.sources.source1.selector.header = validation # 以header中的validation对应的值作为条件
a1.sources.source1.selector.mapping.SUCCESS = c2 # 如果header中validation的值为SUCCESS,使用c2这个channel
a1.sources.source1.selector.mapping.FAIL = c1 # 如果header中validation的值为FAIL,使用c1这个channel
a1.sources.source1.selector.default = c1 # 默认使用c1这个channel
a1.sources.source1.selector.header = validation # 以header中的validation对应的值作为条件 同理,如下conf文件:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2 # Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/lib/flume-ng/test.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (\\w+):(\\w+):(\\w+)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = ip
a1.sources.r1.interceptors.i1.serializers.s2.name = domain
a1.sources.r1.interceptors.i1.serializers.s3.name = course a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = course
a1.sources.r1.selector.mapping.hadoop = c1
a1.sources.r1.selector.default = c2 # Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /tmp/multiplexing/flume_sink1 a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c2
a1.sinks.k2.sink.directory = /tmp/multiplexing/flume_sink2 # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity =
a1.channels.c1.transactionCapacity = a1.channels.c2.type = memory
a1.channels.c2.capacity =
a1.channels.c2.transactionCapacity = # Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
interceptor只对头部进行改变。
source r1中的头部有IP、Domain和cource三种信息,而r1的selector.header = course,表示selector只对IP,Domain和Cource中的Cource进行判断选择,然后再划分channel。