Spring AMQP项目

时间:2022-12-19 11:28:27

Spring AMQP项目

Spring AMQP项目将Spring的核心概念应用于基于AMQP的消息传递解决方案的开发。 我们提供了一个“模板”作为发送和接收消息的高级抽象。 我们还为消息驱动的 POJO 提供支持。 这些库有助于管理 AMQP 资源,同时促进依赖关系注入和声明性配置的使用。 在所有这些情况下,你可以看到与 Spring 框架中的 JMS 支持的相似之处。 有关其他与项目相关的信息,请访问Spring AMQP项目主页。

2. 最新消息

2.1. 自 2.4 以来 3.0 的变化

2.1.1. Java 17, Spring Framework 6.0

此版本需要 Spring Framework 6.0 和 Java 17

2.1.2. 远程处理

不再支持远程处理功能(使用 RMI)。

2.1.3. 观察

现在支持使用千分尺启用计时器观察和跟踪。 有关更多信息,请参阅千分尺观察。

2.1.4. 原生镜像

提供了对创建本机映像的支持。 有关详细信息,请参阅本机映像。

2.1.5. 异步兔子模板

现在返回 s 而不是 s。 有关详细信息,请参阅异步兔子模板​。​​AsyncRabbitTemplate​​​​CompletableFuture​​​​ListenableFuture​

2.1.6. 流支持更改

​RabbitStreamOperations​​​并且方法现在返回而不是 .​​RabbitStreamTemplate​​​​CompletableFuture​​​​ListenableFuture​

现在支持超级流和单个活跃使用者。

有关更多信息,请参阅 使用 RabbitMQ 流插件。

2.1.7. 变更​​@RabbitListener​

批处理侦听器现在可以同时使用 . 批处理消息传递适配器现在可确保该方法适用于使用批处理。 将容器工厂设置为 时,该属性也设置为 。 有关详细信息,请参阅批处理@RabbitListener。​​Collection<?>​​​​List<?>​​​​consumerBatchEnabled​​​​true​​​​batchListener​​​​true​

​MessageConverter​​s 现在可以返回空值;这目前由 . 有关详细信息,请参阅从消息转换​​Optional.empty()​​​​Jackson2JsonMessageConverter​

现在,您可以通过容器工厂而不是通过 上的属性配置 。 有关详细信息,请参阅回复管理。​​ReplyPostProcessor​​​​@RabbitListener​

2.1.8. 连接工厂变更

中的默认值现在为 。这会导致在提供多个地址时连接到随机主机。 有关详细信息,请参阅连接到集群。​​addressShuffleMode​​​​AbstractConnectionFactory​​​​RANDOM​

不再使用 RabbitMQ 库来确定哪个节点是队列的领导者。 有关详细信息,请参阅队列相关性和 LocalizedQueueConnectionFactory。​​LocalizedQueueConnectionFactory​​​​http-client​

3. 简介

参考文档的第一部分是对Spring AMQP和基本概念的高级概述。 它包括一些代码片段,可帮助您尽快启动并运行。

3.1. 不耐烦的人快速浏览

3.1.1. 简介

这是开始春季AMQP的五分钟导览。

先决条件:安装并运行 RabbitMQ 代理 (https://www.rabbitmq.com/download.html)。 然后获取 spring-rabbit JAR 及其所有依赖项 - 最简单的方法是在构建工具中声明依赖项。 例如,对于 Maven,您可以执行类似于以下内容的操作:

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>3.0.0</version>
</dependency>

对于 Gradle,您可以执行类似以下步骤的操作:

compile 'org.springframework.amqp:spring-rabbit:3.0.0'
兼容性

Spring 框架版本的最低依赖关系是 5.2.0。

Java 客户机库的最低版本为 5.7.0。​​amqp-client​

流队列的最小 Java 客户机库为 0.7.0。​​stream-client​

非常非常快

本节提供最快的介绍。

首先,添加以下语句以使本节后面的示例正常工作:​​import​

以下示例使用普通的命令式 Java 发送和接收消息:

ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");

请注意,原生 Java Rabbit 客户端中还有一个。 我们在前面的代码中使用了 Spring 抽象。 它缓存通道(以及可选的连接)以供重用。 我们依赖于代理中的默认交换(因为在发送中没有指定任何交换),以及所有队列通过其名称默认绑定到默认交换(因此,我们可以在发送中使用队列名称作为路由密钥)。 这些行为在 AMQP 规范中定义。​​ConnectionFactory​

使用 XML 配置

以下示例与前面的示例相同,但将资源配置外部化为 XML:

ApplicationContext context =
new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd">

<rabbit:connection-factory />

<rabbit:template connection-factory="connectionFactory"/>

<rabbit:admin connection-factory="connectionFactory"/>

<rabbit:queue name="myqueue"/>

</beans>

缺省情况下,声明会自动查找 、 的 bean,并代表用户将它们声明给代理。 因此,您不需要在简单的 Java 驱动程序中显式使用该 Bean。 有很多选项可用于配置 XML 架构中组件的属性。 您可以使用 XML 编辑器的自动完成功能来浏览它们并查看其文档。​​<rabbit:admin/>​​​​Queue​​​​Exchange​​​​Binding​

使用爪哒配置

以下示例重复与前面的示例相同的示例,但使用了 Java 中定义的外部配置:

ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");

........

@Configuration
public class RabbitConfiguration {

@Bean
public CachingConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}

@Bean
public RabbitAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}

@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
}
具有 Spring 引导自动配置和异步 POJO 侦听器

Spring 引导会自动配置基础结构 bean,如以下示例所示:

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
public ApplicationRunner runner(AmqpTemplate template) {
return args -> template.convertAndSend("myqueue", "foo");
}

@Bean
public Queue myQueue() {
return new Queue("myqueue");
}

@RabbitListener(queues = "myqueue")
public void listen(String in) {
System.out.println(in);
}

}

4. 参考资料

参考文档的这一部分详细介绍了构成Spring AMQP的各种组件。 主要章节介绍了开发 AMQP 应用程序的核心类。 本部分还包括有关示例应用程序的章节。

4.1. 使用弹簧 AMQP

本章探讨了使用 Spring AMQP 开发应用程序的基本组件接口和类。

4.1.1. AMQP 抽象

Spring AMQP 由两个模块组成(每个模块在发行版中由一个 JAR 表示):和 。 “spring-amqp”模块包含该软件包。 在该包中,您可以找到表示核心 AMQP“模型”的类。 我们的目的是提供不依赖于任何特定 AMQP 代理实现或客户端库的通用抽象。 最终用户代码可以跨供应商实现更具可移植性,因为它可以仅针对抽象层进行开发。 然后,这些抽象由特定于代理的模块实现,例如“spring-rabbit”。 目前只有一个 RabbitMQ 实现。 但是,除了RabbitMQ之外,这些抽象已经在.NET中使用Apache Qpid进行了验证。 由于 AMQP 在协议级别运行,原则上可以将 RabbitMQ 客户端与任何支持相同协议版本的代理一起使用,但我们目前不测试任何其他代理。​​spring-amqp​​​​spring-rabbit​​​​org.springframework.amqp.core​

本概述假定您已经熟悉 AMQP 规范的基础知识。 如果没有,请查看其他资源中列出的资源

​Message​

