RocketMQ Flink Catalog 设计与实践

时间:2022-10-26 14:12:15

一、Flink 和 Flink Catalog

Flink 是一个分布式计算引擎,目前已经实现批流一体,可以实现对有界数据和*数据的处理。需要有效分配和管理计算资源才能执行流式应用程序。

目前 Flink API 共抽象为四个部分:

  • 最顶层的抽象为 SQL。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。
  • 第二层抽象为 Table API。Table API是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。
  • 第三层抽象是Core APIs 。许多程序可能使用不到最底层的 API而是可以使用Core APIs进行编程:其中包含 DataStream API(应用于有界/*数据流场景)和 DataSet API(应用于有界数据集场景)两部分。
  • 第四层抽象为有状态的实时流处理。

RocketMQ Flink Catalog 设计与实践

Flink Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。Flink 对于元数据的管理分为临时的、持久化的两种。内置的 ​​GenericInMemoryCatalog​​ 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。JdbcCatalog 和 HiveCatalog 就是可以持久化元数据的 Catalog。

Flink Catalog 是扩展的,支持用户自定义。为了在 Flink SQL 中使用自定义 Catalog,用户需要通过实现​​CatalogFactory​​接口来实现对应的 Catalog 工厂。该工厂是使用 Java 的服务提供者接口 (SPI) 发现的。可以将实现此接口的类添加到 ​​META_INF/services/org.apache.flink.table.factories.Factory​​JAR 文件中。

二、RocketMQ Flink Connector

RocketMQ 连接器为 Flink 提供从 RocketMQ Topic 中消费和写入数据的能力。Flink 的 Table API & SQL 程序可以连接到其他外部系统,用于读取和写入批处理和流式表。Source 提供对存储在外部系统(例如数据库、键值存储、消息队列或文件系统)中的数据的访问。Sink 将数据发送到外部存储系统。

该项目的 Github 仓库是: [​​github.com/apache/rock…​​]

RocketMQ Flink Catalog 设计与实践

三、RocketMQ Flink Catalog

3.1 设计与实现

3.1.1 RocketMQ Flink Catalog 的设计主要分为两步
  • 实现一个 RocketMqCatalogFactory 基于字符串属性创建已配置 Catalog 实例的工厂。将此实现类添加到​​META_INF/services/org.apache.flink.table.factories.Factory​​ 中。
  • 继承 AbstractCatalog 实现 RocketMqCatalog,通过实现 Catalog 接口中的方法,完成对数据库、表、分区等信息的查询操作。

类图如下:

RocketMQ Flink Catalog 设计与实践

3.1.2 RocketMQ Flink Catalog 的存储

RocketMQ Flink Catalog 的底层存储使用的是 RocketMQ Schema Registry。Flink 调用 Catalog 的时候,在 AbstractCatalog 的实现类中通过 RocketMQ Schema Registry 的客户端和 RocketMQ Schema Registry 服务端进行交互。

  • Database : 返回默认的 default 。
  • Table : 从 RocketMQ Schema Registry 获取对应的 Schema,然后解析 IDL 转换成 DataType。
  • Partition : 通过​​DefaultMQAdminExt​​ 从 RocketMQ 中获取到 Partition 相关信息。

RocketMQ Schema Registry是一个 Topic Schema 的管理中心。它为 Topic(RocketMQ Topic)的注册、删除、更新、获取和引用模式提供了一个 RESTful 接口。New RocketMQ 客户端通过将 Schema 与 Subject 关联起来,可以直接发送结构化数据。用户不再需要关心序列化和反序列化的细节。

RocketMQ Flink Catalog 设计与实践

3.1.3 RocketMQ Flink Catalog 支持的 API

目前 RocketMQ Flink Catalog 支持对 Database、Table、Partition 的查询和判断是否存在的操作,不支持创建、修改、删除。所以在使用之前需要通过 RocketMQ Schema Registry 来创建好对应的 Schema。

RocketMQ Flink Catalog 设计与实践

3.2 使用指南

表环境(TableEnvironment)是 Flink 中集成 Table API & SQL 的核心概念。它负责:

  • 在内部的 Catalog 中注册 Table。
  • 注册外部的 Catalog。
  • 加载可插拔模块。
  • 执行 SQL 查询。
  • 注册自定义函数 (scalar、table 或 aggregation)。
  • 将 DataStream 或 DataSet 转换成 Table。
  • 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用。
3.2.1 创建并注册 Catalog

Table API

RocketMQCatalog rocketMqCatalog = new RocketMQCatalog("rocketmq_catalog", "default", "http://localhost:9876", "http://localhost:8080");
tableEnvironment.registerCatalog("rocketmq_catalog", rocketMqCatalog);

SQL

TableResult tableResult = tableEnvironment.executeSql(
"CREATE CATALOG rocketmq_catalog WITH (" +
"'type'='rocketmq_catalog'," +
"'nameserver.address'='http://localhost:9876'," +
"'schema.registry.base.url'='http://localhost:8088');");
3.2.2 修改当前的 Catalog

Table API

tableEnvironment.useCatalog("rocketmq_catalog");

SQL

tableEnvironment.executeSql("USE CATALOG rocketmq_catalog");
3.2.3 列出可用的 Catalog

Table API

String[] catalogs = tableEnvironment.listCatalogs();

SQL

TableResult tableResult = tableEnvironment.executeSql("show catalogs");
3.2.4 列出可用的 Database

Table API

String[] databases = tableEnvironment.listDatabases();

SQL

TableResult tableResult = tableEnvironment.executeSql("show databases");
3.2.5 列出可用的 Table

Table API

String[] tables = tableEnvironment.listTables();

SQL

TableResult tableResult = tableEnvironment.executeSql("show tables");

3.3 Quick Start

需要提前准备可用的 RocketMQ 、RocketMQ Schema Registry:

3.3.1 创建 Topic

创建两个 Topic,rocketmq_source 和 rocketmq_sink。

RocketMQ Flink Catalog 设计与实践

3.3.2 注册 Source Schema
curl -X POST -H "Content-Type: application/json" \
-d '{"schemaIdl":"{"type":"record","name":"rocketmq_source_schema","namespace":"namespace","fields":[{"name":"name","type":"string"}]}"}' \
http://localhost:8088/schema-registry/v1/subject/rocketmq_source/schema/rocketmq_source_schema
3.3.3 注册 Sink Schema
curl -X POST -H "Content-Type: application/json" \
-d '{"schemaIdl":"{"type":"record","name":"rocketmq_sink_schema","namespace":"namespace","fields":[{"name":"name","type":"string"}]}"}' \
http://localhost:8088/schema-registry/v1/subject/rocketmq_sink/schema/rocketmq_sink_schema
3.3.4 添加依赖

创建一个任务项目 ,添加 rocketmq-flink 的依赖 :

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-flink</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>

目前 RocketMQ Schema Registry 还没有发布正式的版本,只有快照版,如果发现 jar 找不到,可以尝试以下方法:

<repositories>
<repository>
<id>snapshot-repos</id>
<name>Apache Snapshot Repository</name>
<url>https://repository.apache.org/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<layout>default</layout>
</repository>
</repositories>
3.3.5 创建任务
/**
* @author lixiaoshuang
*/
public class RocketMqCatalog {
public static void main(String[] args) {
// 初始化表环境参数
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
// 创建 table 环境
TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);

// 注册 rocketmq catalog
tableEnvironment.executeSql(
"CREATE CATALOG rocketmq_catalog WITH (" +
"'type'='rocketmq_catalog'," +
"'nameserver.address'='http://localhost:9876'," +
"'schema.registry.base.url'='http://localhost:8088');");
tableEnvironment.executeSql("USE CATALOG rocketmq_catalog");

// 从 rocketmq_source 中获取数据写入到 rocketmq_sink 中
TableResult tableResult = tableEnvironment.executeSql("INSERT INTO rocketmq_sink /*+ OPTIONS" +
"('producerGroup'='topic_producer_group') */ select * from rocketmq_source /*+ OPTIONS" +
"('consumerGroup'='topic_consumer_group') */");
}
}

启动任务并运行以后,打开 RocketMQ 控制台,往 rocketmq_source 这个 Topic 发送一条消息。

RocketMQ Flink Catalog 设计与实践

然后再查看 rocketmq_sink 的状态,就会发现消息已经通过写入到 rocketmq_sink 中了。

RocketMQ Flink Catalog 设计与实践