Spring Integration 对MongoDB的支持

时间:2022-12-13 11:20:56

Spring Integration 对MongoDB的支持

2.1版本引入了对MongoDB的支持:“高性能,开源,面向文档的数据库”。

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>6.0.0</version>
</dependency>

要下载、安装和运行 MongoDB,请参阅 MongoDB 文档。

连接到 MongoDb

阻塞还是被动?

从版本 5.3 开始,Spring Integration 提供了对反应式 MongoDB 驱动程序的支持,以便在访问 MongoDB 时启用非阻塞 I/O。 要启用反应式支持,请将 MongoDB 反应式流驱动程序添加到依赖项中:

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>

对于常规同步客户端,您需要将其各自的驱动程序添加到依赖项中:

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>

它们都在框架中,以获得更好的最终用户选择支持。​​optional​

要开始与MongoDB交互,您首先需要连接到它。 Spring 集成建立在另一个 Spring 项目 Spring Data MongoDB 提供的支持之上。 它提供了名为 and 的工厂类,简化了与 MongoDB 客户端 API 的集成。​​MongoDatabaseFactory​​​​ReactiveMongoDatabaseFactory​

Spring Data 默认提供阻塞 MongoDB 驱动程序,但您可以通过包含上述依赖项来选择被动使用。

用​​MongoDatabaseFactory​

要连接到MongoDB,您可以使用接口的实现。​​MongoDatabaseFactory​

以下示例演示如何使用:​​SimpleMongoClientDatabaseFactory​

MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");

​SimpleMongoClientDatabaseFactory​​采用两个参数:一个实例和一个指定数据库名称的参数。 如果需要配置属性(如 、 等),可以使用基础类提供的构造函数之一传递这些属性。 有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB 参考。​​MongoClient​​​​String​​​​host​​​​port​​​​MongoClients​

用​​ReactiveMongoDatabaseFactory​

要使用反应式驱动程序连接到MongoDB,您可以使用接口的实现。​​ReactiveMongoDatabaseFactory​

以下示例演示如何使用:​​SimpleReactiveMongoDatabaseFactory​

ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");

MongoDB消息存储

企业集成模式 (EIP) 一书中所述,邮件存储允许您保留消息。 如果可靠性是一个问题,则在处理能够缓冲消息(、、、和其他)的组件时,这样做会很有用。 在 Spring 集成中,该策略还为声明检查模式提供了基础,EIP 中也有描述。​​QueueChannel​​​​aggregator​​​​resequencer​​​​MessageStore​

Spring Integration的MongoDB模块提供了,它是策略(主要由声明检查模式使用)和策略(主要由聚合器和重新排序器模式使用)的实现。​​MongoDbMessageStore​​​​MessageStore​​​​MessageGroupStore​

以下示例将 配置为使用 a 和 :​​MongoDbMessageStore​​​​QueueChannel​​​​aggregator​

<bean  class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel >
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>

前面的示例是一个简单的 Bean 配置,它需要 a 作为构造函数参数。​​MongoDbFactory​

通过使用 Spring Data Mongo 映射机制将具有所有嵌套属性的 Mongo 文档展开。 当您需要访问 或 进行审核或分析(例如,针对存储的消息)时,它非常有用。​​MongoDbMessageStore​​​​Message​​​​payload​​​​headers​

使用自定义实现将实例存储为 MongoDB 文档,并且 的属性(和值)存在一些限制。​​MongoDbMessageStore​​​​MappingMongoConverter​​​​Message​​​​payload​​​​header​​​​Message​

从版本 5.1.6 开始,可以使用传播到内部实现中的自定义转换器进行配置。 有关更多信息,请参阅 JavaDocs。​​MongoDbMessageStore​​​​MappingMongoConverter​​​​MongoDbMessageStore.setCustomConverters(Object… customConverters)​

Spring Integration 3.0 引入了 . 它同时实现 和 接口。 此类可以作为构造函数参数接收 ,例如,您可以使用它配置自定义 . 另一个构造函数需要 a 和 a ,这允许您为实例及其属性提供一些自定义转换。 请注意,默认情况下,使用 标准 Java 序列化来向 MongoDB 写入和读取实例(请参阅),并且依赖于 中其他属性的默认值。 它从提供的和 . 由 存储的集合的默认名称是 。 我们建议使用此实现在消息包含复杂数据类型时创建可靠且灵活的解决方案。​​ConfigurableMongoDbMessageStore​​​​MessageStore​​​​MessageGroupStore​​​​MongoTemplate​​​​WriteConcern​​​​MappingMongoConverter​​​​MongoDbFactory​​​​Message​​​​ConfigurableMongoDbMessageStore​​​​Message​​​​MongoDbMessageBytesConverter​​​​MongoTemplate​​​​MongoTemplate​​​​MongoDbFactory​​​​MappingMongoConverter​​​​ConfigurableMongoDbMessageStore​​​​configurableStoreMessages​

