RabbitMq生产者和消费者消息确认机制(ack)

时间:2025-04-06 21:48:00

RabbitMQ消息确认的本质也就是为了解决RabbitMQ消息丢失问题,因为哪怕我们做了RabbitMQ持久化,其实也并不能保证解决我们的消息丢失问题


RabbitMQ的消息确认有两种

  • 第一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
  • 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

1.消息发送确认(生产者)

正常情况下,生产者会通过交换机发送消息至队列中,再由消费者来进行消费,但是其实RabbitMQ在接收到消息后,还需要一段时间消息才能存入磁盘,并且其实也不是每条消息都会存入磁盘,可能仅仅只保存到cache中,这时,如果RabbitMQ正巧发生崩溃,消息则就会丢失,所以为了避免该情况的发生,我们引入了生产者确认机制,rabbitmq对此提供了两种方式:

方法一:Confirm模式

通过设置生产者Channel为comfirm模式,该Channel上发布的所有消息都会被指派一个唯一ID(每次从1开始累加),当消息到达生产者指定的消息队列后,broker会返回一个确认给生产者(包含之前的ID),这样生产者就能知道哪条消息成功发送了。

代码段:

    public void sendQueue(String appId, String handleUserId, List<String> deviceIds) {
        List<Object> list = new ArrayList<>();
        JSONObject jsonObject = new JSONObject();
        (, DELETE);
        (, list );
        String topicExchange = RabbitMqConstant.EXCHANGE_TOPIC_DATA;
        String routingKey = RabbitMqConstant.ROUTING_KEY_LOCAL_DATA;
        //(topicExchange, routingKey, ());
        try {
            Channel channel = ().createConnection().createChannel(false);
            ();
            (topicExchange, routingKey, null, ().getBytes());
            (new ConfirmListener() {
                //消息失败处理
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    ("sendQueue-ack-confirm-fail==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}--message:{}", topicExchange, routingKey, deliveryTag, multiple, jsonObject);
                    try {
                        (3000l);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    //重发
                    (topicExchange, routingKey, null, ().getBytes());
                }
                //消息成功处理
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    ("sendQueue-ack-confirm-successs==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}", topicExchange, routingKey, deliveryTag, multiple);
                }
            });
        } catch (Exception e) {
            ("sendQueue-ack-发送消息失败:{}", (e));
        }
    }

 方法二:手动确认,ConfirmCallback、returnCallback

代码段:


import ;
import .slf4j.Slf4j;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;

/**
 * RabbitMq 生产者ACK
 */
@Slf4j
@Component
public class RabbitMqProducerAck implements  , {

    @Autowired
    private RabbitTemplate rabbitTemplatenew;


    /**
     * @param message
     */
    public void send(String topicName, String routingKey, String message){
        //设置由于网络问题导致的连接Rabbitmq失败的重试策略
        RetryTemplate retryTemplate = new RetryTemplate();
        (new SimpleRetryPolicy(3));
        //发送之前可以先把消息保存到数据库
        ("UTF-8");
        (true);
        (this);// 指定 ConfirmCallback
        (this);// 指定 ReturnCallback
        CorrelationData correlationData = new CorrelationData(().toString());
        ("rabbitMqProducerAck-confirm-sender==>{}----exchange:{}--routingkey:{}", (), topicName, routingKey, message);
        (topicName, routingKey, message, correlationData);
        try {
          (100);//线程休眠,为了不让方法直接结束,回调函数无法正常回调confirm方法
        } catch (InterruptedException e) {
         ();
        }finally {
         message=null;//强引用设置为null,便于gc回收
        }
    }

    /**
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        ("rabbitMqProducerAck-confirm-successs==>消息回调confirm函数:{},ack:{},cause:{}", (correlationData), ack, cause);
    }

    /**
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        ("rabbitMqProducerAck-confirm-fail==>消息使用的交换器 exchange : {}--消息使用的路由键 routing :{}--消息主体 message : {}-replyCode : {}-replyText: {}", exchange, routingKey, (),replyCode,replyText);
        try {
            (3000l);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        //从新发送
        (exchange, routingKey, new String(()));
    }
}

2.消息接收确认(消费者)

消息接收确认机制,分为消息自动确认模式和消息手动确认模式,当消息确认后,我们队列中的消息将会移除

那这两种模式要如何选择呢?

  • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便。好处就是可以提高吞吐量,缺点就是会丢失消息
  • 如果消息非常重要,不容丢失,则建议手动ACK,正常情况都是更建议使用手动ACK。虽然可以解决消息不会丢失的问题,但是可能会造成消费者过载

1):rabbitmq消费者默认情况下是自动确认,不再多说
2):手动确认方式:

    @RabbitHandler
    @RabbitListener(queues =  , concurrency = "1-1")
    public void receiveQueueCommonLocal(Channel channel, Message message) {
        String messageBody = new String(());
        //("messageBody===>"+messageBody);
        try {
            //todo 业务逻辑
            /*手动确认成功
             * 参数:
             * deliveryTag:该消息的index
             * multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息
             * **/
            (().getDeliveryTag(), true);
        } catch (Exception e) {
            ();
            ("receiveQueueCommonLocal=====>ERROR:{}--josn:{}", (e), messageBody);
            try {
                //手动确认回滚 拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
                (().getDeliveryTag(), true);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }

    }

相关文章