利用Flink CDC和Flink SQL构建实时数仓Flink写入Doris

时间:2024-11-09 10:28:52

软件环境 Flink1.13.3

Scala 2.12

doris 0.14

一、MySQL 开启binlog日志、创建用户

1.开启bin log

MySQL 8.0默认开启了binlog,可以通过代码show variables like "%log_bin%";查询是否开启了,show variables like "%server_id%";查询服务器ID。

上图分别显示了bin long是否开启以及bin log所在的位置。

2.创建用户

CREATE USER 'flinktest' IDENTIFIED BY '123456'; 

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinktest';

如果遇到报错:

Your password does not satisfy the current policy requirements

Mysql8版本输入

set global validate_password.policy=0; set global validate_password.length=6;

如果是mysql5.6版本 set global validate_password_policy=LOW;set global validate_password_length=6;

二、添加依赖

仓库服务或者这里下载 cdc依赖flink-connector-mysql-cdc-2.0. 添加到$FLINK_HOME/lib下面

这里一定要注意一下cdc和flink版本的匹配关系,否则执行SQL的时候会报错:

[ERROR] Could not execute SQL statement. Reason:
: $()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;

具体如下表:

Flink CDC Connector Version Flink Version
1.0.0 1.11.*
1.1.0 1.11.*
1.2.0 1.12.*
1.3.0 1.12.*
1.4.0 1.13.*
2.0.* 1.13.*

三、建表

建表:

CREATE TABLE flink_test(id INT ,name VARCHAR(20));

建表

2.1启动doris

不懂如何启动可以看这里

2.2Flink连接doris驱动

Flink连接doris需要flink-doris-connector包,如果你懒得编译,可以从这边下载,下面的编译步骤就免了。

驱动编译过程:

首先到Doris官网把整个项目下载下来,然后解压

unzip 
cd incubator-doris-master/extension/flink-doris-connector
./  

 如果遇到报错./: Permission denied  那就修改权限 chmod 777

如果遇到报错./: line 43: mvn: command not found
Error: mvn is not found 那就安装一下maven可以看到这里

等到N久之后,然后你可能遇到报错,无力吐槽啊:

[ERROR] thrift failed output:
[ERROR] thrift failed error: /bin/sh: /opt/pkg/incubator-doris-master/extension/flink-doris-connector/../../thirdparty/installed/bin/thrift: No such file or directory
[INFO] BUILD FAILURE
[ERROR] Failed to execute goal :maven-thrift-plugin:0.1.11:compile (thrift-sources) on project doris-flink: thrift did not exit cleanly. Review output for more information. -> [Help 1]

好吧,那就安装thrift咯。安装过程中可能有报C++错误configure: No compiler with C++11 support was found,那就yum install -y gcc gcc-c++安装一下

#版本别太新哈0.93就行,不然可能报错
1.下载
wget /apache/thrift/0.9.3/thrift-0.9.
或者wget /dist/thrift/0.9.3/
2.解压编译
tar -zxf thrift-0.9.
cd thrift-0.9.3
./configure --with-lua=no && make && make install
3.验证
thrift -version
4.把thrift复制到thirdparty/installed/bin 目录下,目录如果不存在需要手工创建
cp /usr/local/bin/thrift /opt/pkg/incubator-doris-master/thirdparty/installed/bin

又等待N久,继续执行./

注意,默认flink版本是1.12版本,如果是1.13版本,需要修改incubator-doris-master/extension/flink-doris-connector下面的把property修改一下

虽然短短几行代码,但是踩坑了不少,等待时间又很久,如果有人不想编译,可以到这边下载我编译好的。注意我这个flink版本是1.13.3,scala版本是2.12哈。

2.3 Doris建表

 mysql -h 172.16.37.29 -P 9030 -uroot

CREATE TABLE test_cnt
(
    id int,
    name varchar(50)
)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "1",
"in_memory" = "false",
"storage_format" = "V2");

 3.启动flink并建表

3.1启动fink

在$FLINK_HOME/bin目录输入./

3.2 flink建表

输入./ embedded启动FLINK SQL客户端,FLINK SQL有表模式,日志变更模式和Tableau模式,本次采用表模式,所以启动之后输入 SET -mode=table;
创建mysql source:

CREATE TABLE flink_mysql_src(
 id INT NOT NULL,
 name STRING
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '',
 'port' = '3306',
 'username' = 'xxxx',
 'password' = 'xxx',
 'database-name' = 'xx',
 'table-name' = 'flink_test',
 '' = 'false'
);

注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加'' = 'false'否则会报错:

[ERROR] Could not execute SQL statement. Reason:
: The primary key is necessary when enable 'Key: '' , default: true (fallback keys: [])' to 'true'

看一下是否能监控到MySQL数据:

在mysql中输入
insert into flink_test values(1,'a');
insert into flink_test values(2,'b');
insert into flink_test values(3,'c');
在flink sql输入:
select * from flink_mysql_src;

可以看到结果已经输出到flink控制台了,说明flink到mysql这端数据传输是OK的

如果遇到报错:ClassNotFoundException: 那就把flink-connector-debezium-2.0.也放到lib目录下面

创建doris sink:

CREATE TABLE flink_doris_sink (
    id int,
    name string
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = 'localhost:8030',
      '' = 'zh.test_cnt',
      ''='2',
      'username' = 'root',
      'password'=''
);

select * from flink_doris_sink看看有没有报错。 

如果报错[ERROR] Could not execute SQL statement. Reason:
: can not fetch partitions 说明数据库不存在或者表不存在,注意看建表语句。

如果报错[ERROR] Could not execute SQL statement. Reason: : 说明doris-flink-1.编译有问题,看看自己版本对不对,不对重新改一下pom重新编译

四、实践

执行任务

INSERT INTO flink_doris_sink
SELECT id,name
FROM flink_mysql_src;

可以到flink网页端看到任务的情况了

2.往mysql插入数据

insert into flink_test values(1,'a');
在doris 中查询,发现数据已经过来了

3.变更数据

在mysql中执行update flink_test  set name='tests' where id=1

在doris中查询发现数据已经变更了,不过变成了两条记录,flink_doris_connector暂时不支持删除,据说后续版本会更新,那就期待一下吧。