0-9-1 AMQP 规范未定义类或接口。 相反,在执行诸如 之类的操作时,内容将作为字节数组参数传递,其他属性将作为单独的参数传入。 Spring AMQP将类定义为更通用的AMQP域模型表示的一部分。 该类的目的是将主体和属性封装在单个实例中,以便 API 可以反过来更简单。 下面的示例演示类定义:​​Message​​​​basicPublish()​​​​Message​​​​Message​​​​Message​

public class Message {

private final MessageProperties messageProperties;

private final byte[] body;

public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}

public byte[] getBody() {
return this.body;
}

public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}

该接口定义了几个常见属性,例如“messageId”、“timestamp”、“内容类型”等。 您还可以通过调用该方法,使用用户定义的“标头”扩展这些属性。​​MessageProperties​​​​setHeader(String key, Object value)​

从版本 、、 和 开始,如果消息正文是序列化的 Java 对象,则在执行操作(例如在日志消息中)时不再反序列化(缺省情况下)。 这是为了防止不安全的反序列化。 默认情况下,只有 和 类被反序列化。 要恢复到以前的行为,可以通过调用 来添加允许的类/包模式。 支持简单的通配符,例如​​com.something。​ 无法反序列化的主体在日志消息中表示。​​1.5.7​​​​1.6.11​​​​1.7.4​​​​2.0.0​​​​Serializable​​​​toString()​​​​java.util​​​​java.lang​​​​Message.addAllowedListPatterns(…)​​​​, *.MyClass​​​​byte[<size>]​

交换

该接口表示 AMQP 交换,即消息生产者发送到的内容。 经纪商虚拟主机中的每个交易所都有一个唯一的名称以及一些其他属性。 以下示例显示了该接口:​​Exchange​​​​Exchange​

public interface Exchange {

String getName();

String getExchangeType();

boolean isDurable();

boolean isAutoDelete();

Map<String, Object> getArguments();

}

如您所见,an 也有一个由 中定义的常量表示的“类型”。 基本类型包括:、、 和 。 在核心包中,您可以找到每种类型的接口实现。 这些类型的行为在如何处理队列绑定方面有所不同。 例如,交换允许队列由固定路由密钥(通常是队列的名称)绑定。 交换支持具有路由模式的绑定,这些模式可能分别包含“*”和“#”通配符,分别表示“恰好一个”和“零个或多个”。 交易所发布到绑定到它的所有队列,而不考虑任何路由密钥。 有关这些和其他 Exchange 类型的详细信息,请参阅其他资源。​​Exchange​​​​ExchangeTypes​​​​direct​​​​topic​​​​fanout​​​​headers​​​​Exchange​​​​Exchange​​​​Direct​​​​Topic​​​​Fanout​

AMQP规范还要求任何经纪商提供没有名称的“默认”直接交易所。 声明的所有队列都绑定到该默认值,其名称作为路由密钥。 您可以在 AmqpTemplate​ 中了解有关默认 Exchange 在 Spring AMQP 中的用法的更多信息。​​Exchange​

队列

该类表示消息使用者从中接收消息的组件。 与各种类一样,我们的实现旨在成为此核心 AMQP 类型的抽象表示。 以下清单显示了该类:​​Queue​​​​Exchange​​​​Queue​

public class Queue  {

private final String name;

private volatile boolean durable;

private volatile boolean exclusive;

private volatile boolean autoDelete;

private volatile Map<String, Object> arguments;

/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}

// Getters and Setters omitted for brevity

}

请注意,构造函数采用队列名称。 根据实现,管理模板可能会提供用于生成唯一命名队列的方法。 此类队列可用作“回复”地址或其他临时情况。 因此,自动生成队列的“独占”和“自动删除”属性都将设置为“true”。

有关使用命名空间支持(包括队列参数)声明队列的信息,请参阅配置代理中有关队列的部分。

捆绑

假设生产者发送到交易所,消费者从队列接收,将队列连接到交易所的绑定对于通过消息传递连接这些生产者和消费者至关重要。 在Spring AMQP中,我们定义了一个类来表示这些连接。 本节回顾将队列绑定到交易所的基本选项。​​Binding​

您可以使用固定路由密钥将队列绑定到 ,如以下示例所示:​​DirectExchange​

new Binding(someQueue, someDirectExchange, "foo.bar");

您可以使用路由模式将队列绑定到 ,如以下示例所示:​​TopicExchange​

new Binding(someQueue, someTopicExchange, "foo.*");

您可以将队列绑定到没有路由密钥的 ,如以下示例所示:​​FanoutExchange​

new Binding(someQueue, someFanoutExchange);

我们还提供了一个促进“流畅 API”样式,如以下示例所示:​​BindingBuilder​

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

为清楚起见,前面的示例显示了该类,但是当对“bind()”方法使用静态导入时,此样式效果很好。​​BindingBuilder​

就其本身而言,类的实例仅保存有关连接的数据。 换句话说,它不是一个“活动”组件。 但是,正如稍后将在配置代理中看到的那样,该类可以使用实例来实际触发代理上的绑定操作。 此外,正如您在同一部分中所看到的,您可以通过在类中使用 Spring 的注释来定义实例。 还有一个方便的基类,它进一步简化了生成与 AMQP 相关的 Bean 定义的方法,并识别队列、交换和绑定,以便在应用程序启动时在 AMQP 代理上声明它们。​​Binding​​​​AmqpAdmin​​​​Binding​​​​Binding​​​​@Bean​​​​@Configuration​

也在核心包中定义。 作为实际 AMQP 消息传递中涉及的主要组件之一,它在其自己的部分中进行了详细讨论(请参阅 AmqpTemplate)。​​AmqpTemplate​

4.1.2. 连接和资源管理

虽然我们在上一节中描述的 AMQP 模型是通用的并且适用于所有实现,但当我们进入资源管理时,细节特定于代理实现。 因此,在本节中,我们将重点介绍仅存在于“spring-rabbit”模块中的代码,因为此时,RabbitMQ 是唯一受支持的实现。

管理与 RabbitMQ 代理的连接的核心组件是接口。 实现的职责是提供 的实例,该实例是 的包装器。​​ConnectionFactory​​​​ConnectionFactory​​​​org.springframework.amqp.rabbit.connection.Connection​​​​com.rabbitmq.client.Connection​

选择连接工厂

有三家连接工厂可供选择

  • ​PooledChannelConnectionFactory​
  • ​ThreadChannelConnectionFactory​
  • ​CachingConnectionFactory​

前两个是在 2.3 版中添加的。

对于大多数用例,应使用 。 如果要确保严格的消息排序而不需要使用作用域内操作,则可以使用 。 如果要使用相关的发布者确认,或者希望通过其打开多个连接,则应使用 。​​PooledChannelConnectionFactory​​​​ThreadChannelConnectionFactory​​​​CachingConnectionFactory​​​​CacheMode​

所有三个工厂都支持简单的发布者确认。

将 配置为使用单独的连接时,现在可以从版本 2.3.2 开始,将发布连接工厂配置为其他类型。 默认情况下,发布工厂的类型相同,在主工厂上设置的任何属性也会传播到发布工厂。​​RabbitTemplate​

​PooledChannelConnectionFactory​

此工厂基于 Apache Pool2 管理单个连接和两个通道池。 一个池用于事务通道,另一个池用于非事务通道。 池是默认配置的;提供回调以配置池;有关更多信息,请参阅 Apache 文档。​​GenericObjectPool​

Apache jar 必须位于类路径上才能使用此工厂。​​commons-pool2​

