FlinkCDC的2.2.0版本怎么监控库中的所有表,增加新表到已有任务?

时间:2025-03-31 07:11:29

FlinkCDC的2.2.0版本怎么监控库中的所有表,增加新表到已有任务?

一、监控全表

​ 千呼万唤始出来,之前预告FlinkCDC的2.2.0支持Flink1.14和添加新表,满怀希望!今天一看略显失望,添加新表,不支持动态添加,需要修改tableList之后,从ck中重启,倒是不用重新写新代码了,但是不满足我们目前的需求,失望之一。

二是,api改得有点随意了。

2.0版本监控全表,tableList不设置就行了

        DebeziumSourceFunction<String> mySQLSource = MySqlSource.<String>builder()
                .hostname((""))
                .port((""))
                .username((""))
                .password((""))
                .databaseList((""))
                //可选配置,如果不指定该参数,则会读取上一个配置下的所有表数据
                //指定的时候需要使用的方式明确指定
				//.tableList("reported2.epidemic_report_entty")
                .startupOptions(())
                .deserializer(new MyDeseriallizationFun())
                .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

到了2.2版本源码中MySqlSourceConfig类对tableList做了校验,不能为null

        = checkNotNull(tableList);
  • 1

对于如何监控所有表,文档中也没有说明。

通过尝试发现,传空串是监控全表,这你。。。行吧,api改得有点随意了,也不考虑版本兼容。。。

正确得写法是

       MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname((""))
                .port((""))
                .databaseList((""))
//                .scanNewlyAddedTableEnabled(true)
                .connectTimeout((60))
                //必选配置,空串的时候为监控全表
                //指定的时候需要使用的方式明确指定
//                .tableList("")
                .tableList("")
                .username((""))
                .password((""))
                .startupOptions(())
                .deserializer(new MyDeseriallizationFun())
                .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

二、增加新表到已有的任务

另外想要加新的表,到已有的任务中,需要设置scanNewlyAddedTableEnabled(true)

1.设置savepoint

$ ./bin/flink stop $Existing_Flink_JOB_ID

Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
  • 1
  • 2
  • 3
  • 4

2.修改tablelist,增加表

 MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .scanNewlyAddedTableEnabled(true) 
        .databaseList("db") 
        .tableList(", , , , ") // set captured tables [product, user, address ,order, custom]
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();
   // your business code
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3.从savepoint启动

$ ./bin/flink run \
      --detached \ 
      --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
      ./
  • 1
  • 2
  • 3
  • 4