Flink整合Hive、Mysql、Hbase、Kafka-二、Flink整合Mysql

时间:2024-11-14 08:37:23

1、上传jar包

# 1、上传jar包到flink的lib目录
flink-connector-jdbc-1.15.4.jar
mysql-connector-java-5.1.49.jar

# 2、重启集群
yarn application -list
yarn application -kill application_1731138432432_0001
yarn-session.sh -d

# 3、重新进入sql命令行
sql-client.sh

2、Mysql Source

-- 有界流
CREATE TABLE students_jdbc (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/Test',
    'table-name' = 'students',
    'username' ='root',
    'password' ='123456'
);
select * from students_jdbc;

3、Mysql Sink

-- sink 表
CREATE TABLE clazz_num_mysql (
    clazz STRING,
    num BIGINT,
    PRIMARY KEY (clazz) NOT ENFORCED -- 按照主键进行更新
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/Test',
    'table-name' = 'clazz_num_mysql',
    'username' ='root',
    'password' ='123456'
);

-- mysql建表
CREATE TABLE clazz_num_mysql (
    clazz varchar(255),
    num BIGINT,
    PRIMARY KEY (clazz) -- 按照主键进行更新
);

-- 将查询结果保存到mysql
insert into clazz_num_mysql
select clazz,count(1) as num
from 
students_text
where clazz is not null
group by clazz;