@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
​ThreadChannelConnectionFactory​

此工厂管理单个连接和两个连接,一个用于事务通道,另一个用于非事务通道。 此工厂确保同一线程上的所有操作都使用相同的通道(只要它保持打开状态)。 这有助于严格的消息排序,而无需作用域内操作。 为避免内存泄漏,如果应用程序使用许多生存期较短的线程,则必须调用工厂的线程来释放通道资源。 从版本 2.3.7 开始,线程可以将其通道传输到另一个线程。 有关详细信息,请参阅多线程环境中的严格消息排序。​​ThreadLocal​​​​closeThreadChannel()​

​CachingConnectionFactory​

提供的第三个实现是 ,默认情况下,它建立可由应用程序共享的单个连接代理。 共享连接是可能的,因为使用 AMQP 进行消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于 JMS 中的连接和会话之间的关系)。 连接实例提供了一个方法。 该实现支持缓存这些通道,并根据通道是否为事务性为通道维护单独的缓存。 创建 的实例时,可以通过构造函数提供“主机名”。 您还应该提供“用户名”和“密码”属性。 若要配置通道缓存的大小(默认值为 25),可以调用该方法。​​CachingConnectionFactory​​​​createChannel​​​​CachingConnectionFactory​​​​CachingConnectionFactory​​​​setChannelCacheSize()​

从版本 1.3 开始,您可以配置缓存连接以及仅通道。 在这种情况下,每次调用 都会创建一个新连接(或从缓存中检索空闲连接)。 关闭连接会将其返回到缓存(如果尚未达到缓存大小)。 在此类连接上创建的通道也会被缓存。 在某些环境中使用单独的连接可能很有用,例如从 HA 群集使用,在 与负载均衡器配合使用,以连接到不同的集群成员等。 要缓存连接,请将 设置为 。​​CachingConnectionFactory​​​​createConnection()​​​​cacheMode​​​​CacheMode.CONNECTION​

这不会限制连接数。 相反,它指定允许多少个空闲的打开连接。

从版本 1.5.5 开始,提供了一个名为的新属性。 设置此属性时,它将限制允许的连接总数。 设置后,如果达到限制,则用于等待连接变为空闲状态。 如果超过该时间,则抛出 。​​connectionLimit​​​​channelCheckoutTimeLimit​​​​AmqpTimeoutException​


当缓存模式为 时,自动声明队列和其他 (请参阅交换、队列和绑定的自动声明​)不受支持。​​CONNECTION​



此外,在撰写本文时,库默认为每个连接创建一个固定的线程池(默认大小:线程)。 使用大量连接时,应考虑在 上设置自定义。 然后,所有连接都可以使用相同的执行器,并且可以共享其线程。 执行程序的线程池应不受限制或针对预期用途进行适当设置(通常,每个连接至少一个线程)。 如果在每个连接上创建了多个通道,则池大小会影响并发性,因此变量(或简单缓存)线程池执行器将是最合适的。​​amqp-client​​​​Runtime.getRuntime().availableProcessors() * 2​​​​executor​​​​CachingConnectionFactory​


重要的是要了解缓存大小(默认情况下)不是限制,而只是可以缓存的通道数。 如果缓存大小为 10,则实际上可以使用任意数量的通道。 如果使用的通道超过 10 个,并且它们都返回到缓存中,则缓存中将有 10 个通道。 其余部分是物理关闭的。

从版本 1.6 开始,默认通道缓存大小已从 1 增加到 25。 在高容量、多线程环境中,小缓存意味着以高速率创建和关闭通道。 增加默认缓存大小可以避免此开销。 您应该通过 RabbitMQ 管理 UI 监控正在使用的通道,并考虑进一步增加缓存大小,如果您 看到许多频道正在创建和关闭。 缓存仅按需增长(以满足应用程序的并发要求),因此此更改不会 影响现有的小批量应用程序。

从版本 1.4.2 开始,具有一个名为 的属性。 当此属性大于零时,将限制可在连接上创建的通道数。 如果达到限制,调用线程将阻塞,直到通道可用或达到此超时,在这种情况下,将抛出 a。​​CachingConnectionFactory​​​​channelCheckoutTimeout​​​​channelCacheSize​​​​AmqpTimeoutException​

框架内使用的通道(例如,)将可靠地返回到缓存中。 如果在框架外部创建通道,(例如, 通过直接访问连接并调用 ),您必须可靠地(通过关闭)返回它们,也许在一个块中,以避免通道不足。​​RabbitTemplate​​​​createChannel()​​​​finally​

以下示例演示如何创建新的 :​​connection​

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

使用 XML 时,配置可能类似于以下示例:

<bean 
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>

还有一个仅在框架的单元测试代码中可用的实现。 它比 更简单,因为它不缓存通道,但由于缺乏性能和弹性,它不适合在简单测试之外的实际使用。 如果出于某种原因需要实现自己的基类,基类可能会提供一个很好的起点。​​SingleConnectionFactory​​​​CachingConnectionFactory​​​​ConnectionFactory​​​​AbstractConnectionFactory​

通过使用 rabbit 命名空间可以快速方便地创建 A,如下所示:​​ConnectionFactory​

<rabbit:connection-factory />

在大多数情况下,此方法更可取,因为框架可以为您选择最佳默认值。 创建的实例是 . 请记住,通道的默认缓存大小为 25。 如果要缓存更多通道,请通过设置“channelCacheSize”属性来设置更大的值。 在 XML 中,它如下所示:​​CachingConnectionFactory​

<bean 
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>

此外,使用命名空间,您可以添加“channel-cache-size”属性,如下所示:

<rabbit:connection-factory
channel-cache-size="50"/>

默认缓存模式为 ,但您可以将其配置为缓存连接。 在下面的示例中,我们使用:​​CHANNEL​​​​connection-cache-size​

<rabbit:connection-factory
cache-mode="CONNECTION" connection-cache-size="25"/>

可以使用命名空间提供主机和端口属性,如下所示:

<rabbit:connection-factory
host="somehost" port="5672"/>

或者,如果在群集环境中运行,则可以使用 addresses 属性,如下所示:

<rabbit:connection-factory
addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>

有关 的信息,请参阅连接到群集。​​address-shuffle-mode​

以下示例具有自定义线程工厂,该工厂的线程名称前缀为 :​​rabbitmq-​

<rabbit:connection-factory  virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />

<bean class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
地址解析器

从版本 2.1.15 开始,您现在可以使用 解析连接地址。 这将覆盖 和 属性的任何设置。​​AddressResolver​​​​addresses​​​​host/port​

命名连接

从版本 1.7 开始,提供了用于注入 . 生成的名称用于目标 RabbitMQ 连接的特定于应用程序的标识。 如果 RabbitMQ 服务器支持连接名称,则连接名称将显示在管理 UI 中。 此值不必是唯一的,也不能用作连接标识符,例如,在 HTTP API 请求中。 此值应该是人类可读的,并且是键下的一部分。 您可以使用简单的 Lambda,如下所示:​​ConnectionNameStrategy​​​​AbstractionConnectionFactory​​​​ClientProperties​​​​connection_name​

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

该参数可用于通过某些逻辑区分目标连接名称。 默认情况下,的 、表示对象的十六进制字符串和内部计数器用于生成 . 命名空间组件也随属性一起提供。​​ConnectionFactory​​​​beanName​​​​AbstractConnectionFactory​​​​connection_name​​​​<rabbit:connection-factory>​​​​connection-name-strategy​