MongoDB通道消息存储

4.0版引入了新的. 它针对在实例中使用进行了优化。 使用 ,您可以在实例中使用它来实现持久化消息的优先级顺序轮询。 优先级 MongoDB 文档字段从 () 消息标头填充。​​MongoDbChannelMessageStore​​​​MessageGroupStore​​​​QueueChannel​​​​priorityEnabled = true​​​​<int:priority-queue>​​​​IntegrationMessageHeaderAccessor.PRIORITY​​​​priority​

此外,所有MongoDB实例现在都有一个文档字段。 该值是对同一集合中的简单文档的操作的结果,该文档是按需创建的。 当消息存储在同一毫秒内时,该字段用于在操作中提供先进先出 (FIFO) 消息顺序(如果已配置,则在优先级范围内)。​​MessageStore​​​​sequence​​​​MessageGroup​​​​sequence​​​​$inc​​​​sequence​​​​sequence​​​​poll​

我们不建议对优先级和非优先级使用相同的 Bean,因为该选项适用于整个存储区。 但是,这两种类型都可以使用相同的方法,因为来自存储的消息轮询已排序并使用索引。 要配置该方案,可以从一个消息存储库 Bean 扩展另一个消息存储库 Bean,如以下示例所示:​​MongoDbChannelMessageStore​​​​priorityEnabled​​​​collection​​​​MongoDbChannelMessageStore​

<bean  class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel >
<int:queue message-store="store"/>
</int:channel>

<bean parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>

<int:channel >
<int:priority-queue message-store="priorityStore"/>
</int:channel>

MongoDB 元数据存储

Spring Integration 4.2引入了一个新的基于MongoDB的(参见元数据存储)实现。 可以使用 在应用程序重新启动期间维护元数据状态。 您可以将此新实现与适配器一起使用,例如:​​MetadataStore​​​​MongoDbMetadataStore​​​​MetadataStore​

要指示这些适配器使用 new ,请声明一个 Bean 名称为 的 Spring Bean。 源入站通道适配器自动拾取并使用声明的 . 下面的示例演示如何声明名称为 : 的 Bean:​​MongoDbMetadataStore​​metadataStore​​MongoDbMetadataStore​​metadataStore​

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

还实现了,让它在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。 所有这些操作都是原子的,这要归功于MongoDB的保证。​​MongoDbMetadataStore​​​​ConcurrentMetadataStore​

MongoDB 入站通道适配器

MongoDB 入站通道适配器是一个轮询使用者,它从 MongoDB 读取数据并将其作为有效负载发送。 以下示例演示如何配置 MongoDB 入站通道适配器:​​Message​

<int-mongodb:inbound-channel-adapter 
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

如前面的配置所示,您可以使用该元素并为各种属性提供值来配置 MongoDb 入站通道适配器,例如:​​inbound-channel-adapter​

  • ​query​​:JSON 查询(请参阅 MongoDB 查询))
  • ​query-expression​​:计算结果为 JSON 查询字符串(如上面的属性)或 的实例的 SpEL 表达式。 与属性互斥。queryo.s.data.mongodb.core.query.Queryquery
  • ​entity-class​​:有效负载对象的类型。 如果未提供,则返回 a。com.mongodb.DBObject
  • ​collection-name​​或 :标识要使用的 MongoDB 集合的名称。collection-name-expression
  • ​mongodb-factory​​:对实例的引用o.s.data.mongodb.MongoDbFactory
  • ​mongo-template​​:对实例的引用o.s.data.mongodb.core.MongoTemplate
  • 所有其他入站适配器通用的其他属性(例如“通道”)。

You cannot set both and . ​​mongo-template​​​​mongodb-factory​

前面的示例相对简单且静态,因为它具有 的文本值,并使用 的默认名称。 有时,您可能需要根据某些条件在运行时更改这些值。 为此,请使用它们的等效项 ( 和 ),其中提供的表达式可以是任何有效的 SpEL 表达式。​​query​​​​collection​​​​-expression​​​​query-expression​​​​collection-name-expression​

此外,您可能希望对从MongoDB读取的成功处理数据进行一些后处理。 例如;您可能希望在处理文档后移动或删除文档。 您可以使用 Spring Integration 2.2 添加的事务同步功能来执行此操作,如以下示例所示:

