FlinkCDC

时间:2025-04-09 07:00:01
package online.shuihua; import com.alibaba.fastjson.JSONObject; import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import io.debezium.data.Envelope; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import java.util.Properties; public class Flink_CDCWithCustomerSchema { //1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.创建 Flink-MySQL-CDC 的 Source Properties properties = new Properties(); //initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog. //latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started. //timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp. //specific-offset: Never to perform snapshot on the monitored database tables upon //first startup, and directly read binlog from the specified offset. DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder() .hostname("hadoop102") .port(3306) .username("root") .password("000000") .databaseList("gmall-flink") .tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用""的方式 .startupOptions(StartupOptions.initial()) .deserializer(new DebeziumDeserializationSchema<String>() { //自定义数据解析器 @Override public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { //获取主题信息,包含着数据库和表名mysql_binlog_source.gmall-flink.z_user_info String topic = sourceRecord.topic(); String[] arr = topic.split("\\."); String db = arr[1]; String tableName = arr[2]; //获取操作类型 READ DELETE UPDATE CREATE Envelope.Operation operation = Envelope.operationFor(sourceRecord); //获取值信息并转换为 Struct 类型 Struct value = (Struct) sourceRecord.value(); //获取变化后的数据 Struct after = value.getStruct("after"); //创建 JSON 对象用于存储数据信息 JSONObject data = new JSONObject(); for (Field field : after.schema().fields()) { Object o = after.get(field); data.put(field.name(), o); } //创建 JSON 对象用于封装最终返回值数据信息 JSONObject result = new JSONObject(); result.put("operation", operation.toString().toLowerCase()); result.put("data", data); result.put("database", db); result.put("table", tableName); //发送数据至下游 collector.collect(result.toJSONString()); } @Override public TypeInformation<String> getProducedType() { return TypeInformation.of(String.class); } }) .build(); //3.使用 CDC Source 从 MySQL 读取数据 DataStreamSource<String> mysqlDS = env.addSource(mysqlSource); //4.打印数据 mysqlDS.print(); //5.执行任务 env.execute(); } }