的实现将连接名称设置为应用程序属性。 可以将其声明为 并将其注入到连接工厂中,如以下示例所示:​​SimplePropertyValueConnectionNameStrategy​​​​@Bean​

@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}

该属性必须存在于应用程序上下文的 .​​Environment​

使用 Spring 引导及其自动配置的连接工厂时,只需声明 . 引导自动检测 bean 并将其连接到工厂。​​ConnectionNameStrategy​​​​@Bean​

阻止的连接和资源限制

连接可能会被阻止,无法与内存警报对应的代理进行交互。 从版本 2.0 开始,可以为 提供要通知连接阻止和取消阻止事件的实例。 此外,分别通过其内部实现发出 a 和 。 这些允许您提供应用程序逻辑,以便对代理上的问题做出适当的反应,并(例如)采取一些纠正措施。​​org.springframework.amqp.rabbit.connection.Connection​​​​com.rabbitmq.client.BlockedListener​​​​AbstractConnectionFactory​​​​ConnectionBlockedEvent​​​​ConnectionUnblockedEvent​​​​BlockedListener​

当应用程序配置了单个 ,就像默认情况下使用 Spring 引导自动配置一样,当连接被代理阻止时,应用程序将停止工作。 当它被经纪人阻止时,它的任何客户都会停止工作。 如果我们在同一个应用程序中有生产者和消费者,当生产者阻止连接(因为代理上不再有资源)并且消费者无法释放它们(因为连接被阻止)时,我们最终可能会陷入死锁。 为了缓解此问题,我们建议再有一个具有相同选项的单独实例 - 一个用于生产者,一个用于使用者。 对于在使用者线程上执行的事务生成者,不可能单独,因为它们应重用与使用者事务关联的。​​CachingConnectionFactory​​​​CachingConnectionFactory​​​​CachingConnectionFactory​​​​Channel​

从版本 2.0.2 开始,具有自动使用第二个连接工厂的配置选项,除非正在使用事务。 有关详细信息,请参阅使用单独的连接。 对于发布者连接与主策略相同,并附加到调用方法的结果中。​​RabbitTemplate​​​​ConnectionNameStrategy​​​​.publisher​

从版本 1.7.7 开始,提供了 an,当无法创建 a 时会抛出该 (例如,因为达到限制并且缓存中没有可用通道)。 您可以在 中使用此异常在 进行一些退避后恢复操作。​​AmqpResourceNotAvailableException​​​​SimpleConnection.createChannel()​​​​Channel​​​​channelMax​​​​RetryPolicy​

配置基础客户端连接工厂

使用 Rabbit 客户端的实例。 在 上设置等效属性时,会传递许多配置属性(、 和 等)。 若要设置其他属性(例如),可以定义 Rabbit 工厂的实例,并使用 的相应构造函数提供对它的引用。 使用命名空间(如前所述)时,需要在属性中提供对已配置工厂的引用。 为方便起见,提供了一个工厂 Bean 来帮助在 Spring 应用程序上下文中配置连接工厂,如下一节所述。​​CachingConnectionFactory​​​​ConnectionFactory​​​​host​​​​port​​​​userName​​​​password​​​​requestedHeartBeat​​​​connectionTimeout​​​​CachingConnectionFactory​​​​clientProperties​​​​CachingConnectionFactory​​​​connection-factory​

<rabbit:connection-factory
connection-factory="rabbitConnectionFactory"/>

默认情况下,4.0.x 客户端启用自动恢复。 虽然与此功能兼容,但Spring AMQP具有自己的恢复机制,并且通常不需要客户端恢复功能。 我们建议禁用自动恢复,以避免在代理可用但连接尚未恢复时获取实例。 您可能会注意到此异常,例如,在 中配置 时,即使故障转移到集群中的另一个代理也是如此。 由于自动恢复连接在计时器上恢复,因此使用 Spring AMQP 的恢复机制可以更快地恢复连接。 从版本 1.7.1 开始,Spring AMQP 禁用自动恢复,除非您显式创建自己的 RabbitMQ 连接工厂并将其提供给 . 默认情况下,由 创建的 RabbitMQ 实例还禁用了该选项。​​amqp-client​​​​AutoRecoverConnectionNotCurrentlyOpenException​​​​RetryTemplate​​​​RabbitTemplate​​​​amqp-client​​​​CachingConnectionFactory​​​​ConnectionFactory​​​​RabbitConnectionFactoryBean​

​RabbitConnectionFactoryBean​​和配置 SSL

从版本 1.4 开始,提供了方便的功能,以便使用依赖关系注入在基础客户端连接工厂上方便地配置 SSL 属性。 其他二传手委托给底层工厂。 以前,您必须以编程方式配置 SSL 选项。 以下示例演示如何配置:​​RabbitConnectionFactoryBean​​​​RabbitConnectionFactoryBean​

<rabbit:connection-factory 
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />

<bean
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/>
</bean>

有关配置 SSL 的信息,请参阅 RabbitMQ 文档。 省略 和 配置以在不进行证书验证的情况下通过 SSL 进行连接。 下一个示例演示如何提供密钥和信任存储区配置。​​keyStore​​​​trustStore​

该属性是一个 Spring 指向包含以下键的属性文件:​​sslPropertiesLocation​​​​Resource​

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

和是春天指着商店。 通常,此属性文件由操作系统保护,应用程序具有读取访问权限。​​keyStore​​​​truststore​​​​Resources​

从 Spring AMQP 版本 1.5 开始,您可以直接在工厂 Bean 上设置这些属性。 如果同时提供了离散属性 和,则后者中的属性将覆盖 离散值。​​sslPropertiesLocation​

从版本 2.0 开始,默认情况下会验证服务器证书,因为它更安全。 如果出于某种原因希望跳过此验证,请将工厂 Bean 的属性设置为 。 从版本 2.1 开始,现在默认调用。 要恢复到以前的行为,请将该属性设置为 。​​skipServerCertificateValidation​​​​true​​​​RabbitConnectionFactoryBean​​​​enableHostnameVerification()​​​​enableHostnameVerification​​​​false​

从版本 2.2.5 开始,工厂 Bean 将始终使用 TLS v1.2;以前,它在某些情况下使用 v1.1,在其他情况下使用 v1.2(取决于其他属性)。 如果由于某种原因需要使用 v1.1,请设置属性:。​​sslAlgorithm​​​​setSslAlgorithm("TLSv1.1")​

连接到群集

要连接到群集,请在 上配置属性:​​addresses​​​​CachingConnectionFactory​

@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}

从版本 3.0 开始,每当建立新连接时,基础连接工厂将通过选择随机地址来尝试连接到主机。 若要恢复到以前尝试从第一个连接到最后一个的行为,请将该属性设置为 。​​addressShuffleMode​​​​AddressShuffleMode.NONE​

从版本 2.3 开始,添加了随机模式,这意味着在创建连接后,第一个地址将移动到末尾。 如果您希望从所有节点上的所有分片消费,您可能希望将此模式与 RabbitMQ 分片插件一起使用,并具有合适的并发性。​​INORDER​​​​CacheMode.CONNECTION​

@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
路由连接工厂

