Translated from the official Flume 1.8 user guide. Original: Flume 1.8.0 User Guide
Because of its length, the translation is split into the following 5 parts:
[Translation] Flume 1.8.0 User Guide
[Translation] Flume 1.8.0 User Guide (Source)
[Translation] Flume 1.8.0 User Guide (Sink)
[Translation] Flume 1.8.0 User Guide (Channel)
[Translation] Flume 1.8.0 User Guide (Processors)
Flume Channels
Channels are the repositories where events are staged on an agent. Sources add events and sinks remove them.
1. Memory Channel
The events are stored in an in-memory queue with a configurable maximum size. This channel is ideal for flows that need high throughput and are prepared to lose the staged data if the agent fails. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be memory |
capacity | 100 | The maximum number of events stored in the channel |
transactionCapacity | 100 | The maximum number of events the channel will take from a source or give to a sink per transaction |
keep-alive | 3 | Timeout in seconds for adding or removing an event |
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. |
byteCapacity | see description | Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB. |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
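As a rough worked example of how byteCapacity and byteCapacityBufferPercentage interact (the agent name a2 and the sizing numbers below are illustrative assumptions, not part of the guide's example):
# Hypothetical sizing sketch: 20% of byteCapacity is reserved as headroom
# for event headers, so only ~80% counts against event bodies.
a2.channels.c1.type = memory
a2.channels.c1.byteCapacity = 100000
a2.channels.c1.byteCapacityBufferPercentage = 20
# Usable space for event bodies: 100000 * (1 - 20/100) = 80000 bytes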
2. JDBC Channel
The events are stored in a persistent storage that is backed by a database. The JDBC channel currently supports embedded Derby. This is a durable channel that is ideal for flows where recoverability is important. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be jdbc |
db.type | DERBY | Database vendor, needs to be DERBY. |
driver.class | org.apache.derby.jdbc.EmbeddedDriver | Class for vendor’s JDBC driver |
driver.url | (constructed from other properties) | JDBC connection URL |
db.username | “sa” | User id for db connection |
db.password | – | password for db connection |
connection.properties.file | – | JDBC Connection property file path |
create.schema | true | If true, then creates db schema if not there |
create.index | true | Create indexes to speed up lookups |
create.foreignkey | true | |
transaction.isolation | “READ_COMMITTED” | Isolation level for db session READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ |
maximum.connections | 10 | Max connections allowed to db |
maximum.capacity | 0 (unlimited) | Max number of events in the channel |
sysprop.* | | DB Vendor specific properties |
sysprop.user.home | | Home path to store embedded Derby database |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = jdbc
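A slightly fuller sketch showing some of the optional properties from the table above; the capacity, connection count and Derby home path are illustrative assumptions, not values from the guide:
a1.channels = c1
a1.channels.c1.type = jdbc
a1.channels.c1.db.type = DERBY
a1.channels.c1.maximum.connections = 10
a1.channels.c1.maximum.capacity = 100000
# Hypothetical home path for the embedded Derby database
a1.channels.c1.sysprop.user.home = /var/lib/flume/jdbc-channel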
3. Kafka Channel
The events are stored in a Kafka cluster (which must be installed separately). Kafka provides high availability and replication, so in case an agent or a Kafka broker crashes, the events are immediately available to other sinks.
The Kafka channel can be used for multiple scenarios:
1. With Flume source and sink - it provides a reliable and highly available channel for events
2. With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other applications (a minimal sketch of this scenario follows the list)
3. With Flume sink, but no source - it is a low-latency, fault-tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr
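A minimal sketch of scenario 2, assuming a netcat source feeding a Kafka channel with no sink, so events remain in the Kafka topic for other applications to consume; the component names, bind address, port, broker and topic are illustrative assumptions:
a1.sources = r1
a1.channels = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.channels = k1
a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092
a1.channels.k1.kafka.topic = events-for-other-apps
# With parseAsFlumeEvent left at its default (true), the topic holds Avro
# FlumeEvent datums; external consumers can decode them using
# org.apache.flume.source.avro.AvroFlumeEvent from the flume-ng-sdk artifact.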
This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka client shipped with that version. The configuration of the channel has changed compared to previous Flume versions.
The configuration parameters are organized as follows:
1. Configuration values related to the channel generically are applied at the channel config level, e.g.: a1.channels.k1.type =
2. Configuration values related to Kafka or how the channel operates are prefixed with "kafka.", e.g.: a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the hdfs sink operates
3. Properties specific to the producer/consumer are prefixed by kafka.producer or kafka.consumer
4. Where possible, the Kafka parameter names are used, e.g.: bootstrap.servers and acks
This version of Flume is backwards-compatible with previous versions, however deprecated properties are indicated in the table below, and a warning message is logged on startup when they are present in the configuration file.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel |
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma separated list of hostname:port |
kafka.topic | flume-channel | Kafka topic which the channel will use |
kafka.consumer.group.id | flume | Consumer group ID the channel uses to register with Kafka. Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data. Note that having non-channel consumers with the same ID can lead to data loss. |
parseAsFlumeEvent | true | Expecting Avro datums with FlumeEvent schema in the channel. This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact |
migrateZookeeperOffsets | true | When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset configuration defines how offsets are handled. |
pollTimeout | 500 | The amount of time (in milliseconds) to wait in the "poll()" call of the consumer. https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) |
defaultPartitionId | – | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overridden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer's partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class). |
partitionIdHeader | – | When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId. |
kafka.consumer.auto.offset.reset | latest | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; none: throw exception to the consumer if no previous offset is found for the consumer's group; anything else: throw exception to the consumer. |
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. |
kafka.consumer.security.protocol | PLAINTEXT | Same as kafka.producer.security.protocol but for reading/consuming from Kafka. |
more producer/consumer security props | | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer/consumer. |
Deprecated Properties:
Property Name | Default | Description |
---|---|---|
brokerList | – | List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma separated list of hostname:port |
topic | flume-channel | Use kafka.topic |
groupId | flume | Use kafka.consumer.group.id |
readSmallestOffset | false | Use kafka.consumer.auto.offset.reset |
Note that due to the way the channel is load balanced, there may be duplicate events when the agent first starts up.
Example for agent named a1:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
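And a minimal sketch of scenario 3 from the list above, assuming other producers write raw (non-FlumeEvent) messages into the topic and an HDFS sink drains the channel; the broker, topic and HDFS path are illustrative assumptions:
a1.channels = k1
a1.sinks = s1
a1.channels.k1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.k1.kafka.bootstrap.servers = kafka-1:9092
a1.channels.k1.kafka.topic = raw-events
# false because the topic holds plain messages, not Avro FlumeEvent datums
a1.channels.k1.parseAsFlumeEvent = false
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path = /flume/events
a1.sinks.s1.channel = k1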
Security and Kafka Channel
The communication channel between Flume and Kafka supports secure authentication as well as data encryption. For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.
As of now, data encryption is solely provided by SSL/TLS.
Setting kafka.producer|consumer.security.protocol to any of the following values means:
- SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
- SASL_SSL - Kerberos or plaintext authentication with data encryption
- SSL - TLS based encryption with optional authentication.
Warning: There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561
TLS and Kafka Channel:
Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.
Example configuration with server side authentication and data encryption:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SSL
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
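For the fine-tuning settings mentioned above, the standard Kafka client SSL properties can be set with the same producer/consumer prefixes; a sketch restricting the enabled protocol and truststore type (the values chosen here are illustrative assumptions):
a1.channels.channel1.kafka.producer.ssl.enabled.protocols = TLSv1.2
a1.channels.channel1.kafka.producer.ssl.truststore.type = JKS
a1.channels.channel1.kafka.consumer.ssl.enabled.protocols = TLSv1.2
a1.channels.channel1.kafka.consumer.ssl.truststore.type = JKS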
Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following properties:
a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:
- Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
- Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
If client side authentication is also required, then the following should additionally be added to the Flume agent configuration. Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA which in turn is trusted by the Kafka brokers.
a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
If the keystore and key use different password protection, then the ssl.key.password property will provide the required additional secret for both consumer and producer keystores:
a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>
Kerberos and Kafka Channel:
To use the Kafka channel with a Kafka cluster secured with Kerberos, set the producer/consumer.security.protocol properties noted above for producer and/or consumer. The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file and, optionally, the system wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
Example secure configuration using SASL_PLAINTEXT:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
Example secure configuration using SASL_SSL:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
Sample JAAS file. For its content please refer to the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation of SASL configuration. Since the Kafka source may also connect to Zookeeper for offset migration, the "Client" section was also added to this example. This won't be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
4. File Channel
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be file. |
checkpointDir | ~/.flume/file-channel/checkpoint | The directory where checkpoint file will be stored |
useDualCheckpoints | false | Backup the checkpoint. If this is set to true, backupCheckpointDir must be set |
backupCheckpointDir | – | The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory |
dataDirs | ~/.flume/file-channel/data | Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance |
transactionCapacity | 10000 | The maximum size of transaction supported by the channel |
checkpointInterval | 30000 | Amount of time (in millis) between checkpoints |
maxFileSize | 2146435071 | Max size (in bytes) of a single log file |
minimumRequiredSpace | 524288000 | Minimum required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value |
capacity | 1000000 | Maximum capacity of the channel |
keep-alive | 3 | Amount of time (in sec) to wait for a put operation |
use-log-replay-v1 | false | Expert: Use old replay logic |
use-fast-replay | false | Expert: Replay without using queue |
checkpointOnClose | true | Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay. |
encryption.activeKey | – | Key name used to encrypt new data |
encryption.cipherProvider | – | Cipher provider type, supported types: AESCTRNOPADDING |
encryption.keyProvider | – | Key provider type, supported types: JCEKSFILE |
encryption.keyProvider.keyStoreFile | – | Path to the keystore file |
encryption.keyProvider.keyStorePasswordFile | – | Path to the keystore password file |
encryption.keyProvider.keys | – | List of all keys (e.g. history of the activeKey setting) |
encryption.keyProvider.keys.*.passwordFile | – | Path to the optional key password file |
Note that by default the File Channel uses paths for the checkpoint and data directories inside the user home, as specified above. As a result, if you have more than one File Channel instance active within the agent, only one will be able to lock the directories, causing the other channel initializations to fail. It is therefore necessary to provide explicit paths for all the configured channels, preferably on different disks (see the two-channel sketch after the example below). Furthermore, since the file channel syncs to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance where multiple disks are not available for the checkpoint and data directories.
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
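A sketch of the explicit-path recommendation from the note above, assuming a single agent running two file channels on different disks; the mount points /disk1 and /disk2 are illustrative assumptions:
a1.channels = c1 c2
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /disk1/flume/checkpoint
a1.channels.c1.dataDirs = /disk1/flume/data
a1.channels.c2.type = file
# c2 must not share c1's checkpoint or data directories
a1.channels.c2.checkpointDir = /disk2/flume/checkpoint
a1.channels.c2.dataDirs = /disk2/flume/data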
Encryption
Below are a few sample configurations.
Generating a key with a password separate from the key store password:
keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  -keysize 128 -validity 9000 -keystore test.keystore \
  -storetype jceks -storepass keyStorePassword
Generating a key with the password the same as the key store password:
keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  -keystore src/test/resources/test.keystore -storetype jceks \
  -storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = key-provider-0
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0
Let's say you have aged key-0 out and new files should be encrypted with key-1:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
The same scenario as above, however key-0 has its own password:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password
5. Spillable Memory Channel
The events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store and the disk as overflow. The disk store is managed using an embedded File channel. When the in-memory queue is full, additional incoming events are stored in the file channel. This channel is ideal for flows that need the high throughput of a memory channel during normal operation, but at the same time need the larger capacity of the file channel for better tolerance of intermittent sink side outages or drops in drain rate. The throughput will reduce approximately to file channel speed during such abnormal situations. In case of an agent crash or restart, only the events stored on disk are recovered when the agent comes back online. This channel is currently experimental and not recommended for use in production.
Required properties are in bold. Please refer to the file channel for additional required properties.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be SPILLABLEMEMORY |
memoryCapacity | 10000 | Maximum number of events stored in memory queue. To disable use of in-memory queue, set this to zero. |
overflowCapacity | 100000000 | Maximum number of events stored in overflow disk (i.e. File channel). To disable use of overflow, set this to zero. |
overflowTimeout | 3 | The number of seconds to wait before enabling disk overflow when memory fills up. |
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. |
byteCapacity | see description | Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB. |
avgEventSize | 500 | Estimated average size of events, in bytes, going into the channel |
<file channel properties> | see file channel | Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity. |
The in-memory queue is considered full if either the memoryCapacity or byteCapacity limit is reached.
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
To disable the use of the in-memory queue and function like a file channel:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 200000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
To disable the use of the overflow disk and function purely as an in-memory channel:
a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0
6. Pseudo Transaction Channel
Warning: The Pseudo Transaction Channel is only for unit testing purposes and is NOT meant for production use.
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel |
capacity | 50 | The max number of events stored in the channel |
keep-alive | 3 | Timeout in seconds for adding or removing an event |
7. Custom Channel
A custom channel is your own implementation of the Channel interface. A custom channel's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom channel is its FQCN. Required properties are in bold.
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be a FQCN |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = org.example.MyChannel
Flume Channel Selectors
If the type is not specified, then it defaults to "replicating".
Replicating Channel Selector (default)
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
selector.type | replicating | The component type name, needs to be replicating |
selector.optional | – | Set of channels to be marked as optional |
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
In the above configuration, c3 is an optional channel. Failure to write to c3 is simply ignored. Since c1 and c2 are not marked optional, failure to write to those channels will cause the transaction to fail.
Multiplexing Channel Selector
Required properties are in bold.
Property Name | Default | Description |
---|---|---|
selector.type | replicating | The component type name, needs to be multiplexing |
selector.header | flume.selector.header | |
selector.default | – | |
selector.mapping.* | – |
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
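With the mapping above, an event whose state header is CZ goes to channel c1, an event whose state header is US goes to both c2 and c3, and events whose state header carries any other value (or is absent) fall through to the default channel c4.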
Custom Channel Selector
A custom channel selector is your own implementation of the ChannelSelector interface. A custom channel selector's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom channel selector is its FQCN.
Property Name | Default | Description |
---|---|---|
selector.type | – | The component type name, needs to be your FQCN |
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector