1. exec source
Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源 ;
常用的是tail -F file指令监控一个文件,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。
可用此方式进行实时抽取。
配置如下:
Source:exec
Sink:logger
Channel:memory
# exec source
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#定义source
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /root/exec.log
#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100
#定义sink
a1.sinks.k1.type=logger
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
创建监控文件
touch exec.log && echo "OK"
启动flume agent a1 服务端
flume-ng agent -n a1 -c /opt/module/apache-flume-1.9.0-bin/conf/ -f /opt/module/apache-flume-1.9.0-bin/agent/exec.agnet -Dflume.root.logger=INFO,console
向exec.log文件中添加数据
/opt/module/apache-flume-1.9.0-bin/agent
[hexuan@hadoop106 agent]$ echo "NB" >> exec.log
[hexuan@hadoop106 agent]$ echo "NB" >> exec.log
[hexuan@hadoop106 agent]$ echo "NB" >> exec.log
[hexuan@hadoop106 agent]$
观察结果
2024-11-15 21:37:22,106 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4E 42 NB }
2024-11-15 21:37:26,923 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4E 42 NB }
2024-11-15 21:37:26,924 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 4E 42 NB }
2. Taildir Source
Taildir Source:监听一个指定的目录下,指定正则格式的文件的内容,作为它的数据源,并支持断点续传功能 ;
如何支持断点续传的?
有个文件,存储断点续传的位置。
用于实时抽取指定目录下的多个文件。
创建配置文件taildir.agent
监控example.log
配置方式:
Source:Taildir
Sink:logger
Channel:memory
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=TAILDIR
a1.sources.r1.positionFile =/opt/module/apache-flume-1.9.0-bin/agent/tail_position.json
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/module/apache-flume-1.9.0-bin/agent/example.log
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
当追加数据到log文件中:
[hexuan@hadoop106 agent]$ echo 'NB hexuan is vary NB' >> /opt/module/apache-flume-1.9.0-bin/agent/example.log
[hexuan@hadoop106 agent]$ echo 'NB hexuan is vary NB' >> /opt/module/apache-flume-1.9.0-bin/agent/example.log
[hexuan@hadoop106 agent]$ echo 'NB hexuan is vary NB' >> /opt/module/apache-flume-1.9.0-bin/agent/example.log
查看tail_position.json
此时关闭flume的agent。接着向example.log写入数据
重启flume会从上一次收集处接着收集
新的记录位置有更新
3. Spooling Directory Source
Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件.
此种方式不是实时抽取,是定时抽取。
flume官网中Spooling Directory Source描述
Property Name Default Description
channels –
type – The component type name, needs to be spooldir.
spoolDir – Spooling Directory Source监听的目录
fileSuffix .COMPLETED 文件内容写入到channel之后,标记该文件
deletePolicy never 文件内容写入到channel之后的删除策略: never or immediate
fileHeader false Whether to add a header storing the absolute path filename.
ignorePattern ^$ Regular expression specifying which files to ignore (skip)
interceptors – 指定传输中event的head(头信息),常用timestamp
a1.sources.r1.ignorePattern = ^(.)*\.tmp$ # 跳过.tmp结尾的文件
两个注意事项:
# 1) 拷贝到spool目录下的文件不可以再打开编辑
# 2) 不能将具有相同文件名字的文件拷贝到这个目录下
配置如下:
Source:Spooling Directory
Sink:logger
Channel:memory
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#定义source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/spool
#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100
#定义sink
a1.sinks.k1.type=logger
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
其中:
Spooling Directory Source 监听/root/spool 下的是否有新文件,如果有,则读到channel。输出到控制台上。
创建监控目录
启动flume agent a1 服务端
监控/root/spool 目录,把文件cp到目录下,flume就开始归集,归集完,把文件重命名为xxx.COMPLETED
cp文件到目标目录(文件不重名)
已经被归集的文件,被重命名
4. HTTP Source
用来接收http协议通过get或者post请求发送过来的数据,一般get用于测试,常用的是接收post请求发送过来的数据。
配置如下:
Source:http
Sink:logger
Channel:memory
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#定义source
a1.sources.r1.type=http
a1.sources.r1.bind=11.90.214.80
a1.sources.r1.port=8787
#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100
#定义sink
a1.sinks.k1.type=logger
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume agent a1 服务端
发送http请求,并携带请求数据
curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]' http://11.90.214.80:8787