从 1.3 版开始,引入了 。 此工厂提供了一种机制,用于为多个映射配置映射,并在运行时确定某些映射的目标。 通常,实现会检查线程绑定上下文。 为方便起见,Spring AMQP 提供了 ,它从 获取当前线程绑定。 以下示例显示了如何在 XML 和 Java 中配置 a:​​AbstractRoutingConnectionFactory​​​​ConnectionFactories​​​​ConnectionFactory​​​​lookupKey​​​​SimpleRoutingConnectionFactory​​​​lookupKey​​​​SimpleResourceHolder​​​​SimpleRoutingConnectionFactory​

<bean 
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>

<rabbit:template connection-factory="connectionFactory" />
public class MyService {

@Autowired
private RabbitTemplate rabbitTemplate;

public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}

}

使用后取消绑定资源非常重要。 有关更多信息,请参见 的 JavaDoc 。​​AbstractRoutingConnectionFactory​

从版本 1.4 开始,支持 SpEL 和属性,它们在每个 AMQP 协议交互操作(、、或)上进行评估,解析为所提供的值。 您可以使用 Bean 引用,例如在表达式中。 对于操作,要发送的消息是根评估对象。 对于操作,是根评估对象。​​RabbitTemplate​​​​sendConnectionFactorySelectorExpression​​​​receiveConnectionFactorySelectorExpression​​​​send​​​​sendAndReceive​​​​receive​​​​receiveAndReply​​​​lookupKey​​​​AbstractRoutingConnectionFactory​​​​@vHostResolver.getVHost(#root)​​​​send​​​​receive​​​​queueName​

路由算法如下:如果选择器表达式被计算或被计算到,或者提供的不是 的实例,则一切都像以前一样工作,依赖于提供的实现。 如果评估结果不是 ,但没有目标,并且 配置了 。 在 的情况下,它确实回退到基于 的实现。 但是,如果抛出 ,则抛出 an。​​null​​​​null​​​​ConnectionFactory​​​​AbstractRoutingConnectionFactory​​​​ConnectionFactory​​​​null​​​​ConnectionFactory​​​​lookupKey​​​​AbstractRoutingConnectionFactory​​​​lenientFallback = true​​​​AbstractRoutingConnectionFactory​​​​routing​​​​determineCurrentLookupKey()​​​​lenientFallback = false​​​​IllegalStateException​

命名空间支持还提供组件上的 and 属性。​​send-connection-factory-selector-expression​​​​receive-connection-factory-selector-expression​​​​<rabbit:template>​

此外,从版本 1.4 开始,您可以在侦听器容器中配置路由连接工厂。 在这种情况下,队列名称列表将用作查找键。 例如,如果使用 配置容器,则查找键为 (请注意,键中没有空格)。​​setQueueNames("thing1", "thing2")​​​​[thing1,thing]"​

从版本 1.6.9 开始,可以通过在侦听器容器上使用向查找键添加限定符。 例如,这样做可以侦听具有相同名称但位于不同虚拟主机中的队列(每个虚拟主机都有一个连接工厂)。​​setLookupKeyQualifier​

例如,对于查找键限定符和侦听队列的容器,您可以向其注册目标连接工厂的查找键可以是 。​​thing1​​​​thing2​​​​thing1[thing2]​

目标(和默认,如果提供)连接工厂必须具有相同的发布者确认和返回设置。 请参阅发布者确认并返回。

从版本 2.4.4 开始,可以禁用此验证。 如果确认和返回之间的值需要不相等,则可以使用 来关闭验证。 请注意,添加到的第一个连接工厂将确定 和 的一般值。​​AbstractRoutingConnectionFactory#setConsistentConfirmsReturns​​​​AbstractRoutingConnectionFactory​​​​confirms​​​​returns​

如果您遇到要检查的某些消息确认/返回而其他消息则不会确认/返回的情况,这可能会很有用。 例如:

@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);

CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);

final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);

final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);

final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);

final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}

这样,带有标头的消息将通过缓存连接发送,您可以确保消息传递。 有关确保邮件传递的详细信息,请参阅发布者确认并返回。​​x-use-publisher-confirms: true​

队列相关性和​​LocalizedQueueConnectionFactory​

在集群中使用 HA 队列时,为了获得最佳性能,您可能需要连接到物理代理 潜在顾客队列所在的位置。 可以配置多个代理地址。 这是为了进行故障转移,客户端会尝试按顺序进行连接。 使用管理插件提供的 REST API 来确定哪个节点是队列的引导节点。 然后,它创建(或从缓存中检索)仅连接到该节点的节点。 如果连接失败,则确定新的引导节点,使用者连接到该节点。 配置了默认连接工厂,以防无法确定队列的物理位置,在这种情况下,它会正常连接到群集。​​CachingConnectionFactory​​​​LocalizedQueueConnectionFactory​​​​CachingConnectionFactory​​​​LocalizedQueueConnectionFactory​

是 和 使用队列名称作为查找键,如上面的路由连接工厂中所述。​​LocalizedQueueConnectionFactory​​​​RoutingConnectionFactory​​​​SimpleMessageListenerContainer​

因此(使用队列名称进行查找),仅当容器配置为侦听单个队列时,才能使用 。​​LocalizedQueueConnectionFactory​

必须在每个节点上启用 RabbitMQ 管理插件。

此连接工厂适用于长期连接,例如 使用的 . 它不适用于短连接用途,例如使用 a,因为在建立连接之前调用 REST API 会产生开销。 此外,对于发布操作,队列是未知的,并且消息无论如何都会发布到所有集群成员,因此查找节点的逻辑几乎没有价值。​​SimpleMessageListenerContainer​​​​RabbitTemplate​

以下示例配置显示了如何配置工厂:

@Autowired
private ConfigurationProperties props;

@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}

@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}

请注意,前三个参数是 、 和 的数组。 这些是位置性的,因为当容器尝试连接到队列时,它会使用管理 API 来确定哪个节点是队列的引导节点,并连接到与该节点位于同一阵列位置的地址。​​addresses​​​​adminUris​​​​nodes​


从版本 3.0 开始,RabbitMQ 不再用于访问 Rest API。 相反,默认情况下,如果使用 from Spring Webflux 在类路径上;否则使用 a。​​http-client​​​​WebClient​​​​spring-webflux​​​​RestTemplate​

添加到类路径:​​WebFlux​

例 1.马文

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>

例 2.格拉德尔

compile 'org.springframework.amqp:spring-rabbit'

您还可以通过实现和重写其 和(可选)方法来使用其他 REST 技术。​​LocalizedQueueConnectionFactory.NodeLocator​​​​createClient, ``restCall​​​​close​

lqcf.setNodeLocator(new NodeLocator<MyClient>() {

@Override
public MyClient createClient(String userName, String password) {
...
}

@Override
public HashMap<String, Object> restCall(MyClient client, URI uri) {
...
});

});

该框架提供 和 ,默认值如上所述。​​WebFluxNodeLocator​​​​RestTemplateNodeLocator​

发布商确认并返回

通过将属性设置为“true”和“true”来支持已确认(具有相关性)和返回的消息。​​CachingConnectionFactory​​​​publisherConfirmType​​​​ConfirmType.CORRELATED​​​​publisherReturns​

设置这些选项后,工厂创建的实例将包装在 中,用于方便回调。 当获得这样的通道时,客户端可以向 . 该实现包含用于路由确认或返回到相应侦听器的逻辑。 以下各节将进一步介绍这些功能。​​Channel​​​​PublisherCallbackChannel​​​​PublisherCallbackChannel.Listener​​​​Channel​​​​PublisherCallbackChannel​

