一、简介
RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它用于异步通信、解耦系统,提高系统的可扩展性和可靠性。它广泛应用于微服务架构、分布式系统、异步处理等场景。
1.RabbitMQ 的特点
- 支持多种协议:支持 AMQP 0.9.1、AMQP 1.0、MQTT、STOMP 等。
- 高可用性:支持集群部署、镜像队列等机制,确保消息不丢失。
- 可靠性:通过持久化、ACK 机制、事务支持等方式保证消息可靠传输。
- 灵活的路由机制:通过交换机(Exchange)和绑定(Binding)策略,实现复杂的消息路由规则。
- 插件机制:提供插件扩展,如管理插件、消息追踪插件、WebSocket 支持等。
- 轻量级、易于部署:基于 Erlang 语言开发,占用资源较低,支持 Docker、Kubernetes 部署。
2.RabbitMQ 的优缺点
(1)优点:
- 支持多种消息模式(点对点、发布订阅、路由等)。
- 社区活跃,生态丰富,官方提供大量插件,支持多种语言客户端(Java、Python、Go 等)。
- 支持消息持久化,即使服务器重启,消息也不会丢失。
- 支持集群模式,可扩展性强,适用于分布式环境。
- 性能较好,适用于中小型业务场景。
(2)缺点:
- 吞吐量较低:相比 Kafka,RabbitMQ 的吞吐量较低,不适合超大规模数据流处理。
- 管理运维复杂:需要手动管理队列、交换机、绑定等,运维成本相对较高。
- Erlang 生态较小,部分企业的运维人员对 Erlang 语言不熟悉,可能影响问题排查。
- 资源占用较大:在高并发场景下,RabbitMQ 的内存占用较多,可能影响性能。
二、核心组件
1. 生产者(Producer)
- 负责发送消息到 RabbitMQ,消息最终会进入队列(Queue)。
- 生产者不会直接把消息发到队列,而是先发送到交换机(Exchange)。
2. 交换机(Exchange)
- 负责接收生产者发送的消息,并根据绑定规则决定消息流向。
- 交换机类型:
-
Direct(直连交换机):消息按照指定的
routing key
发送到匹配的队列。 -
Fanout(扇形交换机):消息广播到所有绑定的队列,不考虑
routing key
。 -
Topic(主题交换机):按模式匹配
routing key
,适用于模糊匹配的场景(如logs.*
)。 - Headers(头交换机):根据消息的 Header 属性进行路由。
-
Direct(直连交换机):消息按照指定的
3. 队列(Queue)
- 存储消息,等待消费者(Consumer)消费。
- 可配置是否持久化(避免 RabbitMQ 重启后消息丢失)。
- 可设置死信队列(DLX),当消息超时未消费或被拒绝时,进入死信队列做后续处理。
4. 绑定(Binding)
- 连接交换机和队列的桥梁,决定消息如何流转。
- 绑定通常通过
routing key
进行匹配,或基于headers
进行匹配。
5. 消费者(Consumer)
- 从队列中获取消息并进行处理。
- 支持手动 ACK 机制(保证消息消费后才被确认,避免丢失)。
6. 虚拟主机(Virtual Host,VHost)
- RabbitMQ 服务器的逻辑隔离机制,不同 VHost 之间的消息互不影响。
- 每个 VHost 维护自己的队列、交换机和权限控制。
7. RabbitMQ 连接(Connection)和信道(Channel)
- Connection:TCP 连接,生产者和消费者都需要建立连接。
-
Channel:RabbitMQ 的逻辑连接,多个
Channel
共享一个Connection
,减少 TCP 连接开销。
三、交换机类型
RabbitMQ 中的交换机(Exchange)是消息路由的核心,决定了消息的流向。根据不同的路由策略,RabbitMQ 提供了几种交换机类型:
1. Direct Exchange(直连交换机)
-
特点:消息通过交换机按
routing key
路由到匹配的队列。 - 适用场景:精确匹配的消息路由。
在 Direct Exchange 中,生产者发送消息时指定 routing key
,只有与队列绑定时的 routing key
完全匹配的队列会接收到消息。
2. Fanout Exchange(扇形交换机)
-
特点:不关心
routing key
,将消息广播到所有绑定的队列。 - 适用场景:需要将消息发送给多个消费者或多个队列的场景(例如广播通知)。
Fanout Exchange 将消息发送到所有绑定的队列,完全不考虑 routing key
。
3. Topic Exchange(主题交换机)
-
特点:使用
routing key
的模式匹配功能,支持通配符(*
和#
)来路由消息。 -
适用场景:根据模式灵活路由消息,比如日志系统中的级别过滤(如
logs.info
,logs.error
)。
Topic Exchange 根据绑定的 routing key
模式进行匹配,*
匹配一个词,#
匹配多个词。比如 logs.info
匹配所有包含 logs.info
的消息。
4. Headers Exchange(头交换机)
-
特点:通过匹配消息的 Header 信息进行路由,而不是
routing key
。 - 适用场景:需要通过多个属性来过滤消息的情况,比如消息头包含多个属性,灵活的路由机制。
Headers Exchange 根据消息的 header 信息来路由,常用于需要灵活多维度匹配的场景。
总结
- Direct Exchange:用于精确匹配的场景。
- Fanout Exchange:用于广播场景,所有绑定的队列都会接收到消息。
- Topic Exchange:用于模式匹配,灵活路由。
- Headers Exchange:通过 header 属性进行路由,灵活多维度匹配。
四、创建方式
在 RabbitMQ 中,队列(Queue)、交换机(Exchange)、和绑定(Binding)是消息传递的核心元素。下面我会介绍它们在 Spring Boot 中的创建方式,包括使用 配置类 和 注解方式。
1.配置类方式
在配置类中,可以通过 @Bean
注解来声明队列、交换机和绑定关系。
@Configuration
public class RabbitConfig {
// 创建队列
@Bean
public Queue queue() {
return new Queue("myQueue", true);
}
// 创建 Direct 交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
// 创建绑定
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("myRoutingKey");
}
}
-
队列(Queue):可以使用
@Bean
在配置类中声明。 -
交换机(Exchange):同样使用
@Bean
创建,通常使用DirectExchange
、FanoutExchange
、TopicExchange
等不同类型。 -
绑定(Binding):队列和交换机的绑定通过
BindingBuilder.bind(queue).to(exchange).with(routingKey)
来配置。
2.注解方式
在 Spring Boot 中,@RabbitListener
注解可以用来监听 RabbitMQ 队列,并且可以通过 @QueueBinding
直接在注解中完成 队列、交换机、绑定 的声明(如果不存在就会创建)。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "coupon.order.queue1", durable = "true"), // 队列
exchange = @Exchange(name = "coupon.order.exchange", type = ExchangeTypes.DIRECT, durable = "true"), // 交换机
key = {"coupon"} // 绑定的 routing key
))
public void receiveMessage(String message) {
System.out.println("Received: " + message);
}
注意:RabbitMQ 默认发送和接收的消息是 字节数组(byte[]),在 Spring Boot 中,默认是字符串,但如果发送的是对象,需要手动配置 JSON 序列化 才能正确转换。需要在生产者和消费者都添加Bean注解:
@Bean
public MessageConverter jacksonMessageConvertor() {
return new Jackson2JsonMessageConverter();
}
五、生产者确认机制
RabbitMQ 生产者确认机制(Publisher Confirms)用于确保消息从生产者 正确投递到交换机,并且 从交换机正确路由到队列,防止消息丢失。
1. 生产者确认机制的三种模式
首先配置.yml文件或.properties文件:
spring:
rabbitmq:
# 生产者网络连接失败重试设置,不建议使用
template:
retry:
enabled: true # 开启消费者重试(抛出异常时触发)
initial-interval: 400ms # 首次重试间隔
multiplier: 1 # 间隔倍数(1表示固定间隔)
max-attempts: 3 # 最大重试次数(含首次消费)
# 生产者确认机制(可靠性保障关键配置)
publisher-confirm-type: none # 开启消息确认
publisher-returns: correlated # 开启消息路由失败通知
(1)ConfirmCallback(消息到达交换机,Exchange)
作用: 确保消息 成功到达交换机(Exchange)。
如果消息 到达交换机,返回 ack=true
;如果 未到达交换机(如交换机不存在),返回 ack=false
并提供 cause
错误信息。
实现方式:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息成功到达交换机!");
} else {
System.out.println("消息未到达交换机,原因:" + cause);
}
});
(2)ReturnCallback(消息未投递到队列,Queue)
作用: 确保消息 正确路由到队列。
如果消息没有匹配任何队列(比如 routingKey
错误),触发 ReturnCallback
。
实现方式:
rabbitTemplate.setMandatory(true); // 必须设置为 true,否则不会触发回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("消息未投递到队列!");
System.out.println("交换机:" + exchange);
System.out.println("路由键:" + routingKey);
System.out.println("错误代码:" + replyCode + ",错误信息:" + replyText);
});
(3)事务模式(不推荐)
作用: 生产者使用事务 channel.txCommit()
进行消息投递,失败时回滚 channel.txRollback()
。但性能较低,不推荐使用。
以上三种方法都能确认生产者的消息成功送达,但会带来系统额外的性能开销,因此都不推荐使用。非要使用,RabbitMQ 官方建议使用 ConfirmCallback。
六、消费者确认机制
RabbitMQ 的消费者确认机制默认为none,这种模式下,RabbitMQ 不会要求消费者确认消息,消息会在消费者接收到时直接从队列中删除。 消费者确认机制(Consumer Acknowledgements)用于确保消息被正确消费,避免消息丢失或重复消费。主要有两种模式:
- 自动确认(autoAck = true)(不推荐,可能丢失消息)
- 手动确认(autoAck = false)(推荐,确保消息被正确处理)
1. 自动确认模式(Auto ACK)
默认情况下,RabbitMQ 采用自动确认,即消费者 收到消息后立即确认,无论业务逻辑是否执行成功,RabbitMQ 都会从队列中删除该消息。
代码示例:
@RabbitListener(queues = "testQueue")
public void receiveMessage(String message) {
System.out.println("收到消息:" + message);
int result = 1 / 0; // 业务代码出错
}
消息 刚到达消费者 就被 RabbitMQ 删除,即使消费者 还没处理,也不会重新投递,导致消息丢失。
2. 手动确认模式(Manual ACK)
手动确认允许消费者 成功处理消息后再手动确认,并可选择拒绝或重新入队,保证消息不会丢失。
代码示例:
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void receiveMessage(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("收到消息:" + new String(message.getBody()));
// 业务处理逻辑
processMessage(new String(message.getBody()));
// 处理成功,手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
System.out.println("消息处理失败:" + e.getMessage());
// 处理失败,拒绝消息并重新入队
channel.basicNack(deliveryTag, false, true);
}
}
手动确认的三种方式如下:
方法 | 作用 | 说明 |
---|---|---|
channel.basicAck(deliveryTag, false); |
确认消息 | 处理成功后,通知 RabbitMQ 删除消息 |
channel.basicNack(deliveryTag, false, true); |
拒绝消息,并重新入队 | 处理失败时,消息会重新回到队列 |
channel.basicReject(deliveryTag, false); |
拒绝消息,不重新入队 | 直接丢弃消息 |
七、消息持久化
在 RabbitMQ 中,消息持久化是为了确保消息在 RabbitMQ 服务器崩溃 或 重启 后,仍然能够被恢复。持久化可以保证即使 RabbitMQ 意外停止,消息不会丢失。
- 临时消息(non-persistent): 默认情况下,消息在内存中存储,RabbitMQ 重启后会丢失。
- 持久化消息(persistent): 消息存储在磁盘上,RabbitMQ 重启后可以恢复。
RabbitMQ 消息持久化涉及两个方面:
- 队列持久化
- 消息持久化
- 路由器持久化
1.设置队列和交换机持久化
在声明队列和交换机时,需要设置队列的 durable
属性为 true
,表示队列是持久化的。
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange", true, false);
}
@Bean
public Queue dlxQueue() {
return new Queue("dlx.queue", true);
}
2.设置消息持久化
为了使消息持久化,需要将消息的 deliveryMode
设置为 2
(持久化)。如果是通过 Spring AMQP 发送消息,可以通过 MessagePostProcessor
来设置消息持久化。
通过 Spring AMQP 设置消息持久化:
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
};
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, messagePostProcessor);
}
八、延迟队列的实现
在 RabbitMQ 中实现 延迟队列(Delay Queue)通常有两种方式,分别是通过 插件 和 死信队列+TTL(Time-To-Live) 实现。延迟队列允许消息在特定的时间延迟后再被消费,通常用于实现任务调度、定时任务等功能。
(1)不用插件:
通过死信队列+TTL实现的。首先,我们将消息放到延迟队列中,将TTL设置为延迟时间,消息在TTL过期之后会被转发到死信交换机,再路由到死信队列,接着消费者到死信队列中消费消息,从而达到延迟消费的作用。
通过配置类创建死信交换机和死信队列,设置绑定和TTL
@Configuration
public class DlxConfig {
// 死信交换机
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange", true, false);
}
// 死信队列
@Bean
public Queue dlxQueue() {
return new Queue("dlx.queue", true);
}
// 死信队列绑定
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx");
}
// 延迟交换机和队列
@Bean
public DirectExchange delayExchange() {
return new DirectExchange("delay.exchange", true, false);
}
// 延迟队列
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay.queue")
.deadLetterExchange("dlx.exchange") // 绑定死信交换机
.deadLetterRoutingKey("dlx") // 设置死信路由键
.ttl(10 * 1000) // 设置消息TTL为10秒
.build();
}
// 延迟队列和交换机绑定
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.routing.key");
}
}
测试类,发送消息到延迟队列:
@Test
public void test4() {
LocalTime now = LocalTime.now(); // 获取当前时间
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化
String msg = "hello, amqp";
// 通过交换机发送消息
rabbitTemplate.convertAndSend("delay.exchange", "delay.routing.key", msg);
System.out.println("发送时间为:" + now.format(formatter));
}
消费者,在死信队列中消费:
@RabbitListener(queues = "dlx.queue")
public void DelayedMessage(String msg) {
LocalTime now = LocalTime.now(); // 获取当前时间
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化
System.out.println("收到延迟消息时间为:" + now.format(formatter));
}
(2)使用插件:rabbitmq_delayed_message_exchange
消息发送到延迟交换机时,携带X-delay头(延迟时间),插件将消息暂存在内部数据库中,不会立即路由到队列,到期后将消息路由到目标队列;
直接在消费者的监视器注解上配置信息:
// 使用插件的 延迟队列声明
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.exchange", delayed = "true"),
key = "delay"
))
public void DelayMessage(String msg) {
LocalTime now = LocalTime.now(); // 获取当前时间
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化
System.out.println("收到延迟消息时间为:" + now.format(formatter));
}
测试类,设置TTL:
// 测试有延迟插件的 延迟队列
@Test
public void test5() {
LocalTime now = LocalTime.now(); // 获取当前时间
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化
String msg = "hello, amqp";
rabbitTemplate.convertAndSend("delay.exchange", "delay", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10 * 1000); // 设置过期时间
return message;
}
});
System.out.println("第一次发送时间为:" + now.format(formatter));
}
(3)使用插件和不使用插件的区别:
- 消息存储位置:没有插件的需要额外创建一个队列用来存储消息,而有插件的将消息暂存在Mnesia数据库中,占用资源较低;
- 性能和误差:没有插件的需要轮询检查消息TTL(默认是每秒检查一次),而有插件的使用 Erlang Timer 模块 或 时间轮算法管理延迟时间,性能比较好,延迟误差较小;
- 消息阻塞:没有插件的队列只会检查对头消息是否过期,会导致后续消息被阻塞;而有插件的对每个消息的延迟独立计算,到期后立即触发路由;
九、消息重复消费的解决
消息重复消费是一个常见的 RabbitMQ 消息队列问题,它可能会影响系统的准确性和性能。为了解决消息重复消费的问题,可以采取以下几种策略:
(1)开启消费者确认机制(手动ack或自动ack),确保每次消费者成功处理完消息后,发送一个信号给RabbitMQ, 如果消费者处理失败,重新排队未确认的消息;
@RabbitListener(queues = "order.payment.queue", ackMode = "MANUAL")
public void handlePaymentMessage(Message message, Channel channel) {
String orderId = new String(message.getBody());
try {
// 处理订单支付
orderService.processPayment(orderId);
// 确认消息已成功消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,拒绝消息,并不重新入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
(2)核心思想是保证幂等性:无论消息被消费多少次,结果与一次消费相同:
1.设置唯一标识(消息ID),将消息ID放到缓存中,每次消费前根据消息ID是否已存在来判定消息是否处理过;
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "order.payment.queue")
public void handlePaymentMessage(String orderId) {
String messageId = "payment:" + orderId;
if (redisTemplate.opsForSet().isMember("processedMessages", messageId)) {
return; // 如果消息已经处理过,跳过
}
// 处理订单支付
orderService.processPayment(orderId);
// 将消息ID存入 Redis,防止重复消费
redisTemplate.opsForSet().add("processedMessages", messageId);
}
2.利用数据库的唯一索引、主键的约束防止消息重复消费;
在消费者代码中,如果订单已存在(即订单ID已经在数据库中),则跳过该消息,从而避免重复消费:
@RabbitListener(queues = "order.payment.queue")
public void handlePaymentMessage(String orderId) {
try {
// 检查订单是否已支付(通过唯一索引判断是否重复消费)
if (orderService.isOrderPaid(orderId)) {
return; // 如果订单已经支付,则跳过该消息
}
// 处理订单支付
orderService.processPayment(orderId);
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
}
在 orderService.isOrderPaid(orderId)
方法中,我们可以查询数据库,检查订单是否已处理:
public boolean isOrderPaid(String orderId) {
// 通过唯一索引查询订单是否存在,如果存在则返回 true,表示已支付
return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM order_payment WHERE order_id = ?",
Integer.class, orderId) > 0;
}
3.使用乐观锁,通过版本号或条件判断是否重复消费;
十、异常消息的处理
异常消息的处理是消息队列中一个重要的问题,尤其是在 RabbitMQ 中。消息处理过程中,可能会遇到各种异常情况,例如消费者处理消息时发生错误、消息无法被消费等。为了提高系统的可靠性和容错能力,RabbitMQ 提供了多种机制来处理异常消息。
配置.yml文件或者.properties文件:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每个消费者最大未确认消息数(设为1实现能者多劳模式)
acknowledge-mode: auto # 自动ACK(高风险!建议改为 manual 手动确认,避免消息丢失)
retry:
enabled: true # 是否开启发送失败重试(仅针对网络波动场景,不解决业务异常)
multiplier: 1 # 重试间隔时间倍数(此处为固定间隔)
max-attempts: 3 # 最大重试次数(含首次发送)
initial-interval: 200ms # 首次重试间隔
创建异常交换机和异常队列:
/**
* 定义异常交换机和异常队列
* 将异常的消息重试多次失败后,统一放在这个错误队列中,人工处理
*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry"
, name = "enabled", havingValue = "true") // 当这个属性存在且为true,配置类才生效
public class ErrorConfig {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.exchange", true, false);
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding() {
return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
}
// 消息异常重试n此后,移到该异常交换机
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
}
}
总结
RabbitMQ 作为轻量级的消息队列,适用于中小型业务场景,具备高可用性、可靠性和灵活的消息路由机制。然而,它的吞吐量不及 Kafka,且运维成本较高。对于需要事务保障、低延迟、灵活消息分发的业务,RabbitMQ 是一个不错的选择。