<int-mongodb:inbound-channel-adapter 
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory >
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean class="thing1.thing2.DocumentCleaner"/>

<bean class="o.s.i.transaction.PseudoTransactionManager"/>

以下示例显示了前面示例中引用的内容:​​DocumentCleaner​

public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?>){
List<?> documents = (List<?>) target;
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}

可以使用元素将轮询器声明为事务性轮询器。 此元素可以引用真正的事务管理器(例如,如果流的其他部分调用 JDBC)。 如果你没有“真实”事务,你可以使用 ,它是 Spring 的实现,可以在没有实际事务时使用 Mongo 适配器的事务同步功能。​​transactional​​​​o.s.i.transaction.PseudoTransactionManager​​​​PlatformTransactionManager​


这样做不会使MongoDB本身成为事务。 它允许在成功(提交)或失败(回滚)之前或之后执行操作的同步。

轮询器是事务性的后,您可以设置 on 元素的实例。 A 创建 的实例。 为方便起见,我们公开了一个基于 SpEL 的默认表达式,允许您配置 SpEL 表达式,其执行与事务协调(同步)。 支持提交前、提交后和回滚后的表达式,以及发送评估结果(如果有)的每个事件的通道。 对于每个子元素,可以指定 和 属性。 如果仅存在该属性,则接收到的消息将作为特定同步方案的一部分发送到那里。 如果仅存在该属性,并且表达式的结果为非 null 值,则会生成一条消息,并将结果作为有效负载发送到默认通道 (),并显示在日志中(在级别上)。 如果您希望评估结果转到特定渠道,请添加属性。 如果表达式的结果为 null 或 void,则不会生成任何消息。​​o.s.i.transaction.TransactionSynchronizationFactory​​​​transactional​​​​TransactionSynchronizationFactory​​​​TransactionSynchronization​​​​TransactionSynchronizationFactory​​​​expression​​​​channel​​​​channel​​​​expression​​​​NullChannel​​​​DEBUG​​​​channel​

有关事务同步的详细信息,请参阅事务同步。

从版本 5.5 开始,可以使用 配置 ,该 必须使用 MongoDb 语法计算为 a 或实例。 它可以用作上述后处理过程的替代方法,并修改从集合中提取的那些实体,因此在下一个轮询周期中不会再次从集合中提取它们(假设更新更改了查询中使用的某些值)。 当集群中使用同一集合的多个实例时,仍建议使用事务来实现执行隔离和数据一致性。​​MongoDbMessageSource​​​​updateExpression​​​​String​​​​update​​​​org.springframework.data.mongodb.core.query.Update​​​​MongoDbMessageSource​

MongoDB 更改流入站通道适配器

从版本 5.3 开始,该模块引入了 - Spring Data API 的响应式实现。 此组件生成 a 的消息,默认情况下以 of 作为有效负载,并生成一些与更改流相关的标头(请参阅)。 建议将其与 作为按需订阅和下游事件消费的 AS 结合使用。​​spring-integration-mongodb​​​​MongoDbChangeStreamMessageProducer​​​​MessageProducerSupport​​​​ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)​​​​Flux​​​​body​​​​ChangeStreamEvent​​​​MongoHeaders​​​​MongoDbChangeStreamMessageProducer​​​​FluxMessageChannel​​​​outputChannel​

此通道适配器的 Java DSL 配置可能如下所示:

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}

当 停止,或下游取消订阅,或 MongoDb 更改流生成 时,已完成。 通道适配器可以再次启动,并创建新的源数据,并在 中自动订阅。 如果需要使用来自其他地方的更改流事件,则可以重新配置此通道适配器以在启动之间使用新选项。​​MongoDbChangeStreamMessageProducer​​​​OperationType.INVALIDATE​​​​Publisher​​​​Publisher​​​​MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>)​

有关更改流支持的更多信息,请参阅Spring Data MongoDb文档。

MongoDB 出站通道适配器

MongoDB 出站通道适配器允许您将消息有效负载写入 MongoDB 文档存储,如以下示例所示:

<int-mongodb:outbound-channel-adapter 
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />

如前面的配置所示,您可以使用该元素配置 MongoDB 出站通道适配器,为各种属性提供值,例如:​​outbound-channel-adapter​

  • ​collection-name​​或 :标识要使用的 MongoDb 集合的名称。collection-name-expression
  • ​mongo-converter​​:对实例的引用有助于将原始 Java 对象转换为 JSON 文档表示形式。o.s.data.mongodb.core.convert.MongoConverter
  • ​mongodb-factory​​:对 实例的引用。o.s.data.mongodb.MongoDbFactory
  • ​mongo-template​​:对 实例的引用。 注意:您不能同时设置mongo模板和mongodb出厂设置。o.s.data.mongodb.core.MongoTemplate
  • 所有入站适配器中通用的其他属性(例如“通道”)。