另请参阅​​作用域内操作​​。​​simplePublisherConfirms​

有关更多背景信息,请参阅 RabbitMQ 团队的博客文章,标题为“介绍发布者确认”。

连接和通道侦听器

连接工厂支持注册和实现。 这允许您接收连接和通道相关事件的通知。 (建立连接时 使用 A 执行声明 - 有关详细信息,请参阅交换、队列和绑定的自动声明)。 以下清单显示了接口定义:​​ConnectionListener​​​​ChannelListener​​​​ConnectionListener​​​​RabbitAdmin​​​​ConnectionListener​

@FunctionalInterface
public interface ConnectionListener {

void onCreate(Connection connection);

default void onClose(Connection connection) {
}

default void onShutDown(ShutdownSignalException signal) {
}

}

从版本 2.0 开始,可以为对象提供实例,以便在连接阻止和取消阻止事件时收到通知。 以下示例显示了通道侦听器接口定义:​​org.springframework.amqp.rabbit.connection.Connection​​​​com.rabbitmq.client.BlockedListener​

@FunctionalInterface
public interface ChannelListener {

void onCreate(Channel channel, boolean transactional);

default void onShutDown(ShutdownSignalException signal) {
}

}

请参阅发布是异步的 — 如何检测成功和失败,了解您可能希望注册 的一种方案。​​ChannelListener​

记录通道关闭事件

版本 1.5 引入了一种机制,使用户能够控制日志记录级别。

使用默认策略记录通道关闭,如下所示:​​CachingConnectionFactory​

  • 不记录正常通道关闭 (200 OK)。
  • 如果通道由于被动队列声明失败而关闭,则会在调试级别记录该通道。
  • 如果频道因非特定消费者状况而被拒绝而关闭,则会记录在 信息级别。basic.consume
  • 所有其他记录在错误级别。

若要修改此行为,可以将自定义注入到 in 属性中。​​ConditionalExceptionLogger​​​​CachingConnectionFactory​​​​closeExceptionLogger​

另请参阅使用者事件。

运行时缓存属性

与 1.6 版本相同,现在通过该方法提供缓存统计信息。 这些统计信息可用于调整缓存,以在生产中对其进行优化。 例如,高水位线可用于确定是否应增加缓存大小。 如果它等于缓存大小,则可能需要考虑进一步增加。 下表描述了这些属性:​​CachingConnectionFactory​​​​getCacheProperties()​​​​CacheMode.CHANNEL​

表 1.CacheMode.CHANNEL 的缓存属性

财产

意义


connectionName


由 生成的连接的名称。​​ConnectionNameStrategy​


channelCacheSize


当前配置的允许空闲的最大通道数。


localPort


连接的本地端口(如果可用)。 这可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。


idleChannelsTx


当前处于空闲(缓存)状态的事务通道数。


idleChannelsNotTx


当前处于空闲(缓存)状态的非事务性通道数。


idleChannelsTxHighWater


并发空闲(缓存)的最大事务通道数。


idleChannelsNotTxHighWater


非事务性通道的最大数量已同时处于空闲状态(缓存)。

下表描述了这些属性:​​CacheMode.CONNECTION​

表 2.CacheMode.CONNECTION 的缓存属性

财产

意义


connectionName:<localPort>


由 生成的连接的名称。​​ConnectionNameStrategy​


openConnections


表示与代理的连接的连接对象的数量。


channelCacheSize


当前配置的允许空闲的最大通道数。


connectionCacheSize


当前配置的允许空闲的最大连接数。


idleConnections


当前处于空闲状态的连接数。


idleConnectionsHighWater


并发空闲的最大连接数。


idleChannelsTx:<localPort>


此连接当前处于空闲(缓存)状态的事务通道数。 您可以使用属性名称的一部分与 RabbitMQ 管理 UI 上的连接和通道相关联。​​localPort​


idleChannelsNotTx:<localPort>


此连接当前处于空闲(缓存)状态的非事务性通道数。 属性名称的一部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。​​localPort​


idleChannelsTxHighWater:<localPort>


并发空闲(缓存)的最大事务通道数。 属性名称的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。


idleChannelsNotTxHighWater:<localPort>


非事务性通道的最大数量已同时处于空闲状态(缓存)。 您可以使用属性名称的一部分与 RabbitMQ 管理 UI 上的连接和通道相关联。​​localPort​

属性(或)也包括在内。​​cacheMode​​​​CHANNEL​​​​CONNECTION​

Spring AMQP项目

图1.JVisualVM 示例

RabbitMQ 自动连接/拓扑恢复

自Spring AMQP的第一个版本以来,该框架在代理发生故障时提供了自己的连接和通道恢复。 此外,如配置代理中所述,在重新建立连接时,会重新声明任何基础结构 Bean(队列和其他)。 因此,它不依赖于库现在提供的自动恢复。 默认情况下,已启用自动恢复。 两种恢复机制之间存在一些不兼容之处,因此默认情况下,Spring 将底层的属性设置为 。 即使该属性是 ,Spring 也会通过立即关闭任何恢复的连接来有效地禁用它。​​RabbitAdmin​​​​amqp-client​​​​amqp-client​​​​automaticRecoveryEnabled​​​​RabbitMQ connectionFactory​​​​false​​​​true​

缺省情况下,只有定义为 bean 的元素(队列、交换、绑定)才会在连接失败后重新声明。 有关如何更改该行为的信息,请参阅恢复自动删除声明。

4.1.3. 添加自定义客户端连接属性

现在允许您访问基础连接工厂以允许,例如, 设置自定义客户端属性。 以下示例演示如何执行此操作:​​CachingConnectionFactory​

connectionFactory.getRabbitConnectionFactory().getClientProperties().put("thing1", "thing2");

查看连接时,这些属性将显示在 RabbitMQ 管理 UI 中。

4.1.4.​​AmqpTemplate​

与 Spring 框架和相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个发挥核心作用的“模板”。 定义主操作的接口称为 。 这些操作涵盖了发送和接收消息的一般行为。 换句话说,它们不是任何实现所独有的——因此名称中的“AMQP”。 另一方面,该接口的实现与AMQP协议的实现相关联。 与本身是接口级 API 的 JMS 不同,AMQP 是一种线级协议。 该协议的实现提供自己的客户端库,因此模板接口的每个实现都依赖于特定的客户端库。 目前,只有一个实现:。 在下面的示例中,我们经常使用 . 但是,当您查看配置示例或实例化模板或调用资源库的任何代码摘录时,您可以看到实现类型(例如,)。​​AmqpTemplate​​​​RabbitTemplate​​​​AmqpTemplate​​​​RabbitTemplate​

如前所述,该接口定义了发送和接收消息的所有基本操作。 我们将分别在发送消息和接收消息中探讨消息发送和接收。​​AmqpTemplate​

另请参阅异步兔子模板。

添加重试功能

从版本 1.3 开始,您现在可以将 配置为使用 来帮助处理代理连接问题。 有关完整信息,请参阅春季重试项目。 以下只是一个使用指数退避策略和默认值的示例,该策略在将异常抛给调用方之前进行了三次尝试。​​RabbitTemplate​​​​RetryTemplate​​​​SimpleRetryPolicy​

下面的示例使用 XML 命名空间:

<rabbit:template  connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>

以下示例使用 Java 中的注释:​​@Configuration​

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}

