Migrating from ClickHouse to StarRocks: after research and discussion, we decided to use Flink to sync data from Kafka into StarRocks.
1、Data model
StarRocks offers four table types (data models):
- Aggregate Key model (pre-aggregation)
- Unique Key model (update by merge)
- Duplicate Key model (detail / append-only)
- Primary Key model (real-time update)
Since all of our data is append-only log data, we chose the Duplicate Key model.
2、Partitioning
StarRocks supports both static and dynamic partitioning. Since the data grows continuously, we chose dynamic partitioning by day.
3、Creating the table
CREATE TABLE IF NOT EXISTS monitor_log (
    `monitor_date`   VARCHAR(100) COMMENT "date",
    `employee_code`  VARCHAR(100) COMMENT "employee ID",
    `host_name`      VARCHAR(100) COMMENT "host name",
    `id`             BIGINT NOT NULL COMMENT "",
    `computer_model` VARCHAR(200) COMMENT "computer model",
    `cpu_num`        INT,
    `create_time`    DATETIME COMMENT "log time"
)
DUPLICATE KEY(`monitor_date`, `employee_code`, `host_name`)
PARTITION BY RANGE(`create_time`) ()
DISTRIBUTED BY HASH(`create_time`) BUCKETS 8
PROPERTIES (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "ml",
    "dynamic_partition.buckets" = "8",
    "dynamic_partition.history_partition_num" = "0"
);
4、Querying through MyBatis with a multi-datasource framework
We use the dynamic-datasource-spring-boot-starter to configure multiple data sources for queries.
Dependency:
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
    <version>3.5.1</version>
</dependency>
Configuration (property keys reconstructed for a datasource named `starrocks`; StarRocks speaks the MySQL protocol on FE port 9030, so the standard MySQL JDBC driver is used):
spring.datasource.dynamic.datasource.starrocks.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.starrocks.url=jdbc:mysql://127.0.0.1:9030/test?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai&autoReconnect=true&allowMultiQueries=true&useSSL=false
spring.datasource.dynamic.datasource.starrocks.username=root
spring.datasource.dynamic.datasource.starrocks.password=1234
Then annotate the query method (or mapper class) with:
@DS("starrocks")
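For example, a MyBatis mapper routed to the StarRocks datasource might look like the sketch below; the mapper name and query are illustrative, not from the project:

```java
import com.baomidou.dynamic.datasource.annotation.DS;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
import java.util.Map;

@DS("starrocks")  // route every method in this mapper to the "starrocks" datasource
@Mapper
public interface MonitorLogMapper {

    // Filtering on monitor_date lets StarRocks prune to the matching day partition
    @Select("SELECT * FROM monitor_log WHERE monitor_date = #{date}")
    List<Map<String, Object>> listByDate(@Param("date") String date);
}
```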
5、Write the Flink SQL for syncing the data, configuring a Kafka source and a StarRocks sink
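A minimal sketch of that sync job, assuming a JSON-encoded Kafka topic named monitor_log and the flink-connector-starrocks sink; the broker address, consumer group, load-url port (8030 is the default FE HTTP port), and credentials are placeholders:

```sql
-- Kafka source: one row per JSON log record
CREATE TABLE kafka_monitor_log (
    monitor_date   STRING,
    employee_code  STRING,
    host_name      STRING,
    id             BIGINT,
    computer_model STRING,
    cpu_num        INT,
    create_time    TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'monitor_log',
    'properties.bootstrap.servers' = 'kafka-host:9092',
    'properties.group.id' = 'monitor_log_sync',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- StarRocks sink (flink-connector-starrocks, loads via Stream Load)
CREATE TABLE sr_monitor_log (
    monitor_date   STRING,
    employee_code  STRING,
    host_name      STRING,
    id             BIGINT,
    computer_model STRING,
    cpu_num        INT,
    create_time    TIMESTAMP(3)
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'monitor_log',
    'username' = 'root',
    'password' = '1234'
);

-- The sync itself is a single continuous INSERT
INSERT INTO sr_monitor_log SELECT * FROM kafka_monitor_log;
```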
GUI client for StarRocks: SQLyog (StarRocks is MySQL-protocol compatible, so any MySQL client works).