前面的示例相对简单和静态,因为它具有 的文字值。 有时,您可能需要根据某些条件在运行时更改此值。 为此,请使用 ,其中提供的表达式是任何有效的 SpEL 表达式。​​collection-name​​​​collection-name-expression​

MongoDB Outbound Gateway

版本 5.0 引入了 MongoDB 出站网关。 它允许您通过向其请求通道发送消息来查询数据库。 然后,网关将响应发送到回复通道。 可以使用消息负载和标头来指定查询和集合名称,如以下示例所示:

@SpringBootApplication
public class MongoDbJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}

@Autowired
private MongoDbFactory;

@Autowired
private MongoConverter;


@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}

private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}

}

您可以将以下属性与 MongoDB 出站网关一起使用:

  • ​collection-name​​或 :标识要使用的 MongoDB 集合的名称。collection-name-expression
  • ​mongo-converter​​:对实例的引用有助于将原始 Java 对象转换为 JSON 文档表示形式。o.s.data.mongodb.core.convert.MongoConverter
  • ​mongodb-factory​​:对 实例的引用。o.s.data.mongodb.MongoDbFactory
  • ​mongo-template​​:对 实例的引用。 注意:不能同时设置 和 。o.s.data.mongodb.core.MongoTemplatemongo-templatemongodb-factory
  • ​entity-class​​:要传递给 MongoTemplate 中的 and 方法的实体类的完全限定名。 如果未提供此属性,则默认值为 。find(..)findOne(..)org.bson.Document
  • ​query​​或 :指定 MongoDB 查询。 有关更多查询示例,请参阅 MongoDB 文档。query-expression
  • ​collection-callback​​:对 实例的引用。 最好是自 5.0.11 起具有请求消息上下文的实例。 有关更多信息,请参阅其 Javadocs。 注意:您不能同时拥有这两个属性和任何查询属性。org.springframework.data.mongodb.core.CollectionCallbacko.s.i.mongodb.outbound.MessageCollectionCallbackcollection-callback

作为 and 属性的替代方法,可以通过将该属性用作对功能接口实现的引用来指定其他数据库操作。 以下示例指定计数操作:​​query​​​​query-expression​​​​collectionCallback​​​​MessageCollectionCallback​

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}

MongoDB 反应式通道适配器

从版本 5.3 开始,提供了 和 实现。 它们基于来自 Spring 数据,需要依赖关系。​​ReactiveMongoDbStoringMessageHandler​​​​ReactiveMongoDbMessageSource​​​​ReactiveMongoOperations​​​​org.mongodb:mongodb-driver-reactivestreams​

这是当集成流定义中涉及反应式流组合时,框架中本机支持的实现。 有关详细信息,请参阅 ReactiveMessageHandler。​​ReactiveMongoDbStoringMessageHandler​​​​ReactiveMessageHandler​

从配置的角度来看,与许多其他标准通道适配器没有区别。 例如,对于Java DSL,可以使用这样的通道适配器:

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

在此示例中,我们将通过提供的连接到 MongoDb,并将请求消息中的数据存储到具有名称的默认集合中。 真正的操作将从内部创建的反应性流组合中按需执行。​​ReactiveMongoDatabaseFactory​​​​data​​​​ReactiveStreamsConsumer​

这是基于提供的 or 和 MongoDb 查询(或表达式)、调用或操作的实现,根据具有预期类型的选项来转换查询结果。 当(或根据选项)订阅所生成消息的有效负载时,按需执行查询执行和结果评估。 框架可以在拆分器时自动(本质上)订阅这样的有效负载,并在下游使用。 否则,目标应用程序有责任订阅下游终结点中的轮询发布者。​​ReactiveMongoDbMessageSource​​​​AbstractMessageSource​​​​ReactiveMongoDatabaseFactory​​​​ReactiveMongoOperations​​​​find()​​​​findOne()​​​​expectSingleResult​​​​entityClass​​​​Publisher​​​​Flux​​​​Mono​​​​expectSingleResult​​​​flatMap​​​​FluxMessageChannel​

使用Java DSL,这样的通道适配器可以配置为:

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}

从版本 5.5 开始,可以使用 . 它具有与 阻塞 . 有关更多信息,请参阅 MongoDB 入站通道适配器和 JavaDocs。​​ReactiveMongoDbMessageSource​​​​updateExpression​​​​MongoDbMessageSource​​​​AbstractMongoDbMessageSourceSpec​