翻译自官网flume1.8用户指南,原文地址:Flume 1.8.0 User Guide
篇幅限制,分为以下5篇:
【翻译】Flume 1.8.0 User Guide(用户指南)
【翻译】Flume 1.8.0 User Guide(用户指南) source
【翻译】Flume 1.8.0 User Guide(用户指南) Sink
【翻译】Flume 1.8.0 User Guide(用户指南) Channel
【翻译】Flume 1.8.0 User Guide(用户指南) Processors
flume 的sources
1 Avro source
监听Avro端口并接收来自外部Avro客户端流的事件。当与另一个(上一跳)Flume agent上的内置Avro接收器配对时,它可以创建分层的集合拓扑。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be avro |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | ||
selector.* | ||
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource |
ssl | false | Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”. |
keystore | – | This is the path to a Java keystore file. Required for SSL. |
keystore-password | – | The password for the Java keystore. Required for SSL. |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
ipFilter | false | Set this to true to enable ipFiltering for netty |
ipFilterRules | – | Define N netty ipFilter pattern rules with this config. |
agent a1 示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
ipFilterRules的例子
ipFilterRules定义了N个由逗号分隔的netty ipFilters,模式规则必须采用这种格式。
< '允许'或'拒绝>:< ' ip '或'名称'为计算机名>:<模式>或允许/拒绝:ip/名称:模式
例如:ipFilterRules =允许:ip: 127。*,允许:名称:localhost,否认:ip: *
注意,要匹配的第一个规则将应用于本地主机上的客户机,如下面的示例所示
这将允许本地主机上的客户端拒绝来自其他ip的客户端" Allow:name:localhost,deny:ip: ",这将拒绝本地主机上的客户端允许来自任何其他ip的客户端" deny:name:localhost, Allow:ip: "
2 Thrift Source
监听Thrift端口并接收来自外部Thrift客户机流的事件。当与另一个(上一跳)Flume agent上的内置ThriftSink一起使用时,它可以创建分层的集合拓扑。通过启用kerberos身份验证,可以将Thrift source配置为以安全模式启动。agent-principal和agent-keytab是Thrift source用于对kerberos KDC进行身份验证的属性。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be thrift |
bind | – | hostname or IP address to listen on |
port | – | Port # to bind to |
threads | – | Maximum number of worker threads to spawn |
selector.type | ||
selector.* | ||
interceptors | – | Space separated list of interceptors |
interceptors.* | ||
ssl | false | Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”. |
keystore | – | This is the path to a Java keystore file. Required for SSL. |
keystore-password | – | The password for the Java keystore. Required for SSL. |
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”. |
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
kerberos | false |
Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC. |
agent-principal | – | The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC. |
agent-keytab | —- | The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC. |
agent a1 的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
3 Exec Source
Exec source在启动时运行给定的Unix命令,并期望该进程在标准输出上持续生成数据(stderr将被丢弃,除非属性logStdErr设置为true)。如果进程因任何原因退出,源也将退出,并且不会生成更多数据。这意味着cat [named pipe]或tail - f [file]等配置将产生所需的结果,而as date可能不会——前两个命令生成数据流,而后者生成单个事件并退出。必须属性以粗体显示
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be exec |
command | – | The command to execute |
shell | – | A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc. |
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | false | Whether the executed cmd should be restarted if it dies |
logStdErr | false | Whether the command’s stderr should be logged |
batchSize | 20 | The max number of lines to read and send to the channel at a time |
batchTimeout | 3000 | Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
警告ExecSource和其他异步源的问题是,如果无法将事件放入客户端知道的Channel中,源不能保证这一点。在这种情况下,数据将丢失。
例如,最常见的请求特性之一是类似于tail -F [file]的用例,其中应用程序将写入磁盘上的日志文件,并跟踪文件,将每一行作为事件
发送。虽然这是可能的,但有一个明显的问题;如果Channel已满,Flume无法发送事件,会发生什么情况?Flume无法向编写日志文件的应
用程序指示它需要保留日志或出于某种原因没有发送事件。如果这没有意义,您只需要知道:在使用单向异步接口(如ExecSource)时,应用
程序永远不能保证接收到数据!作为此警告的扩展—并且要完全清楚—在使用此源时绝对没有事件交付的保证。要获得更强的可靠性保证,可以考
虑使用假脱机目录源、Taildir源或通过SDK直接与Flume集成。
agent a1的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
“shell”配置用于通过命令shell(如Bash或Powershell)调用“command”。“command”作为参数传递给“shell”以供执行。这允许“command”使用shell的特性,如通配符、反勾号、管道、循环、条件等。在没有“shell”配置的情况下,将直接调用“命令”。“shell”的常用值:“/bin/sh -c”、“/bin/ksh -c”、“cmd /c”、“powershell -Command”等。
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
4 JMS Source
JMS源从JMS目的地(如队列或主题)读取消息。作为JMS应用程序,它应该可以与任何JMS提供程序一起工作,但只通过ActiveMQ进行了测试。JMS源提供可配置的批大小、消息选择器、用户/密码和消息到flume事件转换器。请注意,供应商提供的JMS jar应该包含在Flume类路径中,plugins.d目录(首选),命令行上的-classpath,或者通过flume-env.sh中的FLUME_CLASSPATH变量。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be jms |
initialContextFactory | – | Inital Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory |
connectionFactory | – | The JNDI name the connection factory should appear as |
providerURL | – | The JMS provider URL |
destinationName | – | Destination name |
destinationType | – | Destination type (queue or topic) |
messageSelector | – | Message selector to use when creating the consumer |
userName | – | Username for the destination/provider |
passwordFile | – | File containing the password for the destination/provider |
batchSize | 100 | Number of messages to consume in one batch |
converter.type | DEFAULT | Class to use to convert messages to flume events. See below. |
converter.* | – | Converter properties. |
converter.charset | UTF-8 | Default converter only. Charset to use when converting JMS TextMessages to byte arrays. |
createDurableSubscription | false |
Whether to create durable subscription. Durable subscription can only be used with destinationType topic. If true, “clientId” and “durableSubscriptionName” have to be specified. |
clientId | – | JMS client identifier set on Connection right after it is created. Required for durable subscriptions. |
durableSubscriptionName | – | Name used to identify the durable subscription. Required for durable subscriptions. |
4.1 转换器
JMS源允许可插入转换器,尽管默认转换器可能在大多数情况下都可以工作。默认转换器能够将字节、文本和对象消息转换为FlumeEvents。在所有情况下,消息中的属性都作为header添加到FlumeEvent中。
BytesMessage:
消息字节被复制到FlumeEvent的主体中。每条消息不能转换超过2GB的数据。
TextMessage:
将消息文本转换为字节数组并复制到FlumeEvent的主体中。默认转换器默认使用UTF-8,但这是可配置的。
ObjectMessage:
对象被写入到包装在ObjectOutputStream中的ByteArrayOutputStream中,并将结果数组复制到FlumeEvent的主体中。
agent a1的示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
5 sqooping 目录souce
这个源允许您通过将文件放入磁盘上的“假脱机”目录来获取数据。这个源将在指定的目录中查找新文件,并在新文件出现时解析新文件中的事件。事件解析逻辑是可插入的。将给定的文件完全读入通道后,将对其进行重命名,以指示完成(或可选地删除)。
与Exec源不同,此源是可靠的,即使重新启动或关闭Flume,也不会丢失数据。作为这种可靠性的交换,必须将不可变的、惟一命名的文件放入假脱机目录中。Flume试图检测这些问题情况,如果违反这些情况将会失败:
如果文件被放入假脱机目录后写入,Flume将向其日志文件打印错误并停止处理。
如果以后重用文件名,Flume将向其日志文件打印错误并停止处理。
为了避免上述问题,在将文件名移动到假脱机目录时,添加惟一标识符(如时间戳)来记录文件名可能是有用的。
尽管此源具有可靠性保证,但在某些情况下,如果发生某些下游故障,仍然可能重复发生事件。这与其他flume 组件提供的保证是一致的。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be spooldir. |
spoolDir | – | The directory from which to read files from. |
fileSuffix | .COMPLETED | Suffix to append to completely ingested files |
deletePolicy | never | When to delete completed files: never or immediate |
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
basenameHeader | false | Whether to add a header storing the basename of the file. |
basenameHeaderKey | basename | Header Key to use when appending basename of file to event header. |
includePattern | ^.*$ |
Regular expression specifying which files to include. It can used together with ignorePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored. |
ignorePattern | ^$ |
Regular expression specifying which files to ignore (skip). It can used together with includePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored. |
trackerDir | .flumespool | Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir. |
consumeOrder | oldest |
In which order files in the spooling directory will be consumed oldest, youngest and random. In case of oldest and youngest, the last modified time of the files will be used to compare the files. In case of a tie, the file with smallest lexicographical order will be consumed first. In case of random any file will be picked randomly. When using oldest and youngest the whole directory will be scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using random may cause old files to be consumed very late if new files keep coming in the spooling directory. |
pollDelay | 500 | Delay (in milliseconds) used when polling for new files. |
recursiveDirectorySearch | false | Whether to monitor sub directories for new files to read. |
maxBackoff | 4000 |
The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter. |
batchSize | 100 | Granularity at which to batch transfer to the channel |
inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text. |
decodeErrorPolicy | FAIL |
What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence. |
deserializer | LINE |
Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder. |
deserializer.* | Varies per event deserializer. | |
bufferMaxLines | – | (Obselete) This option is now ignored. |
bufferMaxLineLength | 5000 | (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
agent agent-1示例:
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
5.1 事件反序列化器
下面的事件反序列化器随Flume一起发布。
5.1.1 行
这个反序列化器为每行文本输入生成一个事件。
Property Name | Default | Description |
---|---|---|
deserializer.maxLineLength | 2048 |
Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event. |
deserializer.outputCharset | UTF-8 | Charset to use for encoding events put into the channel. |
5.1.2 Avor
这个反序列化器能够读取Avro容器文件,并在文件中为每个Avro记录生成一个事件。每个事件都用一个表示所使用模式的头进行注释。事件的主体是二进制Avro记录数据,不包括模式或容器文件元素的其余部分。
注意,如果spool目录源必须重试将这些事件中的一个放到channel上(例如,因为channel已满),那么它将从最近的Avro容器文件同步点重置并重试。要减少这种失败场景中的潜在事件重复,请在Avro输入文件中更频繁地编写同步标记。
Property Name | Default | Description |
---|---|---|
deserializer.schemaType | HASH |
How the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored in every event in the event header “flume.avro.schema.hash”. If LITERAL is specified, the JSON-encoded schema itself is stored in every event in the event header “flume.avro.schema.literal”. Using LITERAL mode is relatively inefficient compared to HASH mode. |
5.1.3 BlobDeserializer
这个反序列化器为每个事件读取一个二进制大对象(BLOB),通常为每个文件读取一个BLOB。例如PDF或JPG文件。注意,这种方法不适用于非常大的对象,因为整个BLOB都在RAM中缓冲。
Property Name | Default | Description |
---|---|---|
deserializer | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder |
deserializer.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request |
6 tail 目录源
注意,此源代码是作为预览功能提供的。它不能在Windows上运行。
监视指定的文件,并在检测到附加到每个文件的新行之后几乎实时跟踪它们。如果正在写入新行,该源代码将重试读取它们,等待写入完成。
这个源是可靠的,不会错过数据,即使当尾文件滚动。它定期以JSON格式将每个文件的最后一个读位置写到给定的位置文件上。如果flume由于某种原因停止或停机,它可以将指定位置读取的数据写入现有位置文件中(注:从指定的断开位置读positionFile参数,写入到上次的文件中)。
在其他用例中,这个源文件还可以开始跟踪使用给定位置文件的每个文件的任意位置。当指定路径上没有位置文件时,默认情况下它将从每个文件的第一行开始尾随。
文件将按照修改时间的顺序被使用。修改时间最长的文件将首先被使用。
此源不重命名、删除或对被跟踪的文件做任何修改。目前,该源代码不支持跟踪二进制文件。它逐行读取文本文件。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be TAILDIR. |
filegroups | – | Space-separated list of file groups. Each file group indicates a set of files to be tailed. |
filegroups.<filegroupName> | – | Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only. |
positionFile | ~/.flume/taildir_position.json | File in JSON format to record the inode, the absolute path and the last position of each tailing file. |
headers.<filegroupName>.<headerKey> | – | Header value which is the set with header key. Multiple headers can be specified for one file group. |
byteOffsetHeader | false | Whether to add the byte offset of a tailed line to a header called ‘byteoffset’. |
skipToEnd | false | Whether to skip the position to EOF in the case of files not written on the position file. |
idleTimeout | 120000 | Time (ms) to close inactive files. If the closed file is appended new lines to, this source will automatically re-open it. |
writePosInterval | 3000 | Interval time (ms) to write the last position of each file on the position file. |
batchSize | 100 | Max number of lines to read and send to the channel at a time. Using the default is usually fine. |
backoffSleepIncrement | 1000 | The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data. |
maxBackoffSleep | 5000 | The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data. |
cachePatternMatching | true |
Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity. |
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
agent a1 示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
7 Twitter 1% firehose Source(实验)
警告:这个源是高度实验性的,可能会在不同版本的Flume之间发生变化。使用风险自负。
实验源,通过流API连接到1%的示例twitter firehose,持续下载tweet,将其转换为Avro格式,并将Avro事件发送到下游Flume sink。需要使用者和访问令牌以及Twitter开发人员帐户的秘密。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.twitter.TwitterSource |
consumerKey | – | OAuth consumer key |
consumerSecret | – | OAuth consumer secret |
accessToken | – | OAuth access token |
accessTokenSecret | – | OAuth token secret |
maxBatchSize | 1000 | Maximum number of twitter messages to put in a single batch |
maxBatchDurationMillis | 1000 | Maximum number of milliseconds to wait before closing a batch |
agent a1 示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200
8 Kafka Source
Kafka源是一个Apache Kafka消费者,它从Kafka主题读取消息。如果您有多个Kafka源在运行,您可以将它们配置为同一个消费组,这样每个消费组都将为主题读取一组惟一的分区。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the source |
kafka.consumer.group.id | flume |
Unique identified of consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group |
kafka.topics | – | Comma-separated list of topics the kafka consumer will read messages from. |
kafka.topics.regex | – |
Regex that defines set of topics the source is subscribed on. This property has higher priority than kafka.topics and overrides kafka.topics if exists. |
batchSize | 1000 | Maximum number of messages written to Channel in one batch |
batchDurationMillis | 1000 |
Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of size and time will be reached. |
backoffSleepIncrement | 1000 |
Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. |
maxBackoffSleep | 5000 |
Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. |
useFlumeEventFormat | false |
By default events are taken as bytes from the Kafka topic directly into the event body. Set to true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers sent on the producing side. |
setTopicHeader | true | When set to true, stores the topic of the retrieved message into a header, defined by the topicHeaderproperty. |
topicHeader | topic |
Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink topicHeader property so as to avoid sending the message back to the same topic in a loop. |
migrateZookeeperOffsets | true |
When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset defines how offsets are handled. Check Kafka documentation for details |
kafka.consumer.security.protocol | PLAINTEXT |
Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. |
more consumer security props |
If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on consumer. |
|
Other Kafka Consumer Properties | – |
These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefixkafka.consumer. For example: kafka.consumer.auto.offset.reset |
注意:Kafka源覆盖了两个Kafka使用者参数:auto.commit.enable被源设置为“false”,并且每批都提交了。Kafka源保证至少一次消息检索策略。当源启动时,可以显示副本。Kafka源代码还为key.deserializer(org.apache. Kafka .common. serialize . stringserializer)和value.deserializer(org.apache. Kafka .common. serialize . bytearrayserializer)提供了默认值。不建议修改这些参数。
弃用属性:
Property Name | Default | Description |
---|---|---|
topic | – | Use kafka.topics |
groupId | flume | Use kafka.consumer.group.id |
zookeeperConnect | – |
Is no longer supported by kafka consumer client since 0.9.x. Use kafka.bootstrap.servers to establish connection with kafka cluster |
以逗号分隔的topic 列表订阅topic的示例:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
regex订阅topic的示例:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
安全和kafka source:
Flume和Kafka之间的通信通道支持安全认证和数据加密。对于安全身份验证,可以使用Kafka版本0.9.0中的SASL/GSSAPI (Kerberos V5)或SSL(尽管参数名为SSL,但实际的协议是TLS实现)。
到目前为止,数据加密仅由SSL/TLS提供。
设置kafka.consumer.security.protocol 符合下列任何一项价值意味着:
SASL_PLAINTEXT - 没有数据加密的Kerberos或明文身份验证
SASL_SSL - 带有数据加密的Kerberos或纯文本身份验证
SSL - TLS加密,具有可选的身份验证。
警告:启用SSL时会导致性能下降,其程度取决于CPU类型和JVM实现。参考文献:Kafka安全概述和用于跟踪这个问题的jira: Kafka -2561
TLS and Kafka Source:
请阅读配置Kafka客户端SSL中描述的步骤,以了解用于微调的其他配置设置,例如以下任何一种:安全提供程序、密码套件、启用的协议、信任存储或密钥存储类型。
使用服务器端身份验证和数据加密的示例配置:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
注意:默认情况下属性是ssl.endpoint.identification.algorithm 没有定义,因此没有执行主机名验证。为了启用主机名验证,请设置以下属性:
a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
一旦启用,客户端将针对以下两个字段之一验证服务器的完全限定域名(FQDN):
- Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
- Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
如果还需要客户端身份验证,那么应该向Flume agent配置添加以下内容。每个Flume代理必须拥有自己的客户端证书,这些证书必须由Kafka brokers单独或通过其签名链进行信任。常见的示例是通过一个根CA对每个客户端证书进行签名,而这个根CA证书又受到Kafka代理的信任。
a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>
如果密钥存储和密钥使用不同的密码保护,则使用ssl.key.password 属性将为两个使用者密钥存储库提供所需的额外机密:
a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>
Kerberos and Kafka Source:
要将Kafka源与Kerberos保护的Kafka集群一起使用,请设置consumer.security.protocol为上面使用者指出的属性。与Kafka brokers一起使用的Kerberos keytab和主体在JAAS文件的“KafkaClient”部分中指定。“客户端”部分描述了需要时的Zookeeper连接。有关JAAS文件内容的信息,请参见Kafka文档。可以通过flume-env.sh中的JAVA_OPTS指定这个JAAS文件的位置,也可以选择指定系统范围内的kerberos配置:
JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"
使用SASL_PLAINTEXT的安全配置示例:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
使用SASL_SSL的安全配置示例:
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
JAAS文件示例。有关其内容的参考,请参阅SASL配置的Kafka文档中所需身份验证机制(GSSAPI/PLAIN)的客户端配置部分。由于Kafka源也可以连接到Zookeeper进行偏移迁移,因此“Client”部分也被添加到这个示例中。除非您需要偏移迁移,或者对于其他安全组件需要此部分,否则不需要这样做。另外,请确保Flume进程的操作系统用户具有jaas和keytab文件上的读权限。
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
}; KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
9 NetCat TCP Source
一个类似于netcat的source,它监听给定端口并将每一行文本转换为一个事件。类似于nc -k -l[ip][port]。换句话说,它打开指定的端口并侦听数据。预期提供的数据是换行分隔的文本。每一行文本都被转换为Flume事件,并通过连接的channel发送。
必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be netcat |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
max-line-length | 512 | Max line length per event body (in bytes) |
ack-every-event | true | Respond with an “OK” for every event received |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
agent a1 示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
10 NetCat UDP Source
根据原始Netcat (TCP)源,该源在给定端口上侦听,并将每一行文本转换为一个事件,并通过连接的通道发送。类似于nc -u -k -l[host][端port]。
必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be netcatudp |
bind | – | Host name or IP address to bind to |
port | – | Port # to bind to |
remoteAddressHeader | – | |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
agent a1 示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
11 Sequence Generator Source
一个简单的序列生成器,它使用计数器连续生成事件,计数器从0开始,递增1,并在totalEvents停止。无法将事件发送到channel时重试。主要用于测试。在重试期间,它保持重试消息的主体与以前相同,这样,在目的地重复数据删除之后,唯一事件的数量预期将等于指定的totalEvents。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be seq |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
batchSize | 1 | Number of events to attempt to process per request loop. |
totalEvents | Long.MAX_VALUE | Number of unique events sent by the source. |
agent a1 示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
12 Syslog Sources
读取syslog数据并生成Flume事件。UDP源将整个消息视为单个事件。TCP源为用换行符(' n ')分隔的每个字符串创建一个新事件。
必须属性以粗体显示。
12.1 Syslog TCP Source
原始的、可靠的syslog TCP源.
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be syslogtcp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
eventSize | 2500 | Maximum size of a single event line, in bytes |
keepFields | none |
Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A spaced separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’. |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
例如,agent a1 syslog TCP source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port =
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
12.2 Multiport Syslog TCP Source
这是一个更新、更快、支持多端口的Syslog TCP源版本。请注意,端口配置设置已经替换了端口。多端口功能意味着它可以以一种有效的方式同时监听多个端口。这个源代码使用Apache Mina库来实现这一点。提供对RFC-3164和许多常见的RFC-5424格式消息的支持。还提供按端口配置所使用的字符集的功能。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be multiport_syslogtcp |
host | – | Host name or IP address to bind to. |
ports | – | Space-separated list (one or more) of ports to bind to. |
eventSize | 2500 | Maximum size of a single event line, in bytes. |
keepFields | none |
Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A spaced separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’. |
portHeader | – |
If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port. |
charset.default | UTF-8 | Default character set used while parsing syslog events into strings. |
charset.port.<port> | – | Character set is configurable on a per-port basis. |
batchSize | 100 | Maximum number of events to attempt to process per request loop. Using the default is usually fine. |
readBufferSize | 1024 | Size of the internal Mina read buffer. Provided for performance tuning. Using the default is usually fine. |
numProcessors | (auto-detected) |
Number of processors available on the system for use while processing messages. Default is to auto-detect # of CPUs using the Java Runtime API. Mina will spawn 2 request-processing threads per detected CPU, which is often reasonable. |
selector.type | replicating | replicating, multiplexing, or custom |
selector.* | – | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors. |
interceptors.* |
For example, a multiport syslog TCP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports =
a1.sources.r1.portHeader = port
12.3 Syslog UDP Source
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be syslogudp |
host | – | Host name or IP address to bind to |
port | – | Port # to bind to |
keepFields | false | Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event. |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
For example, a syslog UDP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port =
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
13 HTTP source
通过HTTP POST和GET接受Flume事件的源。GET应该只用于实验。HTTP请求通过必须实现HTTPSourceHandler接口的可插入“处理程序”转换为flume事件。这个处理程序接受HttpServletRequest并返回一个flume事件列表。从一个Http请求处理的所有事件都提交给一个事务中的通道,从而提高了文件通道等通道的效率。如果处理程序抛出异常,该源将返回400的HTTP状态。如果channel已满,或者源无法向channel追加事件,源将返回一个HTTP 503—暂时不可用状态。
在一个post请求中发送的所有事件都被视为一个批处理,并在一个事务中插入到channel中。
Property Name | Default | Description |
---|---|---|
type | The component type name, needs to be http | |
port | – | The port the source should bind to. |
bind | 0.0.0.0 | The hostname or IP address to listen on |
handler | org.apache.flume.source.http.JSONHandler | The FQCN of the handler class. |
handler.* | – | Config parameters for the handler |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* | ||
enableSSL | false | Set the property true, to enable SSL. HTTP Source does not support SSLv3. |
excludeProtocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded. |
keystore | Location of the keystore includng keystore file name | |
keystorePassword Keystore password |
例如, a http source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port =
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
13.1 JSONHandler
提供了一个开箱即用的处理程序,它可以处理JSON格式表示的事件,并支持UTF-8、UTF-16和UTF-32字符集。处理程序接受事件数组(即使只有一个事件,也必须以数组的形式发送事件),并根据请求中指定的编码将其转换为Flume事件。如果没有指定编码,则假定为UTF-8。JSON处理程序支持UTF-8、UTF-16和UTF-32。事件表示如下:
[{
"headers" : {
"timestamp" : "",
"host" : "random_host.example.com"
},
"body" : "random_body"
},
{
"headers" : {
"namenode" : "namenode.example.com",
"datanode" : "random_datanode.example.com"
},
"body" : "really_random_body"
}]
要设置字符集,请求必须具有指定为application/json的内容类型;charset=UTF-8(根据需要将UTF-8替换为UTF-16或UTF-32)。
按照此处理程序所期望的格式创建事件的一种方法是使用Flume SDK中提供的JSONEvent,并使用谷歌Gson使用Gson#fromJson(对象、类型)方法创建JSON字符串。传递给事件列表的方法的第二个参数的类型令牌可以通过以下方式创建:
Type type = new TypeToken<List<JSONEvent>>() {}.getType();
13.2 BlobHandler
默认情况下,HTTPSource将JSON输入拆分为Flume事件。另一种选择是,BlobHandler是HTTPSource的处理程序,它返回一个事件,该事件包含请求参数以及随此请求上载的二进制大对象(BLOB)。例如PDF或JPG文件。注意,这种方法不适用于非常大的对象,因为它在RAM中缓冲整个BLOB。
Property Name | Default | Description |
---|---|---|
handler | – | The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler |
handler.maxBlobLength | 100000000 | The maximum number of bytes to read and buffer for a given request |
14 Stress Source
StressSource是一个内部的负载产生源实现,它对压力测试非常有用。它允许用户使用空头配置事件有效负载的大小。用户可以配置要发送的事件总数以及要交付的成功事件的最大数量。
必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.source.StressSource |
size | 500 | Payload size of each Event. Unit:byte |
maxTotalEvents | -1 | Maximum number of Events to be sent |
maxSuccessfulEvents | -1 | Maximum number of Events successfully sent |
batchSize | 1 | Number of Events to be sent in one batch |
agent a1 示例:
a1.sources = stresssource-
a1.channels = memoryChannel-
a1.sources.stresssource-.type = org.apache.flume.source.StressSource
a1.sources.stresssource-.size =
a1.sources.stresssource-.maxTotalEvents =
a1.sources.stresssource-.channels = memoryChannel-
15 Legacy Sources
legacy sources允许使用Flume1.x agent 接收代理0.9.4 agent 发送的事件。它接受Flume 0.9.4格式的事件,将它们转换为Flume 1.0格式,并将它们存储在连接的channel中。时间戳、pri、主机、nanos等0.9.4事件属性被转换为1。x事件头属性。legacy sources同时支持Avro和Thrift RPC连接。要在两个Flume版本之间使用此桥接,您需要启动一个Flume 1.x 具有avroLegacy或thriftLegacy源的agent , 0.9.4 agent应该让agent sink指向1.x agent的主机/端口。
注意:Flume的可靠性语义1.x与水槽0.9.x不同。0.9.x Flume agent的E2E或DFO模式不支持 legacy source, 仅支持0.9.x 模式是最好,虽然可以设置为1.x,当事件被保存到flume1.x的channel中时,legacy source流将适用于它们。(注:翻译不清楚,无力)
必须属性以粗体显示。
15.1 Avro Legacy Source
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource |
host | – | The hostname or IP address to bind to |
port | – | The port # to listen on |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
agent a1 示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind =
a1.sources.r1.channels = c1
15.2 Thrift Legacy Source
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource |
host | – | The hostname or IP address to bind to |
port | – | The port # to listen on |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
agent a1 示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind =
a1.sources.r1.channels = c1
16 自定义source
自定义源是您自己对源接口的实现。启动Flume代理时,必须将自定义源的类及其依赖项包含在代理的类路径中。定制源的类型是它的FQCN。
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be your FQCN |
selector.type | replicating or multiplexing | |
selector.* | replicating | Depends on the selector.type value |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
agent a1示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
17 Scribe Source
Scribe是另一种获取系统。Flume采用现有的Scribe 获取系统,应在Thrift的基础上使用ScribeSource,并采用兼容的传输协议。关于Scribe的部署,请遵循Facebook的指南。必须属性以粗体显示。
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be org.apache.flume.source.scribe.ScribeSource |
port | 1499 | Port that Scribe should be connected |
maxReadBufferBytes | 16384000 | Thrift Default FrameBuffer Size |
workerThreads | 5 | Handing threads number in Thrift |
selector.type | ||
selector.* |
agent a1 示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port =
a1.sources.r1.workerThreads =
a1.sources.r1.channels = c1
翻译自官网flume1.8用户指南,原文地址:Flume 1.8.0 User Guide
篇幅限制,分为以下5篇:
【翻译】Flume 1.8.0 User Guide(用户指南)
【翻译】Flume 1.8.0 User Guide(用户指南) source
【翻译】Flume 1.8.0 User Guide(用户指南) Sink