Flume 自定义source -- SQLSource (转为 json 格式)

时间:2025-04-13 07:36:06

 

个人使用flume  相对较多 ,对他的采集任务比较喜欢 ,自己做了一些数据库方向的的拓展。

虽然 github  上 有很多 自定义的 flume  sql-source  比如 大名鼎鼎 的  /keedio/flume-ng-sql-source

但是 我个人在使用的过程中遇到了一些问题   也一直没有得到解决 ,/keedio/flume-ng-sql-source/issues/59 

并且 格式 是csv  格式  。

所以我自己写了一套 sqlsource  基于原生的jdbc  连接 ,没有使用 其他的框架  ,读取数据库 转为 json  格式  。也便于解析 个人 项目 地址 ,欢迎提出不足 ,有问题 直接 提 issues。我会尽快帮大家解决 。

一、
在使用flume采集日志时,可以通过flume进行监控某一个文件把生产的数据传输给指定的sink,但是如果某段时间flume所在机器宕机了,那么当重新启动后,在去监控时,会导致有数据丢失,不是接着上一次的数据继续进行读取,因此针对这种情况时可能需要我们自定义一个source,记录偏移量,每次都是接着上次继续读,记录 数据已经发送的位置  。
二、
下面就是具体实现的代码
再写代码时可以参照官方给的source的源码进行编写.
flume的生命周期: 先执行构造器,再执行 config方法 --> start方法 --> –> stop

读取配置文件 ->  初始化 数据库连接及 相关参数   ->  解析 sql  ->  获取 resultsSet  ->  转为 Json ->  发送个channel  -> stop -> 记录当前位置 

具体 详见 /HbnKing/Flume-ng-Database/blob/master/RDB/src/main/java/com/hbn/rdb/source/

三 、

配置文件 

这里采用 loggersink    主要看一下Source  的 一些相关 配置 

支持 自定义 sql   

 = r1
 = c1
 = k1

# 这里用 自己定义的 SQLSource
. = 
. = jdbc:oracle:thin:@//ip:1521/orcl
. = yyj
. = yyj
.= 
. = /var/log/sqllog
. = 
#. = select ,a.COUPON_id, from USER_COUPON_CODE_1 a ,COUPON_CODE b  where a.COUPON_id = 
#. = select ,a.COUPON_id, from USER_COUPON_CODE_1 a ,COUPON_CODE b  where a.COUPON_id =   and   > $@$
. = select * from USER_COUPON_CODE_1
. = 0
. = 
. = 1000
#具体定义channel
. = memory
. = 1000
. = 100
#具体定义sink
. = logger
#组装source、channel、sink
. = c1
. = c1

四  、  

启动命令 

bin/flume-ng agent --conf conf/ --conf-file conf/  --name f2 -=INFO,console

 

五  、  

查看结果 


{  "ID" : NumberLong("10000900000"), "COUPON_ID" : 900000, "ID_3" : NumberLong("10000900000") }
{ "ID" : NumberLong("10000700000"), "COUPON_ID" : 700000, "ID_3" : NumberLong("10000700000") }
{  "ID" : NumberLong("10000300000"), "COUPON_ID" : 300000, "ID_3" : NumberLong("10000300000") }
{ "ID" : NumberLong("10000800000"), "COUPON_ID" : 800000, "ID_3" : NumberLong("10000800000") }
{ "ID" : NumberLong("10000500001"), "COUPON_ID" : 500001, "ID_3" : NumberLong("10000500001") }
{  "ID" : NumberLong("10000500002"), "COUPON_ID" : 500002, "ID_3" : NumberLong("10000500002") }
{  "ID" : NumberLong("10000500003"), "COUPON_ID" : 500003, "ID_3" : NumberLong("10000500003") }

六 、

其他   

如有问题 ,请留言 ,欢迎提出不足 和 建议 。

感谢贡献 。