RabbitMQ从入门到实战-知识详情总结

时间:2025-03-14 07:10:37

一、简介

RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它用于异步通信、解耦系统,提高系统的可扩展性和可靠性。它广泛应用于微服务架构、分布式系统、异步处理等场景。


1.RabbitMQ 的特点

  1. 支持多种协议:支持 AMQP 0.9.1、AMQP 1.0、MQTT、STOMP 等。
  2. 高可用性:支持集群部署、镜像队列等机制,确保消息不丢失。
  3. 可靠性:通过持久化、ACK 机制、事务支持等方式保证消息可靠传输。
  4. 灵活的路由机制:通过交换机(Exchange)和绑定(Binding)策略,实现复杂的消息路由规则。
  5. 插件机制:提供插件扩展,如管理插件、消息追踪插件、WebSocket 支持等。
  6. 轻量级、易于部署:基于 Erlang 语言开发,占用资源较低,支持 Docker、Kubernetes 部署。

2.RabbitMQ 的优缺点

(1)优点:
  1. 支持多种消息模式(点对点、发布订阅、路由等)。
  2. 社区活跃,生态丰富,官方提供大量插件,支持多种语言客户端(Java、Python、Go 等)。
  3. 支持消息持久化,即使服务器重启,消息也不会丢失。
  4. 支持集群模式,可扩展性强,适用于分布式环境。
  5. 性能较好,适用于中小型业务场景。
(2)缺点:
  1. 吞吐量较低:相比 Kafka,RabbitMQ 的吞吐量较低,不适合超大规模数据流处理。
  2. 管理运维复杂:需要手动管理队列、交换机、绑定等,运维成本相对较高。
  3. Erlang 生态较小,部分企业的运维人员对 Erlang 语言不熟悉,可能影响问题排查。
  4. 资源占用较大:在高并发场景下,RabbitMQ 的内存占用较多,可能影响性能。

二、核心组件

1. 生产者(Producer)

  • 负责发送消息到 RabbitMQ,消息最终会进入队列(Queue)。
  • 生产者不会直接把消息发到队列,而是先发送到交换机(Exchange)

2. 交换机(Exchange)

  • 负责接收生产者发送的消息,并根据绑定规则决定消息流向。
  • 交换机类型:
    • Direct(直连交换机):消息按照指定的 routing key 发送到匹配的队列。
    • Fanout(扇形交换机):消息广播到所有绑定的队列,不考虑 routing key
    • Topic(主题交换机):按模式匹配 routing key,适用于模糊匹配的场景(如 logs.*)。
    • Headers(头交换机):根据消息的 Header 属性进行路由。

3. 队列(Queue)

  • 存储消息,等待消费者(Consumer)消费。
  • 可配置是否持久化(避免 RabbitMQ 重启后消息丢失)。
  • 可设置死信队列(DLX),当消息超时未消费或被拒绝时,进入死信队列做后续处理。

4. 绑定(Binding)

  • 连接交换机和队列的桥梁,决定消息如何流转。
  • 绑定通常通过 routing key 进行匹配,或基于 headers 进行匹配。

5. 消费者(Consumer)

  • 从队列中获取消息并进行处理。
  • 支持手动 ACK 机制(保证消息消费后才被确认,避免丢失)。

6. 虚拟主机(Virtual Host,VHost)

  • RabbitMQ 服务器的逻辑隔离机制,不同 VHost 之间的消息互不影响。
  • 每个 VHost 维护自己的队列、交换机和权限控制。

7. RabbitMQ 连接(Connection)和信道(Channel)

  • Connection:TCP 连接,生产者和消费者都需要建立连接。
  • Channel:RabbitMQ 的逻辑连接,多个 Channel 共享一个 Connection,减少 TCP 连接开销。

三、交换机类型

RabbitMQ 中的交换机(Exchange)是消息路由的核心,决定了消息的流向。根据不同的路由策略,RabbitMQ 提供了几种交换机类型:

1. Direct Exchange(直连交换机)

  • 特点:消息通过交换机按 routing key 路由到匹配的队列。
  • 适用场景:精确匹配的消息路由。

在 Direct Exchange 中,生产者发送消息时指定 routing key,只有与队列绑定时的 routing key 完全匹配的队列会接收到消息。


