RocketMQ消息重试和死信消息解决方案

时间:2024-07-17 09:21:45

在分布式系统中,消息中间件起到了非常重要的作用,常用于解耦和异步处理业务逻辑。然而,消息处理过程并不总是顺利,有时会出现消息消费失败的情况。为了确保消息不会丢失,我们需要对消息进行重试和处理死信消息。本文将介绍如何使用RocketMQ实现消息重试和处理死信消息的解决方案。

消息重试机制

消息重试是指当消息处理失败时,消息中间件会自动重新投递消息给消费者,直到消息处理成功或达到最大重试次数。默认情况下,RocketMQ会对失败的消息进行最多16次重试。

生产者端配置重试次数

在生产者端,可以设置发送失败时的重试次数:

public void retryProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("retry_producer_group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    // 设置同步发送失败时的重试次数
    producer.setRetryTimesWhenSendFailed(2);
    // 设置异步发送失败时的重试次数
    producer.setRetryTimesWhenSendAsyncFailed(2);
    String key = UUID.randomUUID().toString();
    Message message = new Message("retryTopic", "vip1", key, "我是vip1000的文章".getBytes());
    producer.send(message);
    System.out.println("消息发送成功");
    producer.shutdown();
}
消费者端配置重试次数

在消费者端,可以设置最大重试次数:

public void retryConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("retryTopic", "*");
    // 设置最大重试次数
    consumer.setMaxReconsumeTimes(2);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = list.get(0);
            System.out.println(new Date());
            System.out.println("重试次数: " + messageExt.getReconsumeTimes());
            System.out.println(new String(messageExt.getBody()));
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    consumer.start();
    System.in.read();
}

死信消息处理

当消息重试达到最大次数仍然失败时,消息会被放入死信队列(Dead Letter Queue,DLQ)。在RocketMQ中,死信队列的主题名为:%DLQ% + 消费者组名

监听死信队列

可以创建一个消费者来监听死信队列,并将死信消息记录下来,以便进行人工干预处理:

public void retryDeadConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("%DLQ%retry-consumer-group", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = list.get(0);
            System.out.println(new Date());
            System.out.println(new String(messageExt.getBody()));
            System.out.println("记录到特别的位置,通知人工处理");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

综合方案

在实际应用中,我们可以结合上述两种方法,对消息重试和死信处理进行更细致的控制。例如,在消费者处理消息时,如果消息重试次数超过设定值,可以直接将其记录下来,避免再次重试:

public void retryDeadConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("retryTopic", "*");
    consumer.setMaxReconsumeTimes(2);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = list.get(0);
            System.out.println(new Date());
            try {
            	//业务代码
                handleDb();
            } catch (Exception e) {
                int reconsumeTimes = messageExt.getReconsumeTimes();
                if (reconsumeTimes >= 3) {
                    System.out.println("记录到特别的位置,通知人工处理");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

通过以上方法,可以确保消息处理的可靠性,并且在发生异常时能够及时处理死信消息,从而保证系统的稳定性和数据的一致性。