Stack: Flink 1.13.1 + Flink CDC 2.0.2 + Hudi 0.10
Build Hudi
(1) Pull the source code (via a domestic mirror, or directly from the upstream repo)
git clone https://github.com/apache/hudi.git
(2) Edit pom.xml
vim pom.xml
-- add the following repository block directly
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
(3) Build
mvn clean package -DskipTests -Dspark3 -Dscala-2.12
(4) After the build finishes, the artifacts are under Hudi's packaging directory (e.g. packaging/hudi-flink-bundle/target)
Flink usage
SQL Client usage
-- 1. Download Flink 1.13.1
-- 2. Export the Hadoop classpath
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
-- 3. Start a Flink cluster
./start-cluster.sh or ./yarn-session.sh
-- 4. Start the Flink SQL Client with the hudi-flink bundle jar built above
bin/sql-client.sh embedded -j $hudi_home/packaging/hudi-flink-bundle/target/hudi-flink-bundle***.jar
Flink SQL Client operations
Insert and query data
set sql-client.execution.result-mode = tableau;
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'schema://base-path',
'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);
-- insert data using values
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','F*',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
-- query from the Hudi table
select * from t1;
-- this would update the record with key 'id1'
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
Streaming query
CREATE TABLE t2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs:///hudi/t2',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- this option enables streaming read
'read.streaming.start-commit' = '20210927134557', -- specifies the start commit instant time
'read.streaming.check-interval' = '4' -- check interval for finding new source commits, default 60s
);
-- Then query the table in stream mode
select * from t2;
Running from IDEA
- Add the dependencies below
-- 1. Install the jars you built/downloaded into your local Maven repository as shown below; hudi-flink-bundle sits under the Hudi packaging directory
-- 2. Flink CDC releases: https://github.com/ververica/flink-cdc-connectors/releases/tag/release-2.0.2
-- 3. mvn install:install-file -DgroupId=com.ververica -DartifactId=flink-connector-mysql-cdc -Dversion=2.0.2-SNAPSHOT -Dpackaging=jar -Dfile=flink-sql-connector-mysql-cdc-2.0.2.jar
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-bundle</artifactId>
<version>0.10.0-SNAPSHOT</version>
</dependency>
2. Write the Java programs (CDCToHudi syncs MySQL changes into Hudi; ReadHoodi streams the Hudi table and prints it)
package com.bighao.SQL.Hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CDCToHudi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be enabled: Hudi only commits data on checkpoints, so without it
        // nothing gets written (a fuller checkpoint-configuration sketch follows after this class).
        env.setParallelism(1).enableCheckpointing(10000);

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // MySQL CDC source table (the primary key must be declared NOT ENFORCED in Flink SQL)
        String sourceDDL = "CREATE TABLE t1 (" +
                " uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED," +
                " name VARCHAR(10)," +
                " age INT, " +
                " ts TIMESTAMP(3), " +
                " par VARCHAR(20) " +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'mysql-host'," +
                " 'port' = '3306'," +
                " 'username' = 'name'," +
                " 'password' = '***'," +
                " 'database-name' = 'test'," +
                " 'table-name' = 't1'," +
                " 'scan.startup.mode' = 'initial'" +
                ")";

        // Hudi sink table
        String sinkDDL = "CREATE TABLE t3( " +
                "uuid VARCHAR(20), " +
                "name VARCHAR(10), " +
                "age INT, " +
                "ts TIMESTAMP(3), " +
                "`partition` VARCHAR(20)) " +
                "PARTITIONED BY (`partition`) " +
                "WITH ( " +
                "'connector' = 'hudi', " +
                "'path' = 'hdfs://ip:9820/flink-hudi/t3', " +
                "'table.type' = 'MERGE_ON_READ', " +
                "'read.streaming.enabled' = 'true', " +
                "'read.streaming.check-interval' = '4')";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql(INSERT ...) submits the streaming job by itself; calling env.execute()
        // afterwards would fail because no DataStream operators were defined on env.
        tableEnv.executeSql("INSERT INTO t3(uuid, name, age, ts, `partition`) SELECT uuid, name, age, ts, par FROM t1");
    }
}
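Since Hudi only commits on checkpoints, the checkpoint settings effectively control the commit cadence. Below is a minimal sketch of a more explicit checkpoint configuration; the CheckpointSetup helper class and the HDFS checkpoint path are illustrative placeholders, not part of the original setup.
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    // Call this on the env used by CDCToHudi before executing the SQL statements.
    public static void configure(StreamExecutionEnvironment env) {
        // Hudi flushes and commits data on each checkpoint, so this interval is the commit interval.
        env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig conf = env.getCheckpointConfig();
        conf.setMinPauseBetweenCheckpoints(5000);   // leave some breathing room between checkpoints
        conf.setCheckpointTimeout(60000);           // give up on checkpoints that take longer than 1 min
        conf.setCheckpointStorage("hdfs://ip:9820/flink/checkpoints"); // placeholder checkpoint directory
    }
}
With this in place, CDCToHudi can simply call CheckpointSetup.configure(env) instead of the inline enableCheckpointing(10000) call.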
The second class, ReadHoodi, streams the Hudi table back and prints every incoming change:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadHoodi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Hudi source table, read in streaming mode (same path the CDCToHudi job writes to)
        String sourceDDL = "CREATE TABLE t3( " +
                "uuid VARCHAR(20), " +
                "name VARCHAR(10), " +
                "age INT, " +
                "ts TIMESTAMP(3), " +
                "`partition` VARCHAR(20)) " +
                "PARTITIONED BY (`partition`) " +
                "WITH ( " +
                "'connector' = 'hudi', " +
                "'path' = 'hdfs://ip:9820/flink-hudi/t3', " +
                "'table.type' = 'MERGE_ON_READ', " +
                "'read.streaming.enabled' = 'true', " +
                "'read.streaming.check-interval' = '4')";
        tableEnv.executeSql(sourceDDL);

        // executeSql(SELECT ...) runs its own job and print() blocks while streaming the
        // changelog to stdout, so no env.execute() call is needed here.
        TableResult result2 = tableEnv.executeSql("SELECT * FROM t3");
        result2.print();
    }
}
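result2.print() writes the changelog to stdout; if you would rather consume the rows programmatically, TableResult.collect() returns an iterator. Below is a minimal sketch that could replace the last two lines of ReadHoodi's main (fully qualified names are used to avoid extra imports):
// Iterate over the streaming result; closing the iterator cancels the underlying job.
try (org.apache.flink.util.CloseableIterator<org.apache.flink.types.Row> rows =
        tableEnv.executeSql("SELECT * FROM t3").collect()) {
    while (rows.hasNext()) {
        org.apache.flink.types.Row row = rows.next();
        System.out.println(row.getKind() + " " + row); // RowKind distinguishes inserts, updates and deletes
    }
}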
3. Insert rows into the MySQL table and update some of them, then watch the records printed by ReadHoodi.
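Any MySQL client works for this step; the sketch below uses plain JDBC. The connection URL, credentials, and the assumption that test.t1 has the same five columns as the Flink source DDL are placeholders to adapt, and a MySQL JDBC driver (e.g. mysql-connector-java) must be on the classpath.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MysqlDml {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; keep them in line with the 'mysql-cdc' source options above.
        String url = "jdbc:mysql://mysql-host:3306/test";
        try (Connection conn = DriverManager.getConnection(url, "name", "***");
             Statement stmt = conn.createStatement()) {
            // Insert a new row, then update it; ReadHoodi should print both changes shortly after.
            stmt.executeUpdate("INSERT INTO t1 VALUES ('id9','Mike',30,'2021-09-27 13:45:57','par5')");
            stmt.executeUpdate("UPDATE t1 SET age = 31 WHERE uuid = 'id9'");
        }
    }
}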
[1] MySQL CDC documentation:
/flink-cdc-connectors/master/content/connectors/
[2] Hudi Flink Q&A:
/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#
[3] Notes on Hudi's design:
/docs/share/5d1c383d-c3fc-483a-ad7e-d8181d6295cd?#