2. Fanout Exchange(扇形交换机)

  • 特点:不关心 routing key,将消息广播到所有绑定的队列。
  • 适用场景:需要将消息发送给多个消费者或多个队列的场景(例如广播通知)。

Fanout Exchange 将消息发送到所有绑定的队列,完全不考虑 routing key


3. Topic Exchange(主题交换机)

  • 特点:使用 routing key 的模式匹配功能,支持通配符(*#)来路由消息。
  • 适用场景:根据模式灵活路由消息,比如日志系统中的级别过滤(如 logs.infologs.error)。

Topic Exchange 根据绑定的 routing key 模式进行匹配,* 匹配一个词,# 匹配多个词。比如 logs.info 匹配所有包含 logs.info 的消息。


4. Headers Exchange(头交换机)

  • 特点:通过匹配消息的 Header 信息进行路由,而不是 routing key
  • 适用场景:需要通过多个属性来过滤消息的情况,比如消息头包含多个属性,灵活的路由机制。

Headers Exchange 根据消息的 header 信息来路由,常用于需要灵活多维度匹配的场景。


总结

  • Direct Exchange:用于精确匹配的场景。
  • Fanout Exchange:用于广播场景,所有绑定的队列都会接收到消息。
  • Topic Exchange:用于模式匹配,灵活路由。
  • Headers Exchange:通过 header 属性进行路由,灵活多维度匹配。

四、创建方式

在 RabbitMQ 中,队列(Queue)、交换机(Exchange)、和绑定(Binding)是消息传递的核心元素。下面我会介绍它们在 Spring Boot 中的创建方式,包括使用 配置类注解方式

1.配置类方式

在配置类中,可以通过 @Bean 注解来声明队列、交换机和绑定关系。

@Configuration
public class RabbitConfig {

    // 创建队列
    @Bean
    public Queue queue() {
        return new Queue("myQueue", true);
    }

    // 创建 Direct 交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    // 创建绑定
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("myRoutingKey");
    }
}
  • 队列(Queue):可以使用 @Bean 在配置类中声明。
  • 交换机(Exchange):同样使用 @Bean 创建,通常使用 DirectExchangeFanoutExchangeTopicExchange 等不同类型。
  • 绑定(Binding):队列和交换机的绑定通过 BindingBuilder.bind(queue).to(exchange).with(routingKey) 来配置。

2.注解方式

在 Spring Boot 中,@RabbitListener 注解可以用来监听 RabbitMQ 队列,并且可以通过 @QueueBinding 直接在注解中完成 队列、交换机、绑定 的声明(如果不存在就会创建)。

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "coupon.order.queue1", durable = "true"), // 队列
        exchange = @Exchange(name = "coupon.order.exchange", type = ExchangeTypes.DIRECT, durable = "true"), // 交换机
        key = {"coupon"} // 绑定的 routing key
))
public void receiveMessage(String message) {
    System.out.println("Received: " + message);
}

注意:RabbitMQ 默认发送和接收的消息是 字节数组(byte[]),在 Spring Boot 中,默认是字符串,但如果发送的是对象,需要手动配置 JSON 序列化 才能正确转换。需要在生产者和消费者都添加Bean注解:

@Bean
    public MessageConverter jacksonMessageConvertor() {
        return new Jackson2JsonMessageConverter();
    }

五、生产者确认机制

RabbitMQ 生产者确认机制(Publisher Confirms)用于确保消息从生产者 正确投递到交换机,并且 从交换机正确路由到队列,防止消息丢失。


1. 生产者确认机制的三种模式

首先配置.yml文件或.properties文件:

spring:
  rabbitmq:
    # 生产者网络连接失败重试设置,不建议使用
    template:
      retry:
        enabled: true        # 开启消费者重试(抛出异常时触发)
        initial-interval: 400ms  # 首次重试间隔
        multiplier: 1        # 间隔倍数(1表示固定间隔)
        max-attempts: 3      # 最大重试次数(含首次消费)


        # 生产者确认机制(可靠性保障关键配置)
        publisher-confirm-type: none  # 开启消息确认
        publisher-returns: correlated     # 开启消息路由失败通知

(1)ConfirmCallback(消息到达交换机,Exchange)

作用: 确保消息 成功到达交换机(Exchange)。

