数据采集 ETL 工具 bboss-datatran v6.7.7 发布,支持 Elasticsearch 8 以及其他 Elasticsearch 低版本和 Opensearch 之间数据同步。
- 新增轻量级但功能强大的大数据指标分析计算模块,可以非常方便地实现基于时间窗口的多种维度的实时指标计算和离线指标计算功能,适用于有限维度指标key和无限维度指标key,同时可以非常方便地将指标分析计算结果存储到各种数据库,以极低成本快速构建企业级大数据分析应用,导入以下包即可:
<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-datatran-metrics</artifactId>
<version>6.7.7</version>
</dependency>
使用案例
-
增量采集状态表increament_tab结构调整,增加以下字段:
jobType varchar(500),用于保存输入插件类型,不能为空,不同的插件类型对应的值见下面的表格说明,避免不同类型作业加载增量状态时,互相干扰;
jobId varchar(500),用于保存外部设置的作业id,可为空,相同的进程内启动多个同类型的输入插件的作业时,必须指定每个作业的jobId,避免各个作业加载增量状态时,互相干扰
插件类型 | jobType | jobId |
---|---|---|
HttpInputDataTranPlugin | HttpInputDataTranPlugin | 空 |
DBInputDataTranPlugin | DBInputDataTranPlugin | 空 |
ElasticsearchInputDataTranPlugin | ElasticsearchInputDataTranPlugin | 空 |
FileInputDataTranPlugin | FileInputDataTranPlugin | 空 |
HBaseInputDatatranPlugin | HBaseInputDatatranPlugin | 空 |
Kafka2InputDatatranPlugin | Kafka2InputDatatranPlugin | 空 |
MongoDBInputDatatranPlugin | MongoDBInputDatatranPlugin | 空 |
升级注意事项:升级6.7.7前,需要手动增加jobType和jobId两个字段,并修改increament_tab表中的状态记录,根据作业输入插件类型,填写正确的jobType,然后再启动作业,这样作业才能继续正常工作。
- 优化kafka输出插件任务状态记录管理功能,采用指标分析Metrics对数据发送情况进行聚合统计,按照指定的时间窗口进行聚合计算后,执行回调任务处理success方法,任务taskMetrics为聚合计算后的统计信息,可以通过开关控制是否进行预聚合功能:
kafkaOutputConfig.setEnableMetricsAgg(true);//启用预聚合功能
kafkaOutputConfig.setMetricsAggWindow(60);//指定统计时间窗口,单位:秒,默认值60秒
4. 优化kafka输入插件拦截器功能:定时记录统计插件消费kafka数据记录情况,并调用任务拦截器的aftercall方法输出统计jobMetrics信息,可以指定统计时间间隔:
kafka2InputConfig.setMetricsInterval(300 * 1000L);//300秒做一次任务拦截调用,默认值
5. 部分插件增加字段映射功能,涉及插件:日志采集插件、excel采集插件、生成日志/excel文件插件、kafka输入插件
文件采集插件字段映射配置示例:
FileInputConfig fileInputConfig = new FileInputConfig();
_fileInputConfig = fileInputConfig;
FileConfig fileConfig = new FileConfig();
fileConfig.setFieldSplit(";");//指定日志记录字段分割符
//指定字段映射配置
fileConfig.addDateCellMapping(0, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());
fileConfig.addNumberCellMapping(1, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());
fileConfig.addCellMappingWithType(2, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue());
kafka映射配置示例:
Kafka2InputConfig kafka2InputConfig = new Kafka2InputConfig();
kafka2InputConfig.setFieldSplit(";");//指定kafka记录字段分割符
//指定字段映射配置
kafka2InputConfig.addDateCellMapping(0, //记录切割得到的字段列表位置索引,从0开始
excelCellMapping.getFieldName(), //映射的字段名称
cellType, //字段值类型
excelCellMapping.getDefaultValue(), //字段默认值
excelCellMapping.getDataFormat());//字段格式:日期格式或者数字格式
kafka2InputConfig.addNumberCellMapping(1, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());
kafka2InputConfig.addCellMappingWithType(2, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue());
cellType取值范围:
public static final int CELL_BOOLEAN = 5;
public static final int CELL_DATE = 3;
public static final int CELL_NUMBER = 2;
public static final int CELL_NUMBER_INTEGER = 6;
public static final int CELL_NUMBER_LONG = 7;
public static final int CELL_NUMBER_FLOAT = 8;
public static final int CELL_NUMBER_SHORT = 9;
public static final int CELL_STRING = 1;
6. 增加全局JobContext,用于存放在作业中使用的初始化数据
7. 作业拦截器、任务拦截器异常方法参数Exception类型调整为Throwable类型
更多变更,参考提交记录:https://gitee.com/bboss/bboss-elastic-tran/commits/master
数据同步作业开发视频教程
https://www.bilibili.com/video/BV1xf4y1Z7xu
bboss 案例大全
https://esdoc.bbossgroups.com/#/bboss-datasyn-demo
Quick Start
https://esdoc.bbossgroups.com/#/quickstart
开发交流
https://www.bbossgroups.com/forum.html