采用Flink CDC操作SQL Server数据库获取增量变更数据

时间:2024-04-17 07:14:46

采用Flink CDC操作SQL Server数据库获取增量变更数据

Flink CDC 1.12版本引入了对SQL Server的支持,包括SqlServerCatalog和SqlServerTable。在SqlServerCatalog中,你可以根据表名获取对应的字段和字段类型。

SQL Server 2008 开始支持变更数据捕获 (CDC) 功能。CDC 会记录对表中数据所做的更改(插入、更新和删除),这样你就可以直接查询这些更改,而不需要扫描整个表。

1、准备工作

软件版本

Flink 1.17.1

数据库版本 Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64)

1.1、数据库准备 启动CDC

-- Enable CDC at the database level; run this inside the database that should be captured.
EXEC sys.sp_cdc_enable_db;
-- List every database together with its CDC-enabled flag.
SELECT name, is_cdc_enabled FROM sys.databases;

1.2、开启SQL Server代理

打开 SQL Server配置管理器 => 选择SQL Server服务 => 选择SQL Server代理 右击开启

在这里插入图片描述

1.3、为需要跟踪更改的表启用 CDC。

-- Enable CDC at the table level. SQL Server Agent must be started first, then run:
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',                  -- schema of the source table
    @source_name = 'AIR_STATION_HOUR_DATA',  -- name of the source table
    @capture_instance = NULL,                -- capture instance (left NULL here)
    @supports_net_changes = 1,               -- net-changes support flag
    @role_name = NULL;                       -- role name (left NULL here)
-- Verify that CDC was enabled for the table.
EXEC sys.sp_cdc_help_change_data_capture;

2、代码编写

2.1、引入依赖

    <!-- Single place to pin the Flink version used by every org.apache.flink artifact below. -->
    <properties>
        <flink.version>1.17.1</flink.version>
    </properties>

    <dependencies>
        <!-- SQL Server JDBC driver -->
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>9.4.1.jre8</version>
        </dependency>

        <!-- Lombok (used for @Data / @Slf4j in the Java classes) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>

        <!-- Flink core, streaming API and client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink CDC connector for SQL Server -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.4.0</version>
        </dependency>

        <!-- Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Table planner loader -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

2.2、代码编写

2.2.1 、数据库配置文件编写

/**
 * Connection settings for the SQL Server instance monitored by the CDC job.
 *
 * <p>Pure constants holder: the class is {@code final} and has a private constructor so it can
 * neither be instantiated nor subclassed.
 *
 * <p>NOTE(review): the host and password values look like redacted placeholders and credentials
 * are hard-coded here; presumably real deployments should load them from external configuration.
 */
public final class SQLServerConstant {
    /** Database host address. */
    public static final String SQLSERVER_HOST = "0.0.0.0";
    /** Database port. */
    public static final Integer SQLSERVER_PORT = 1433;

    /** Name of the database to monitor. */
    public static final String SQLSERVER_DATABASE = "HBDC_AQI";

    /** Table(s) to monitor, written as {@code schema.table}. */
    public static final String SQLSERVER_TABLE_LIST = "dbo.AIR_STATION_HOUR_DATA";
    /** Login user name. */
    public static final String SQLSERVER_USER_NAME = "sa";
    /** Login password. */
    public static final String SQLSERVER_PASSWORD = "*******";

    /** Not instantiable. */
    private SQLServerConstant() {
        throw new AssertionError("No SQLServerConstant instances");
    }
}

2.2.2 CDC数据实体类

/**
 * One captured data change, carrying where it happened, when, what kind of change it was and the
 * row content before and after the change.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 */
@Data
public class DataChangeInfo implements Serializable {
    /** Explicit serialization version so recompiling the class does not change its serialized identity. */
    private static final long serialVersionUID = 1L;

    /**
     * Database name.
     */
    private String database;
    /**
     * Table name.
     */
    private String tableName;
    /**
     * Time of the change.
     */
    private LocalDateTime changeTime;
    /**
     * Change type: 1 = insert, 2 = update, 3 = delete.
     */
    private Integer eventType;
    /**
     * Row data before the change.
     */
    private String beforeData;
    /**
     * Row data after the change.
     */
    private String afterData;

}

2.2.2 、SQLServer消息读取自定义反序列化

@Slf4j
public class SQLServerJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";


    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {
        try {
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // 获取操作类型  CREATE UPDATE DELETE  1新增 2修改 3删除
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> 		         Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            //7.输出数据
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("SQLServer消息读取自定义序列化报错:{}", e.getMessage());

        }

    }
    /**
     *
     * 从源数据获取出变更之前或之后的数据
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }



    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {

        return TypeInformation.of(DataChangeInfo.class);
    }
}

2.2.3 、功能工具类

/**
 * Builds the SQL Server CDC source and hosts a small demo job that prints every captured change.
 */
public class FlinkSourceUtil {

    /**
     * Builds the SQL Server CDC source function from the connection constants.
     *
     * @return a source function emitting one {@link DataChangeInfo} per deserialized change record
     */
    public static DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        // Remove spaces first, then split the comma-separated table list.
        String tableListWithoutSpaces = SQLSERVER_TABLE_LIST.replace(" ", "");
        String[] monitoredTables = tableListWithoutSpaces.split(",");

        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(SQLSERVER_HOST)
                .port(SQLSERVER_PORT)
                .username(SQLSERVER_USER_NAME)
                .password(SQLSERVER_PASSWORD)
                .database(SQLSERVER_DATABASE) // database to monitor
                .tableList(monitoredTables) // tables to monitor
                // Turns each SourceRecord into a DataChangeInfo.
                .deserializer(new SQLServerJsonDebeziumDeserializationSchema())
                /*
                 * initial: take a snapshot first (full load), then continue with incremental changes.
                 * latest: incremental changes only (historical data is not read).
                 */
                .startupOptions(com.ververica.cdc.connectors.base.options.StartupOptions.initial())
                .build();
    }

    /**
     * Demo entry point: runs the CDC source with parallelism 1 and prints each change.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<DataChangeInfo> changes =
                env.addSource(buildDataChangeSource(), "SQLServer-source").setParallelism(1);
        changes.print();

        env.execute("SQLServer-stream-cdc");
    }
}

2.3、运行main方法测试

在这里插入图片描述