Spring Cloud Schema Registry

时间:2022-12-02 17:02:30

Spring Cloud Schema Registry

介绍

当组织具有基于消息传递的发布/订阅体系结构并且多个创建者和使用者微服务相互通信时,所有这些微服务通常需要就基于架构的协定达成一致。 当这样的架构需要发展以适应新的业务需求时,现有组件仍然需要继续工作。 该项目提供对独立架构注册表服务器的支持,应用程序可以使用该服务器注册和使用上述架构。 它还包含对基于 avro 的架构注册表客户端的支持,这些客户端实质上提供与架构注册表通信的消息转换器,以便在消息转换期间协调架构。 该项目提供的模式演化支持既适用于上述独立模式注册表,也适用于 Confluent 提供的专门与 Apache Kafka 配合使用的模式注册表。

Spring 云模式注册表概述

Spring Cloud Schema Registry 提供对模式演变的支持,以便数据可以随着时间的推移而发展,并且仍然可以与较旧或较新的生产者和消费者一起使用,反之亦然。大多数序列化模型(尤其是旨在跨不同平台和语言实现可移植性的模型)都依赖于描述如何在二进制有效负载中序列化数据的架构。为了序列化数据,然后对其进行解释,发送方和接收方都必须有权访问描述二进制格式的架构。在某些情况下,可以在序列化时从有效负载类型推断架构,也可以在反序列化时从目标类型推断架构。 但是,许多应用程序都可以访问描述二进制数据格式的显式架构。 架构注册表允许您以文本格式(通常为 JSON)存储架构信息,并使需要它以二进制格式接收和发送数据的各种应用程序可以访问该信息。 架构可作为元组引用,其中包含:

  • 作为架构的逻辑名称的主题
  • 架构版本
  • 架构格式,描述数据的二进制格式

Spring Cloud Schema Registry 提供以下组件

  • 独立架构注册表服务器
By default, it is using an H2 database, but server can be used with other databases by providing appropriate datasource configuration.
  • 能够通过与架构注册表通信来进行消息封送的架构注册表客户端。
Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.

架构注册表客户端

与架构注册表服务器交互的客户端抽象是接口,它具有以下结构:​​SchemaRegistryClient​

public interface SchemaRegistryClient {

SchemaRegistrationResponse register(String subject, String format, String schema);

String fetch(SchemaReference schemaReference);

String fetch(Integer id);

}

Spring Cloud Stream 提供了开箱即用的实现,用于与自己的模式服务器交互以及与 Confluent 模式注册表进行交互。

可以使用以下命令配置Spring Cloud Stream模式注册表的客户端,如下所示:​​@EnableSchemaRegistryClient​

@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public static class AvroSinkApplication {
//...
}

默认转换器经过优化,不仅可以缓存来自远程服务器的模式,还可以缓存非常昂贵的方法。 因此,它使用 athat 不缓存响应。 如果要更改默认行为,可以直接在代码上使用客户端,并将其重写为所需的结果。 为此,必须将该属性添加到应用程序属性中。​​parse()​​​​toString()​​​​DefaultSchemaRegistryClient​​​​spring.cloud.schemaRegistryClient.cached=true​

架构注册表客户端属性

架构注册表客户端支持以下属性:

​spring.cloud.schemaRegistryClient.endpoint​

架构服务器的位置。 设置此设置时,请使用完整的 URL,包括协议 (或)、端口和上下文路径。​​http​​​​https​

违约

​http://localhost:8990/​

​spring.cloud.schemaRegistryClient.cached​

客户端是否应缓存架构服务器响应。 通常设置为 ,因为缓存发生在消息转换器中。 使用架构注册表客户端的客户端应将其设置为。​​false​​​​true​

违约

​false​

Avro 架构注册表客户端消息转换器

对于在应用程序上下文中注册了 SchemaRegistryClient bean 的应用程序,Spring Cloud Stream 会自动配置一个 Apache Avro 消息转换器进行模式管理。 这简化了架构演变,因为接收消息的应用程序可以轻松访问可与其自己的读取器架构协调的编写器架构。