从版本 1.4 开始,除了属性之外,该选项在 上受支持。 它用作 的第二个参数。​​retryTemplate​​​​recoveryCallback​​​​RabbitTemplate​​​​RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)​

这在一定程度上受到限制,因为重试上下文仅包含字段。 对于更复杂的用例,您应该使用外部,以便可以通过上下文的属性将其他信息传达给 。 以下示例演示如何执行此操作:​​RecoveryCallback​​​​lastThrowable​​​​RetryTemplate​​​​RecoveryCallback​

retryTemplate.execute(
new RetryCallback<Object, Exception>() {

@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}

}, new RecoveryCallback<Object>() {

@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}

在这种情况下,您不会将 注入 .​​RetryTemplate​​​​RabbitTemplate​

发布是异步的 — 如何检测成功和失败

发布消息是一种异步机制,默认情况下,RabbitMQ 会丢弃无法路由的消息。 若要成功发布,可以接收异步确认,如相关发布者确认和返回中所述。 请考虑两种故障情况:

  • 发布到交易所,但没有匹配的目标队列。
  • 发布到不存在的交易所。

第一种情况包含在发布商退货中,如相关发布商确认和退货中所述。

对于第二种情况,将丢弃消息,并且不会生成任何返回。 基础通道已关闭,但出现异常。 默认情况下,会记录此异常,但您可以向 注册 以获取此类事件的通知。 以下示例演示如何添加:​​ChannelListener​​​​CachingConnectionFactory​​​​ConnectionListener​

this.connectionFactory.addConnectionListener(new ConnectionListener() {

@Override
public void onCreate(Connection connection) {
}

@Override
public void onShutDown(ShutdownSignalException signal) {
...
}

});

您可以检查信号的属性以确定发生的问题。​​reason​

要检测发送线程上的异常,可以在 上检测到异常。 但是,事务会显著降低性能,因此在仅为这一个用例启用事务之前,请仔细考虑这一点。​​setChannelTransacted(true)​​​​RabbitTemplate​​​​txCommit()​

相关发布者确认并返回

支持发布者的实现确认并返回。​​RabbitTemplate​​​​AmqpTemplate​

对于返回的消息,模板的属性必须设置为或必须为特定消息计算。 此功能需要将其属性设置为 (请参阅发布者确认并返回)。 返回由客户端通过调用注册 发送到客户端。 回调必须实现以下方法:​​mandatory​​​​true​​​​mandatory-expression​​​​true​​​​CachingConnectionFactory​​​​publisherReturns​​​​true​​​​RabbitTemplate.ReturnsCallback​​​​setReturnsCallback(ReturnsCallback callback)​

void returnedMessage(ReturnedMessage returned);

具有以下属性:​​ReturnedMessage​

  • ​message​​- 返回的消息本身
  • ​replyCode​​- 指示退货原因的代码
  • ​replyText​​- 返回的文本原因 - 例如NO_ROUTE
  • ​exchange​​- 消息发送到的交易所
  • ​routingKey​​- 使用的路由密钥

每个 仅支持一个 。 另请参阅回复超时。​​ReturnsCallback​​​​RabbitTemplate​

对于发布者确认(也称为发布者确认),模板需要 将其属性设置为 。 确认通过它通过调用注册 发送到客户端。 回调必须实现此方法:​​CachingConnectionFactory​​​​publisherConfirm​​​​ConfirmType.CORRELATED​​​​RabbitTemplate.ConfirmCallback​​​​setConfirmCallback(ConfirmCallback callback)​

void confirm(CorrelationData correlationData, boolean ack, String cause);

是客户端在发送原始消息时提供的对象。 对于 为 true,对于 为 false。 例如,如果生成 时可用,则原因可能包含 的原因。 例如,将消息发送到不存在的交易所时。 在这种情况下,经纪人关闭通道。 关闭的原因包含在 . 在 1.4 版中添加了。​​CorrelationData​​​​ack​​​​ack​​​​nack​​​​nack​​​​nack​​​​nack​​​​cause​​​​cause​

只有一个受 .​​ConfirmCallback​​​​RabbitTemplate​

当兔子模板发送操作完成时,通道将关闭。 这排除了在连接工厂缓存已满时接收确认或返回(当缓存中有空间时,通道未物理关闭,返回和确认正常进行)。 当缓存已满时,框架将关闭延迟最多五秒钟,以便有时间接收确认和返回。 使用确认时,收到最后一次确认时通道将关闭。 仅使用回车时,通道将保持打开状态整整五秒钟。 通常建议将连接工厂的值设置为足够大的值,以便将发布消息的通道返回到缓存而不是关闭。 您可以使用 RabbitMQ 管理插件监控通道使用情况。 如果您看到通道正在快速打开和关闭,则应考虑增加缓存大小以减少服务器上的开销。​​channelCacheSize​

在版本 2.1 之前,为发布者确认启用的通道在收到确认之前返回到缓存。 其他一些进程可以签出通道并执行一些导致通道关闭的操作,例如将消息发布到不存在的交换。 这可能会导致确认丢失。 版本 2.1 及更高版本在确认未完成时不再将通道返回到缓存。 在每次操作后对通道执行逻辑。 通常,这意味着一次只有一个确认未完成。​​RabbitTemplate​​​​close()​

从版本 2.2 开始,将在连接工厂的某个线程上调用回调。 这是为了避免在回调中执行 Rabbit 操作时出现潜在的死锁。 对于以前的版本,回调直接在连接 I/O 线程上调用;如果执行某些 RPC 操作(例如打开新通道),这将死锁,因为 I/O 线程会阻塞等待结果,但结果需要由 I/O 线程本身处理。 对于这些版本,有必要将工作(例如发送消息)移交给回调中的另一个线程。 这不再是必需的,因为框架现在将回调调用传递给执行器。​​executor​​​​amqp-client​

只要返回回调在 60 秒或更短的时间内执行,仍然保持在 ack 之前接收返回消息的保证。 确认计划在返回回调退出后或 60 秒后(以先到者为准)传递。

该对象具有可用于获取结果的 ,而不是在模板上使用 。 以下示例演示如何配置实例:​​CorrelationData​​​​CompletableFuture​​​​ConfirmCallback​​​​CorrelationData​

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

由于它是 ,因此您可以在准备就绪时将结果用于异步回调。 该对象是具有 2 个属性的简单 bean:和(例如实例)。 不会为代理生成的实例填充原因。 它是为框架生成的实例填充的(例如,在实例未完成时关闭连接)。​​CompletableFuture<Confirm>​​​​get()​​​​whenComplete()​​​​Confirm​​​​ack​​​​reason​​​​nack​​​​nack​​​​nack​​​​ack​

此外,当同时启用确认和返回时,将填充返回的消息,只要 具有唯一的 ;默认情况下,从版本 2.3 开始,情况始终如此。 保证在将来使用 .​​CorrelationData​​​​CorrelationData​​​​id​​​​ack​

另请参阅作用域内操作,了解等待发布者确认的更简单机制。

作用域内操作

通常,使用模板时,会将 从缓存中签出(或创建),用于操作,然后返回到缓存中以供重用。 在多线程环境中,无法保证下一个操作使用相同的通道。 但是,有时您可能希望更好地控制通道的使用,并确保在同一通道上执行所有操作。​​Channel​