如果消息 到达交换机,返回 ack=true;如果 未到达交换机(如交换机不存在),返回 ack=false 并提供 cause 错误信息。

实现方式:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
        System.out.println("消息成功到达交换机!");
    } else {
        System.out.println("消息未到达交换机,原因:" + cause);
    }
});

(2)ReturnCallback(消息未投递到队列,Queue)

作用: 确保消息 正确路由到队列。

如果消息没有匹配任何队列(比如 routingKey 错误),触发 ReturnCallback

实现方式:

rabbitTemplate.setMandatory(true); // 必须设置为 true,否则不会触发回调

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    System.out.println("消息未投递到队列!");
    System.out.println("交换机:" + exchange);
    System.out.println("路由键:" + routingKey);
    System.out.println("错误代码:" + replyCode + ",错误信息:" + replyText);
});

(3)事务模式(不推荐)

作用: 生产者使用事务 channel.txCommit() 进行消息投递,失败时回滚 channel.txRollback()。但性能较低,不推荐使用。

 以上三种方法都能确认生产者的消息成功送达,但会带来系统额外的性能开销,因此都不推荐使用。非要使用,RabbitMQ 官方建议使用 ConfirmCallback。 

六、消费者确认机制

RabbitMQ 的消费者确认机制默认为none,这种模式下,RabbitMQ 不会要求消费者确认消息,消息会在消费者接收到时直接从队列中删除。 消费者确认机制(Consumer Acknowledgements)用于确保消息被正确消费,避免消息丢失或重复消费。主要有两种模式:

  1. 自动确认(autoAck = true)(不推荐,可能丢失消息)
  2. 手动确认(autoAck = false)(推荐,确保消息被正确处理)

1. 自动确认模式(Auto ACK)

默认情况下,RabbitMQ 采用自动确认,即消费者 收到消息后立即确认,无论业务逻辑是否执行成功,RabbitMQ 都会从队列中删除该消息。

代码示例:
@RabbitListener(queues = "testQueue")
public void receiveMessage(String message) {
    System.out.println("收到消息:" + message);
    int result = 1 / 0;  // 业务代码出错
}

消息 刚到达消费者 就被 RabbitMQ 删除,即使消费者 还没处理,也不会重新投递,导致消息丢失

2. 手动确认模式(Manual ACK)

手动确认允许消费者 成功处理消息后再手动确认,并可选择拒绝或重新入队,保证消息不会丢失。

代码示例:
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void receiveMessage(Message message, Channel channel) throws IOException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        System.out.println("收到消息:" + new String(message.getBody()));

        // 业务处理逻辑
        processMessage(new String(message.getBody()));

        // 处理成功,手动确认
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        System.out.println("消息处理失败:" + e.getMessage());

        // 处理失败,拒绝消息并重新入队
        channel.basicNack(deliveryTag, false, true);
    }
}

手动确认的三种方式如下:

方法 作用 说明
channel.basicAck(deliveryTag, false); 确认消息 处理成功后,通知 RabbitMQ 删除消息
channel.basicNack(deliveryTag, false, true); 拒绝消息,并重新入队 处理失败时,消息会重新回到队列
channel.basicReject(deliveryTag, false); 拒绝消息,不重新入队 直接丢弃消息

七、消息持久化

在 RabbitMQ 中,消息持久化是为了确保消息在 RabbitMQ 服务器崩溃重启 后,仍然能够被恢复。持久化可以保证即使 RabbitMQ 意外停止,消息不会丢失。

  • 临时消息(non-persistent): 默认情况下,消息在内存中存储,RabbitMQ 重启后会丢失。
  • 持久化消息(persistent): 消息存储在磁盘上,RabbitMQ 重启后可以恢复。

RabbitMQ 消息持久化涉及两个方面:

  1. 队列持久化
  2. 消息持久化
  3. 路由器持久化

1.设置队列和交换机持久化 

在声明队列和交换机时,需要设置队列的 durable 属性为 true,表示队列是持久化的。

@Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange", true, false);
    }

    @Bean
    public Queue dlxQueue() {
        return new Queue("dlx.queue", true);
    }

2.设置消息持久化

为了使消息持久化,需要将消息的 deliveryMode 设置为 2(持久化)。如果是通过 Spring AMQP 发送消息,可以通过 MessagePostProcessor 来设置消息持久化。