对于出站邮件,如果通道的内容类型设置为“将激活”,如以下示例所示:​​application/*+avro​​​​MessageConverter​

spring.cloud.stream.bindings.output.contentType=application/*+avro

在出站转换期间,消息转换器尝试推断每个出站消息的架构(基于其类型),并使用 将其注册到主题(基于有效负载类型)。 如果已找到相同的架构,则会检索对该架构的引用。 否则,将注册架构,并提供新版本号。 使用以下方案以标头发送消息:,其中可配置并从有效负载类型推导出来。​​SchemaRegistryClient​​​​contentType​​​​application/[prefix].[subject].v[version]+avro​​​​prefix​​​​subject​

例如,该类型的消息可能作为二进制有效负载发送,内容类型为 主题,是版本号。​​User​​​​application/vnd.user.v2+avro​​​​user​​​​2​

接收消息时,转换器从传入消息的标头推断架构引用并尝试检索它。该架构在反序列化过程中用作编写器架构。

Avro 架构注册表消息转换器属性

如果已通过设置启用了基于 Avro 的架构注册表客户端,则可以通过设置以下属性来自定义注册行为。​​spring.cloud.stream.bindings.output.contentType=application/*+avro​

spring.cloud.schema.avro.dynamicSchemaGenerationEnabled

如果您希望转换器使用反射从 POJO 推断架构,请启用。

违约:​​false​

spring.cloud.schema.avro.readerSchema

Avro 通过查看编写器架构(源有效负载)和读取器架构(应用程序有效负载)来比较架构版本。有关详细信息,请参阅Avro 文档。如果设置,这将覆盖架构服务器上的任何查找,并使用本地架构作为读取器架构。 违约:​​null​

spring.cloud.schema.avro.schemaLocations

将此属性中列出的任何文件注册到架构服务器。​​.avsc​

违约:​​empty​

spring.cloud.schema.avro.prefix

要在内容类型标头上使用的前缀。

违约:​​vnd​

spring.cloud.schema.avro.subject命名策略

确定用于在架构注册表中注册 Avro 架构的使用者名称。有两种实现可用,, 其中,主题是架构名称,并且返回使用 Avro 架构命名空间和名称的完全限定的主题。可以通过实施创建自定义策略。​​org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy​​​​org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy​​​​org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy​

违约:​​org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy​

Apache Avro Message Converters

Spring Cloud Stream 通过其模块为基于模式的消息转换器提供支持。 目前,基于架构的消息转换器唯一支持的现成序列化格式是 Apache Avro,未来版本中将添加更多格式。​​spring-cloud-schema-registry-client​

该模块包含两种类型的消息转换器,可用于Apache Avro序列化:​​spring-cloud-schema-registry-client​

  • 使用序列化或反序列化对象的类信息的转换器,或者使用在启动时具有已知位置的架构。
  • 使用架构注册表的转换器。它们在运行时查找架构,并在域对象演变时动态注册新架构。

具有架构支持的转换器

支持序列化和反序列化消息,方法是使用预定义的架构或使用类中可用的架构信息(反射式或包含在 中)。 如果提供定制转换器,则不会创建默认的 AvroSchemaMessageConverter bean。以下示例显示了一个自定义转换器:​​AvroSchemaMessageConverter​​​​SpecificRecord​

若要使用自定义转换器,只需将其添加到应用程序上下文中,可以选择指定一个或多个与之关联的转换器。 默认值是。​​MimeTypes​​​​MimeType​​​​application/avro​

如果转换的目标类型为 a,则必须设置架构。​​GenericRecord​

以下示例演示如何通过注册 Apache Avro(没有预定义架构)在接收器应用程序中配置转换器。 在此示例中,请注意 mime 类型值是,而不是默认值。​​MessageConverter​​​​avro/bytes​​​​application/avro​

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

//...

@Bean
public MessageConverter userMessageConverter() {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}

相反,以下应用程序使用预定义的架构(在类路径中找到)注册转换器:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

//...

@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
return converter;
}
}

架构注册表服务器

Spring Cloud Stream 提供了一个模式注册表服务器实现。 要使用它,您可以下载最新版本并将其作为独立应用程序运行:​​spring-cloud-schema-registry-server​

wget https://repo.spring.io/libs-milestone/org/springframework/cloud/spring-cloud-schema-registry-server/1.0.0.RC2/spring-cloud-schema-registry-server-1.0.0.RC2.jar
java -jar ./spring-cloud-schema-registry-server-1.0.0.RC2.jar


您可以将模式注册表嵌入到现有的 Spring 引导 Web 应用程序中。 为此,请将 theartifact 添加到您的项目中并使用注释,这会将架构注册表服务器 REST 控制器添加到您的应用程序中。 以下示例显示了一个启用模式注册表的 Spring 引导应用程序:​​spring-cloud-schema-server​​​​@EnableSchemaRegistryServer​




@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}



该属性可用于控制架构服务器的根路径(尤其是当它嵌入到其他应用程序中时)。 属性允许删除架构。默认情况下,此功能处于禁用状态。​​spring.cloud.schema.server.path​​​​spring.cloud.schema.server.allowSchemaDeletion​

架构注册表服务器使用关系数据库来存储架构。 默认情况下,它使用嵌入式数据库。 您可以使用Spring 引导 SQL 数据库和 JDBC 配置选项来定制模式存储。

架构注册表服务器 API

架构注册表服务器 API 由以下操作组成:

  • ​POST /​​— 见Registering a New Schema
  • 'GET /{subject}/{format}/{version}' — 参见Retrieving an Existing Schema by Subject, Format, and Version
  • ​GET /{subject}/{format}​​— 见Retrieving an Existing Schema by Subject and Format
  • ​GET /schemas/{id}​​— 见Retrieving an Existing Schema by ID
  • ​DELETE /{subject}/{format}/{version}​​— 见Deleting a Schema by Subject, Format, and Version
  • ​DELETE /schemas/{id}​​— 见Deleting a Schema by ID
  • ​DELETE /{subject}​​— 见Deleting a Schema by Subject
注册新架构

若要注册新架构,请向终结点发送请求。​​POST​​​​/​

接受具有以下字段的 JSON 有效负载:​​/​

  • ​subject​​:架构主题
  • ​format​​:架构格式
  • ​definition​​:架构定义

它的响应是 JSON 格式的架构对象,具有以下字段:

  • ​id​​:架构 ID
  • ​subject​​:架构主题
  • ​format​​:架构格式
  • ​version​​:架构版本
  • ​definition​​:架构定义
按主题、格式和版本检索现有架构

若要按主题、格式和版本检索现有架构,请将请求发送到终结点。​​GET​​​​/{subject}/{format}/{version}​

它的响应是 JSON 格式的架构对象,具有以下字段:

  • ​id​​:架构 ID
  • ​subject​​:架构主题
  • ​format​​:架构格式
  • ​version​​:架构版本
  • ​definition​​:架构定义
按主题和格式检索现有架构

若要按主题和格式检索现有架构,请向终结点发送请求。​​GET​​​​/subject/format​

它的响应是包含 JSON 格式的每个架构对象的架构列表,其中包含以下字段:

  • ​id​​:架构 ID
  • ​subject​​:架构主题
  • ​format​​:架构格式
  • ​version​​:架构版本
  • ​definition​​:架构定义
按 ID 检索现有架构

要按架构 ID 检索架构,请向终结点发送请求。​​GET​​​​/schemas/{id}​

它的响应是 JSON 格式的架构对象,具有以下字段:

  • ​id​​:架构 ID
  • ​subject​​:架构主题
  • ​format​​:架构格式
  • ​version​​:架构版本
  • ​definition​​:架构定义
按主题、格式和版本删除架构

要删除由其主题、格式和版本标识的架构,请向端点发送请求。​​DELETE​​​​/{subject}/{format}/{version}​

按 ID 删除架构

要按架构 ID 删除架构,请向终结点发送请求。​​DELETE​​​​/schemas/{id}​

按主题删除架构

​DELETE /{subject}​

按主题删除现有架构。

本说明仅适用于春云流1.1.0.发布版的用户。 Spring Cloud Stream 1.1.0.RELEASE 使用表名,在许多数据库实现中 storingobjects.is 关键字。 为了避免将来发生任何冲突,从 1.1.1.RELEASE 开始,我们选择了存储表的名称。 任何升级的 Spring Cloud Stream 1.1.0.RELEASE 用户都应在升级之前将其现有架构迁移到新表。​​schema​​​​Schema​​​​Schema​​​​SCHEMA_REPOSITORY​

使用 Confluent 的架构注册表

默认配置创建 abean。 如果要使用 Confluent 模式注册表,则需要创建一个类型的 Bean,该 Bean 将取代框架默认配置的 Bean。下面的示例演示如何创建这样的 Bean:​​DefaultSchemaRegistryClient​​​​ConfluentSchemaRegistryClient​

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.schemaRegistryClient.endpoint}") String endpoint){
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}

ConfluentSchemaRegistryClient 针对 Confluent 平台版本 4.0.0 进行了测试。

架构注册和解析

为了更好地了解Spring Cloud Stream如何注册和解析新模式及其对Avro模式比较功能的使用,我们提供了两个单独的小节:

  • ​Schema Registration Process (Serialization)​
  • ​Schema Resolution Process (Deserialization)​

架构注册过程(序列化)

注册过程的第一部分是从通过通道发送的有效负载中提取架构。 Avro 类型(如 asoralready)包含一个架构,可以立即从实例中检索。 在 POJO 的情况下,如果属性设置为(默认值),则会推断出架构。​​SpecificRecord​​​​GenericRecord​​​​spring.cloud.schema.avro.dynamicSchemaGenerationEnabled​​​​true​

Spring Cloud Schema Registry

图1.架构编写器解析过程

一旦获得模式,转换器就会从远程服务器加载其元数据(版本)。 首先,它查询本地缓存。如果未找到结果,它将数据提交到服务器,服务器会回复版本控制信息。 转换器始终缓存结果,以避免在架构服务器中查询需要序列化的每条新消息的开销。

Spring Cloud Schema Registry

图2.架构注册过程

使用架构版本信息,转换器将消息的标头设置为携带版本信息,例如:。​​contentType​​​​application/vnd.user.v1+avro​

架构解析过程(反序列化)

读取包含版本信息(即,具有如下所述方案的标头)的消息时,转换器将查询架构服务器以获取消息的编写器架构。 找到传入消息的正确架构后,它将检索读取器架构,并使用 Avro 的架构解析支持将其读入读取器定义(设置默认值和任何缺少的属性)。​​contentType​​​​Schema Registration Process (Serialization)​

Spring Cloud Schema Registry

图3.架构读取解析过程

您应该了解编写器架构(编写消息的应用程序)和读取器架构(接收应用程序)之间的区别。 我们建议花点时间阅读Avro 术语​并了解该过程。 Spring Cloud Stream 始终获取编写器架构以确定如何读取消息。 如果你想让 Avro 的模式演进支持工作,你需要确保为你的应用程序正确设置了 awas。​​readerSchema​