从版本 2.0 开始,提供了一个名为的新方法,该方法带有 . 在回调范围内和提供的参数上执行的任何操作都使用相同的专用 ,该 将在末尾关闭(不返回到缓存)。 如果通道是 ,则在收到所有确认后,它将返回到缓存中(请参阅相关发布服务器确认和返回)。​​invoke​​​​OperationsCallback​​​​RabbitOperations​​​​Channel​​​​PublisherCallbackChannel​

@FunctionalInterface
public interface OperationsCallback<T> {

T doInRabbit(RabbitOperations operations);

}

为什么可能需要这样做的一个示例是,如果您希望在底层 . Spring API 以前没有公开此方法,因为如前所述,通道通常是缓存和共享的。 现在提供 和 ,委托给 范围内使用的专用通道。 出于显而易见的原因,这些方法不能在该范围之外使用。​​waitForConfirms()​​​​Channel​​​​RabbitTemplate​​​​waitForConfirms(long timeout)​​​​waitForConfirmsOrDie(long timeout)​​​​OperationsCallback​

请注意,其他地方提供了允许您将确认与请求相关联的更高级别的抽象(请参阅相关发布者确认和返回)。 如果只想等到代理确认交割,则可以使用以下示例中显示的技术:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});

如果您希望在 范围内的同一通道上调用操作,则必须使用用于操作的相同通道构造 admin。​​RabbitAdmin​​​​OperationsCallback​​​​RabbitTemplate​​​​invoke​

如果模板操作已在现有事务范围内执行,则前面的讨论没有实际意义,例如,在事务处理侦听器容器线程上运行并在事务处理模板上执行操作时。 在这种情况下,操作在该通道上执行,并在线程返回到容器时提交。 在这种情况下没有必要使用。​​invoke​

以这种方式使用 consure 时,实际上并不需要为将确认与请求相关联而设置的大部分基础结构(除非还启用了返回)。 从版本 2.2 开始,连接工厂支持名为 的新属性。 如果设置为 ,则可以避免基础结构,并且可以更有效地进行确认处理。​​publisherConfirmType​​​​ConfirmType.SIMPLE​

此外,在发送的消息中设置属性。 如果要检查(或记录或以其他方式使用)特定确认,可以使用重载方法执行此操作,如以下示例所示:​​RabbitTemplate​​​​publisherSequenceNumber​​​​MessageProperties​​​​invoke​

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);

这些对象(for 和实例)是 Rabbit 客户端回调,而不是模板回调。​​ConfirmCallback​​​​ack​​​​nack​

以下示例日志和实例:​​ack​​​​nack​

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));

作用域内操作绑定到线程。 有关多线程环境中严格排序的讨论,请参阅多线程环境中的严格消息排序。

多线程环境中的严格消息排序

作用域内操作中的讨论仅适用于在同一线程上执行操作的情况。

请考虑以下情况:

  • ​thread-1​​将消息发送到队列并将工作交给thread-2
  • ​thread-2​​将消息发送到同一队列

由于 RabbitMQ 的异步性质和缓存通道的使用;不确定是否将使用相同的通道,因此无法保证消息到达队列的顺序。 (在大多数情况下,它们会按顺序到达,但无序交付的概率不为零)。 要解决此用例,您可以使用具有大小的有界通道缓存(与 一起)来确保消息始终发布在同一通道上,并保证顺序。 为此,如果连接工厂有其他用途(如使用者),则应为模板使用专用连接工厂,或将模板配置为使用嵌入在主连接工厂中的发布者连接工厂(请参阅使用单独的连接)。​​1​​​​channelCheckoutTimeout​

这最好用一个简单的 Spring 引导应用程序来说明:

@SpringBootApplication
public class Application {

private static final Logger log = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}

@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}

@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}

@Bean
Queue queue() {
return new Queue("queue");
}


@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}

}

@Component
class Service {

private static final Logger LOG = LoggerFactory.getLogger(Service.class);

private final RabbitTemplate template;

private final TaskExecutor exec;

Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}

void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}

void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}

}

即使发布是在两个不同的线程上执行的,它们也将使用相同的通道,因为缓存的上限为单个通道。

从版本 2.3.7 开始,支持 使用 and 方法将线程的通道传输到另一个线程。 第一个方法返回一个上下文,该上下文传递给调用第二个方法的第二个线程。 线程可以绑定非事务通道或事务通道(或两者之一);除非使用两个连接工厂,否则不能单独传输它们。 示例如下:​​ThreadChannelConnectionFactory​​​​prepareContextSwitch​​​​switchContext​

@SpringBootApplication
public class Application {

private static final Logger log = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}

@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}

@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}

@Bean
Queue queue() {
return new Queue("queue");
}


@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}

}

@Component
class Service {

private static final Logger LOG = LoggerFactory.getLogger(Service.class);

private final RabbitTemplate template;

private final TaskExecutor exec;

private final ThreadChannelConnectionFactory connFactory;

Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {

this.template = template;
this.exec = exec;
this.connFactory = tccf;
}

void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}

void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}

}

调用 后,如果当前线程执行更多操作,它们将在新通道上执行。 当不再需要线程绑定通道时,关闭它非常重要。​​prepareSwitchContext​

消息传递集成

从 1.4 版开始,(建立在 之上)提供了与 Spring 框架消息抽象的集成,即 . 这允许您使用抽象发送和接收消息。 这种抽象被其他Spring项目使用,例如Spring Integration和Spring的STOMP支持。 涉及两个消息转换器:一个用于在 Spring 消息传递和 Spring AMQP 的抽象之间进行转换,另一个用于在 Spring AMQP 的抽象和底层 RabbitMQ 客户端库所需的格式之间进行转换。 默认情况下,消息负载由提供实例的消息转换器转换。 或者,您可以使用其他一些有效负载转换器注入自定义,如以下示例所示:​​RabbitMessagingTemplate​​​​RabbitTemplate​​​​org.springframework.messaging.Message​​​​spring-messaging​​​​Message<?>​​​​Message<?>​​​​Message​​​​Message​​​​RabbitTemplate​​​​MessagingMessageConverter​

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
已验证的用户标识

从版本 1.6 开始,该模板现在支持 ( 使用 Java 配置时)。 如果发送了消息,那么在计算此表达式后将设置用户 id 属性(如果尚未设置)。 评估的根对象是要发送的消息。​​user-id-expression​​​​userIdExpression​

以下示例演示如何使用该属性:​​user-id-expression​

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

第一个示例是文字表达式。 第二个从应用程序上下文中的连接工厂 Bean 获取属性。​​username​

使用单独的连接

从版本 2.0.2 开始,可以将属性设置为尽可能使用与侦听器容器使用的连接不同的连接。 这是为了避免当生产者因任何原因被阻止时,消费者被阻止。 为此,连接工厂维护第二个内部连接工厂;默认情况下,它与主工厂的类型相同,但如果希望使用不同的工厂类型进行发布,则可以显式设置。 如果 rabbit 模板在侦听器容器启动的事务中运行,则无论此设置如何,都将使用该容器的通道。​​usePublisherConnection​​​​true​

通常,不应将此设置为 的模板使用 。 使用采用连接工厂的构造函数。 如果使用采用模板的其他构造函数,请确保模板的属性为 。 这是因为,管理员通常用于声明侦听器容器的队列。 使用属性设置为 的模板意味着独占队列(如 )将在与侦听器容器使用的连接不同的连接上声明。 在这种情况下,容器无法使用队列。​​RabbitAdmin​​​​true​​​​RabbitAdmin​​​​false​​​​true​​​​AnonymousQueue​