通过 Spring AMQP 设置消息持久化:
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMessage(String message) {
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 设置消息持久化
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }
    };
    
    rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, messagePostProcessor);
}

八、延迟队列的实现

RabbitMQ 中实现 延迟队列(Delay Queue)通常有两种方式,分别是通过 插件 和 死信队列+TTL(Time-To-Live) 实现。延迟队列允许消息在特定的时间延迟后再被消费,通常用于实现任务调度、定时任务等功能。

(1)不用插件:

通过死信队列+TTL实现的。首先,我们将消息放到延迟队列中,将TTL设置为延迟时间,消息在TTL过期之后会被转发到死信交换机,再路由到死信队列,接着消费者到死信队列中消费消息,从而达到延迟消费的作用。

通过配置类创建死信交换机和死信队列,设置绑定和TTL 

@Configuration
public class DlxConfig {

    // 死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange", true, false);
    }

    // 死信队列
    @Bean
    public Queue dlxQueue() {
        return new Queue("dlx.queue", true);
    }

    // 死信队列绑定
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx");
    }

    // 延迟交换机和队列
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange("delay.exchange", true, false);
    }

    // 延迟队列
    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable("delay.queue")
                .deadLetterExchange("dlx.exchange")  // 绑定死信交换机
                .deadLetterRoutingKey("dlx")         // 设置死信路由键
                .ttl(10 * 1000)                      // 设置消息TTL为10秒
                .build();
    }

    // 延迟队列和交换机绑定
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.routing.key");
    }
}

 测试类,发送消息到延迟队列:

@Test
public void test4() {
    LocalTime now = LocalTime.now(); // 获取当前时间
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化

    String msg = "hello, amqp";
    // 通过交换机发送消息
    rabbitTemplate.convertAndSend("delay.exchange", "delay.routing.key", msg);
    System.out.println("发送时间为:" + now.format(formatter));
}

 消费者,在死信队列中消费:

@RabbitListener(queues = "dlx.queue")
public void DelayedMessage(String msg) {
    LocalTime now = LocalTime.now(); // 获取当前时间
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化
    System.out.println("收到延迟消息时间为:" + now.format(formatter));
}

(2)使用插件:rabbitmq_delayed_message_exchange

消息发送到延迟交换机时,携带X-delay头(延迟时间),插件将消息暂存在内部数据库中,不会立即路由到队列,到期后将消息路由到目标队列; 

直接在消费者的监视器注解上配置信息:

// 使用插件的 延迟队列声明
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.exchange", delayed = "true"),
            key = "delay"
    ))
    public void DelayMessage(String msg) {
        LocalTime now = LocalTime.now(); // 获取当前时间
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化
        System.out.println("收到延迟消息时间为:" + now.format(formatter));
    }

 测试类,设置TTL:

// 测试有延迟插件的 延迟队列
    @Test
    public void test5() {
        LocalTime now = LocalTime.now(); // 获取当前时间
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化

        String msg = "hello, amqp";
        rabbitTemplate.convertAndSend("delay.exchange", "delay", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(10 * 1000);   // 设置过期时间
                return message;
            }
        });
        System.out.println("第一次发送时间为:" + now.format(formatter));
    }

(3)使用插件和不使用插件的区别: 

  1. 消息存储位置:没有插件的需要额外创建一个队列用来存储消息,而有插件的将消息暂存在Mnesia数据库中,占用资源较低;
  2. 性能和误差:没有插件的需要轮询检查消息TTL(默认是每秒检查一次),而有插件的使用 Erlang Timer 模块 或 时间轮算法管理延迟时间,性能比较好,延迟误差较小;
  3. 消息阻塞:没有插件的队列只会检查对头消息是否过期,会导致后续消息被阻塞;而有插件的对每个消息的延迟独立计算,到期后立即触发路由;

九、消息重复消费的解决

消息重复消费是一个常见的 RabbitMQ 消息队列问题,它可能会影响系统的准确性和性能。为了解决消息重复消费的问题,可以采取以下几种策略:

