1、上传jar包
# 1. Upload the following jars to Flink's lib directory
flink-connector-jdbc-1.15.4.jar
mysql-connector-java-5.1.49.jar
# 2. Restart the cluster
# List the YARN applications to find the id of the running one.
yarn application -list
# NOTE(review): the application id below is from one specific run — presumably it
# has to be replaced with the id printed by the `-list` command above.
yarn application -kill application_1731138432432_0001
# Start the YARN session again (presumably so the newly uploaded jars are loaded;
# `-d` looks like detached mode — confirm against the Flink docs).
yarn-session.sh -d
# 3. Re-enter the SQL command line
sql-client.sh
2、MySQL Source
-- Bounded stream: source table mapped onto the MySQL table `students`
-- through the JDBC connector.
-- NOTE(review): the credentials are hard-coded in plain text; acceptable for a
-- demo only.
CREATE TABLE students_jdbc (
    id     BIGINT,
    name   STRING,
    age    BIGINT,
    gender STRING,
    clazz  STRING
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://master:3306/Test',
    'table-name' = 'students',
    'username'   = 'root',
    'password'   = '123456'
);
-- Read all rows from the JDBC-backed source table. The columns are listed
-- explicitly (instead of `*`) so the result shape does not change silently
-- when the table definition changes.
SELECT
    id,
    name,
    age,
    gender,
    clazz
FROM students_jdbc;
3、MySQL Sink
-- Sink table (Flink side): mapped onto the MySQL table `clazz_num_mysql`
-- through the JDBC connector.
-- NOTE(review): the credentials are hard-coded in plain text; acceptable for a
-- demo only.
CREATE TABLE clazz_num_mysql (
    clazz STRING,
    num   BIGINT,
    -- Rows are updated according to this primary key.
    PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://master:3306/Test',
    'table-name' = 'clazz_num_mysql',
    'username'   = 'root',
    'password'   = '123456'
);
-- MySQL-side DDL: create the physical table in MySQL that the Flink sink
-- table of the same name writes to.
CREATE TABLE clazz_num_mysql (
    clazz VARCHAR(255),
    num   BIGINT,
    -- Rows are updated according to this primary key.
    PRIMARY KEY (clazz)
);
-- Save the query result (row count per non-null `clazz`) to MySQL through the
-- `clazz_num_mysql` sink table.
-- The target columns are named explicitly so the mapping does not depend on
-- the column order of the sink table.
-- NOTE(review): `students_text` is not defined in this section — presumably a
-- source table created earlier; confirm it exists in the current session.
INSERT INTO clazz_num_mysql (clazz, num)
SELECT
    clazz,
    COUNT(*) AS num
FROM students_text
WHERE clazz IS NOT NULL
GROUP BY clazz;