复制:实时监听 NetCat 端口数据到本地文件系统和 HDFS 案例分析
案例需求:假设有一个生产场景,Flume1 在持续监控一个 netcat 端口的数据流。Flume1 将端口传送的数据流传递给 Flume2,Flume2 负责存储到本地文件系统。同时 Flume1 将端口传送的数据流传递给 Flume3,Flume3 负责存储到 HDFS。
需求分析:该实训是,Flume1 监听本机 4141 端口的数据流,Flume1 将数据发送给 Flume2 和 Flume3,Flume2 将数据收集并存储到本地文件系统,而 Flume3 则将数据收集并保存到 HDFS。过程如下图所示:
一、Flume Source 配置讲解
- NetCat TCP Source 配置讲解
NetCat TCP Source 侦听给定端口并将每一行文本转换为一个事件(Event),也就是数据是基于换行符分隔。它的工作就像命令 nc -k -l [host] [port]。换句话说,它打开一个指定端口并监听数据。期望提供的数据是换行符分隔的文本。每一行文本都会变成一个 Flume 事件,并通过连接的通道(Channel)发送。以下是必需的参数:
属性名称 默认值 说明
channels – 与 Source 绑定的 Channel,多个用空格分开
type – 组件类型名称,需要是 netcat
bind – 日志需要发送到的主机名或者 IP 地址,该主机运行着 netcat 类型的 Source
port – 日志需要发送到的端口号,该端口要有 netcat 类型的 Source 在监听
selector.type replicating 配置 Channel 选择器。可选值:replicating 或 multiplexing,分别表示: 复制、多路复用
2. Avro Source 配置讲解
Avro Source 监听 Avro 端口并接收从外部 Avro 客户端发送来的数据流。如果与上一层 Agent 的 Avro Sink 配合使用就组成了一个分层的拓扑结构。利用 Avro Source 可以实现多级流动、扇出流、扇入流等效果。以下是必需的参数:
属性名称 默认值 描述
channels – 与 Source 绑定的 Channel,多个用空格分开
type – 组件类型名称,需要是 avro
bind – 要监听的主机名或 IP 地址
port – 要监听的服务端口
二、Flume Sink 配置讲解
- Avro Sink 配置讲解
Avro Sink 可以作为 Flume 分层收集特性的下半部分。发送到此 Sink 的 Flume Event 将转换为 Avro Event 发送到配置的主机/端口上。Event 从配置的 Channel 中批量获取,数量根据配置的 batch-size 而定。以下是必需的参数:
属性名称 默认值 描述
channel – 与 Sink 绑定的 Channel
type – 组件类型名称,需要是 avro
bind – 要监听的主机名或 IP 地址
port – 要监听的服务端口
2. File Roll Sink 配置讲解
将 Event 存储到本地文件系统。以下是必需的参数:
属性名称 默认值 描述
channel – 与 Sink 绑定的 Channel
type – 组件类型名称,需要是 file_roll
sink.directory – 存储文件的目录
3. HDFS Sink 配置讲解
HDFS Sink 将 Event 写入 Hadoop 分布式文件系统(也就是 HDFS)。它目前支持创建文本和序列文件,以及两种类型的压缩文件。
HDFS Sink 可以根据写入的时间、文件大小或 Event 数量定期滚动文件(关闭当前文件并创建新文件)。它还可以根据 Event 发生的时间戳或系统时间等属性对数据进行存储/分区。存储文件的 HDFS 目录路径可以使用格式转义序列,这些转义序列将被 HDFS Sink 进行动态地替换,以生成用于存储 Event 的目录或文件名。使用 HDFS Sink 时需要安装 Hadoop,以便 Flume 可以使用 Hadoop jar 与 HDFS 集群进行通信。请注意, 需要使用支持 sync() 调用的 Hadoop 版本。
以下是支持的转义序列:
转义序列 描述
%{host} Event header 中 key 为 host 的值。这个 host 可以是任意的 key,只要 header 中有就能读取,比如 %{aabc} 将读取 header 中 key 为 aabc 的值
%t 毫秒值的时间戳(同 System.currentTimeMillis() 方法)
%a 星期的缩写(Mon、Tue 等)
%A 星期的全拼(Monday、 Tuesday 等)
%b 月份的缩写(Jan、 Feb 等)
%B 月份的全拼(January、February 等)
%c 日期和时间(Mon Feb 14 23:05:25 2022)
%d 月份中的某一天(00到31)
%e 没有填充的月份中的某一天(1到31)
%D 日期,与%m/%d/%y相同 ,例如:02/14/22
%H 小时(00到23)
%I 小时(01到12)
%j 一年中的一天(001到366)
%k 小时(0到23),注意跟 %H的区别
%m 月份(01到12)
%n 没有填充的月份(1到12)
%M 分钟(00到59)
%p am或者pm
%s unix时间戳,是秒值。比如 2022-02-14 18:15:49 的 unix 时间戳是:1644833749
%S 秒(00到59)
%y 一年中的最后两位数(00到99),比如1998年的%y就是98
%Y 年份(2010这种格式)
%z +hhmm,数字时区(比如:-0400)
%[localhost] Agent 实例所在主机的 hostname
%[IP] Agent 实例所在主机的 IP 地址
%[FQDN] Agent 实例所在主机的规范 hostname
注意,%[localhost]、%[IP] 和 %[FQDN] 这三个转义序列实际上都是用 Java 的 API 来获取的,在某些网络环境下可能会获取失败。
正在打开的文件会在名称末尾加上“.tmp”的后缀。文件关闭后,会自动删除此扩展名。这样容易排除目录中的那些已完成的文件。必需的参数已用粗体标明:
属性名称 默认值 描述
channel – 与 Sink 绑定的 Channel
type – 组件类型名称,需要是 hdfs
hdfs.path – HDFS 目录路径(例如:hdfs://namenode/flume/hivedata/)
hdfs.filePrefix FlumeData Flume 在 HDFS 文件夹下创建新文件的固定前缀
hdfs.fileSuffix – Flume 在 HDFS 文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)
hdfs.inUsePrefix – Flume 正在写入的临时文件前缀,默认没有
hdfs.inUseSuffix .tmp Flume 正在写入的临时文件后缀
hdfs.emptyInUseSuffix false 如果设置为 false 上面的 hdfs.inUseSuffix 参数在写入文件时会生效,并且写入完成后会在目标文件上移除 hdfs.inUseSuffix 配置的后缀。如果设置为 true 则上面的 hdfs.inUseSuffix 参数会被忽略,写文件时不会带任何后缀
hdfs.rollInterval 30 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize 1024 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount 10 当前文件写入 Event 达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
hdfs.idleTimeout 0 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
hdfs.batchSize 100 在将文件刷新到 HDFS 之前写入文件的 Event 数
hdfs.codeC – 压缩编解码器。可选值:gzip 、 bzip2 、 lzo 、 lzop 、 snappy
hdfs.fileType SequenceFile 文件格式。目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 DataStream 不会压缩文件,不需要设置hdfs.codeC;CompressedStream 必须设置 hdfs.codeC 参数
hdfs.writeFormat Writable 文件写入格式。可选值: Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件
hdfs.useLocalTimeStamp false 使用日期时间转义序列时是否使用本地时间戳(而不是使用 Event headers 中的时间戳)
serializer TEXT Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名
serializer.* 根据上面 serializer 配置的类型来根据需要添加序列化器的参数
三、Flume Channel 配置讲解
Memory Channel 配置讲解
Memory Channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。必需的参数已用粗体标明:
属性名称 默认值 说明
type – 组件类型名称,需要是 memory
capacity 100 存储在 Channel 中的最大 Event 数
transactionCapacity 100 Channel 将从 Source 接收或向 Sink 传递的每一个事务中的最大 Event 数(capacity>= transactionCapacity)
要求:配置 NetCat TCP Source 监控端口 4141 数据流,配置 Avro Sink 输出数据到下一级 Flume。
flume1_netcat_avro
-
创建 Agent 配置文件(NetCat TCP Source + Memory Channel + Avro Sink)
进入 /root/software/apache-flume-1.9.0-bin/conf 目录,使用 touch 命令创建一个名为 flume1_netcat_avro.conf 的配置文件。 -
查看端口
使用 netstat -nlp | grep 4141 命令查看 4141、6666 和 6667 端口是否被占用。 -
编辑配置文件 flume1_netcat_avro.conf
使用 vim 命令打开 flume1_netcat_avro.conf 文件,在里面添加如下配置:
(1)配置 Flume Agent——a1
首先,我们需要为 a1 命名/列出组件,将数据源(Source)、缓冲通道(Channel)和接收器(Sink)分别命名为 netcatSource、 mc1 、 mc2 、 avroSink1 和 avroSink2。
(2)描述和配置 NetCat TCP Source
NetCat TCP Source 侦听给定端口并将每一行文本转换为一个事件(Event),也就是数据是基于换行符分隔。它的工作就像命令 nc -k -l [host] [port]。换句话说,它打开一个指定端口并监听数据。期望提供的数据是换行符分隔的文本。每一行文本都会变成一个 Flume 事件,并通过连接的通道(Channel)发送。
(3)描述和配置 Avro Sink
Avro Sink 可以作为 Flume 分层收集特性的下半部分。发送到此 Sink 的 Flume Event 将转换为 Avro Event 发送到配置的主机/端口上。Event 从配置的 Channel 中批量获取,数量根据配置的 batch-size 而定。
(4)描述和配置 Memory Channel
Memory Channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。
vim flume1_netcat_avro.conf
a1.sources = netcatSource
a1.channels = mc1 mc2
a1.sinks = avroSink1 avroSink2
a1.sources.netcatSource.channels = mc1 mc2
a1.sources.netcatSource.type = netcat
a1.sources.netcatSource.bind = localhost
a1.sources.netcatSource.port = 4141
a1.sources.netcatSource.selector.type = replicating
a1.sinks.avroSink1.channel = mc1
a1.sinks.avroSink1.type = avro
a1.sinks.avroSink1.hostname = localhost
a1.sinks.avroSink1.port = 6666
a1.sinks.avroSink2.channel = mc2
a1.sinks.avroSink2.type = avro
a1.sinks.avroSink2.hostname = localhost
a1.sinks.avroSink2.port = 6667
a1.channels.mc1.type = memory
a1.channels.mc1.capacity = 1000
a1.channels.mc1.transactionCapacity = 100
a1.channels.mc2.type = memory
a1.channels.mc2.capacity = 1000
a1.channels.mc2.transactionCapacity = 100
vim flume2_avro_file.conf
a2.sources = avroSource
a2.channels = memoryChannel
a2.sinks = fileSink
a2.sources.avroSource.channels = memoryChannel
a2.sources.avroSource.type = avro
a2.sources.avroSource.bind = localhost
a2.sources.avroSource.port = 6666
a2.sinks.fileSink.channel = memoryChannel
a2.sinks.fileSink.type = file_roll
a2.sinks.fileSink.sink.directory = /root/flume/avro_file
a2.channels.memoryChannel.type = memory
a2.channels.memoryChannel.capacity = 1000
a2.channels.memoryChannel.transactionCapacity = 100
vim flume3_avro_hdfs.conf
a3.sources = avroSource
a3.channels = memoryChannel
a3.sinks = HDFSSink
a3.sources.avroSource.channels = memoryChannel
a3.sources.avroSource.type = avro
a3.sources.avroSource.bind = localhost
a3.sources.avroSource.port = 6667
## Describe the sink
#与 Sink绑定的 Channel
a3.sinks.HDFSSink.channel = memoryChannel
# 接收器的类型为 hdfs类型,输出目的地是HDFS
a3.sinks.HDFSSink.type = hdfs
# 数据存放在HDFS上的目录
a3.sinks.HDFSSink.hdfs.path = hdfs://localhost:9000/flumedata/%Y-%m-%d
# 文件的固定前缀为 hivelogs-
a3.sinks.HDFSSink.hdfs.filePrefix = netcat
# 按时间间隔滚动文件,默认30s,此处设置为 60s
a3.sinks.HDFSSink.hdfs.rollInterval = 60
# 按文件大小滚动文件,默认1024字节,此处设置为5242880字节 ( 5M)
a3.sinks.HDFsSink.hdfs.rollSize = 134217728
# 当Event个数达到该数量时,将临时文件滚动成目标文件,默认是10,0表示文件的滚动与Event数量无关
a3.sinks.HDFSSink.hdfs.rollCount =0
# 文件格式,默认为SequenceFile,但里面的内容无法直接打开浏览,所以此处设置为DataStream
a3.sinks.HDFSSink.hdfs.fileType = DataStream
# 文件写入格式,默认为Writable,此处设置为Text
a3.sinks.HDFSSink.hdfs.writeFormat = Text
# HDFS Sink是否使用本地时间,默认为false,此处设置为true
a3.sinks.HDFSSink.hdfs.useLocalTimeStamp = true
a3.channels.memoryChannel.type = memory
a3.channels.memoryChannel.capacity = 1000
a3.channels.memoryChannel.transactionCapacity = 100
flume-ng agent -c conf/ -f conf/flume3_avro_hdfs.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c conf/ -f conf/flume2_avro_file.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c conf/ -f conf/flume1_netcat_avro.conf -n a1 -Dflume.root.logger=INFO,console