1)开启消费者确认机制(手动ack或自动ack确保每次消费者成功处理完消息后,发送一个信号给RabbitMQ, 如果消费者处理失败,重新排队未确认的消息;

@RabbitListener(queues = "order.payment.queue", ackMode = "MANUAL")
public void handlePaymentMessage(Message message, Channel channel) {
    String orderId = new String(message.getBody());

    try {
        // 处理订单支付
        orderService.processPayment(orderId);
        
        // 确认消息已成功消费
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理失败,拒绝消息,并不重新入队
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }
}

(2)核心思想是保证幂等性:无论消息被消费多少次,结果与一次消费相同:

1.设置唯一标识(消息ID),将消息ID放到缓存中,每次消费前根据消息ID是否已存在来判定消息是否处理过;

@Autowired
private StringRedisTemplate redisTemplate;

@RabbitListener(queues = "order.payment.queue")
public void handlePaymentMessage(String orderId) {
    String messageId = "payment:" + orderId;
    if (redisTemplate.opsForSet().isMember("processedMessages", messageId)) {
        return;  // 如果消息已经处理过,跳过
    }

    // 处理订单支付
    orderService.processPayment(orderId);

    // 将消息ID存入 Redis,防止重复消费
    redisTemplate.opsForSet().add("processedMessages", messageId);
}

2.利用数据库的唯一索引、主键的约束防止消息重复消费; 

    在消费者代码中,如果订单已存在(即订单ID已经在数据库中),则跳过该消息,从而避免重复消费:

    @RabbitListener(queues = "order.payment.queue")
    public void handlePaymentMessage(String orderId) {
        try {
            // 检查订单是否已支付(通过唯一索引判断是否重复消费)
            if (orderService.isOrderPaid(orderId)) {
                return;  // 如果订单已经支付,则跳过该消息
            }
    
            // 处理订单支付
            orderService.processPayment(orderId);
        } catch (Exception e) {
            // 处理异常
            e.printStackTrace();
        }
    }
    

    orderService.isOrderPaid(orderId) 方法中,我们可以查询数据库,检查订单是否已处理:

    public boolean isOrderPaid(String orderId) {
        // 通过唯一索引查询订单是否存在,如果存在则返回 true,表示已支付
        return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM order_payment WHERE order_id = ?", 
                                           Integer.class, orderId) > 0;
    }
    

    3.使用乐观锁,通过版本号或条件判断是否重复消费;

    十、异常消息的处理

    异常消息的处理是消息队列中一个重要的问题,尤其是在 RabbitMQ 中。消息处理过程中,可能会遇到各种异常情况,例如消费者处理消息时发生错误、消息无法被消费等。为了提高系统的可靠性和容错能力,RabbitMQ 提供了多种机制来处理异常消息。

    配置.yml文件或者.properties文件:

    spring:
      rabbitmq:
        listener:
          simple:
            prefetch: 1  # 每个消费者最大未确认消息数(设为1实现能者多劳模式)
            acknowledge-mode: auto  # 自动ACK(高风险!建议改为 manual 手动确认,避免消息丢失)
    
            retry:
              enabled: true          # 是否开启发送失败重试(仅针对网络波动场景,不解决业务异常)
              multiplier: 1          # 重试间隔时间倍数(此处为固定间隔)
              max-attempts: 3        # 最大重试次数(含首次发送)
              initial-interval: 200ms # 首次重试间隔

    创建异常交换机和异常队列:

    /**
     * 定义异常交换机和异常队列
     * 将异常的消息重试多次失败后,统一放在这个错误队列中,人工处理
     */
    
    @Configuration
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry"
            , name = "enabled", havingValue = "true")   // 当这个属性存在且为true,配置类才生效
    public class ErrorConfig {
    
        @Bean
        public DirectExchange errorExchange() {
            return new DirectExchange("error.exchange", true, false);
        }
    
        @Bean
        public Queue errorQueue() {
            return new Queue("error.queue", true);
        }
    
        @Bean
        public Binding errorBinding() {
            return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
        }
    
        // 消息异常重试n此后,移到该异常交换机
        @Bean
        public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
            return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
        }
    }

     

    总结

    RabbitMQ 作为轻量级的消息队列,适用于中小型业务场景,具备高可用性、可靠性和灵活的消息路由机制。然而,它的吞吐量不及 Kafka,且运维成本较高。对于需要事务保障、低延迟、灵活消息分发的业务,RabbitMQ 是一个不错的选择。