【大数据学习 | flume】flume之常见的source组件

时间:2024-11-16 08:16:41

1. exec source

Exec Source:监听一个指定的命令获取一条命令的结果作为它的数据源 ;

常用的是tail -F file指令监控一个文件,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。

可用此方式进行实时抽取。

配置如下:

Source:exec

Sink:logger

Channel:memory

# exec source
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /root/exec.log

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=logger

#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

创建监控文件

touch exec.log && echo "OK"

启动flume agent a1 服务端

flume-ng agent -n a1 -c /opt/module/apache-flume-1.9.0-bin/conf/ -f /opt/module/apache-flume-1.9.0-bin/agent/exec.agnet -Dflume.root.logger=INFO,console

向exec.log文件中添加数据

/opt/module/apache-flume-1.9.0-bin/agent
[hexuan@hadoop106 agent]$ echo "NB" >> exec.log 
[hexuan@hadoop106 agent]$ echo "NB" >> exec.log 
[hexuan@hadoop106 agent]$ echo "NB" >> exec.log 
[hexuan@hadoop106 agent]$ 

观察结果

2024-11-15 21:37:22,106 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4E 42                                           NB }
2024-11-15 21:37:26,923 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4E 42                                           NB }
2024-11-15 21:37:26,924 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4E 42                                           NB }

2. Taildir Source

Taildir Source:监听一个指定的目录下,指定正则格式的文件的内容,作为它的数据源,并支持断点续传功能 ;

如何支持断点续传的?

有个文件,存储断点续传的位置。

用于实时抽取指定目录下的多个文件。

创建配置文件taildir.agent

监控example.log

配置方式:

Source:Taildir

Sink:logger

Channel:memory

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=TAILDIR
a1.sources.r1.positionFile =/opt/module/apache-flume-1.9.0-bin/agent/tail_position.json
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/module/apache-flume-1.9.0-bin/agent/example.log

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger


a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

当追加数据到log文件中:

[hexuan@hadoop106 agent]$ echo 'NB hexuan is vary NB' >> /opt/module/apache-flume-1.9.0-bin/agent/example.log 
[hexuan@hadoop106 agent]$ echo 'NB hexuan is vary NB' >> /opt/module/apache-flume-1.9.0-bin/agent/example.log 
[hexuan@hadoop106 agent]$ echo 'NB hexuan is vary NB' >> /opt/module/apache-flume-1.9.0-bin/agent/example.log 

查看tail_position.json

此时关闭flume的agent。接着向example.log写入数据

重启flume会从上一次收集处接着收集

新的记录位置有更新

3. Spooling Directory Source

Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件.

​ 此种方式不是实时抽取,是定时抽取。

flume官网中Spooling Directory Source描述

Property Name      Default      Description
channels              –  
type                  –          The component type name, needs to be spooldir.
spoolDir              –          Spooling Directory Source监听的目录
fileSuffix        .COMPLETED    文件内容写入到channel之后,标记该文件
deletePolicy      never        文件内容写入到channel之后的删除策略: never or immediate
fileHeader        false        Whether to add a header storing the absolute path filename.
ignorePattern      ^$          Regular expression specifying which files to ignore (skip)
interceptors          –          指定传输中event的head(头信息),常用timestamp

a1.sources.r1.ignorePattern = ^(.)*\.tmp$ # 跳过.tmp结尾的文件

两个注意事项:

# 1) 拷贝到spool目录下的文件不可以再打开编辑

# 2) 不能将具有相同文件名字的文件拷贝到这个目录下

配置如下:

Source:Spooling Directory

Sink:logger

Channel:memory

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/spool

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=logger

#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

其中:

​ Spooling Directory Source 监听/root/spool 下的是否有新文件,如果有,则读到channel。输出到控制台上。

创建监控目录

启动flume agent a1 服务端

监控/root/spool 目录,把文件cp到目录下,flume就开始归集,归集完,把文件重命名为xxx.COMPLETED

cp文件到目标目录(文件不重名)

已经被归集的文件,被重命名

4. HTTP Source

用来接收http协议通过get或者post请求发送过来的数据,一般get用于测试,常用的是接收post请求发送过来的数据。

配置如下:

Source:http

Sink:logger

Channel:memory

a1.sources=r1
a1.sinks=k1
a1.channels=c1 

#定义source
a1.sources.r1.type=http
a1.sources.r1.bind=11.90.214.80
a1.sources.r1.port=8787

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=logger

#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动flume agent a1 服务端

发送http请求,并携带请求数据

curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]'  http://11.90.214.80:8787