在分布式系统中,消息中间件起到了非常重要的作用,常用于解耦和异步处理业务逻辑。然而,消息处理过程并不总是顺利,有时会出现消息消费失败的情况。为了确保消息不会丢失,我们需要对消息进行重试和处理死信消息。本文将介绍如何使用RocketMQ实现消息重试和处理死信消息的解决方案。
消息重试机制
消息重试是指当消息处理失败时,消息中间件会自动重新投递消息给消费者,直到消息处理成功或达到最大重试次数。默认情况下,RocketMQ会对失败的消息进行最多16次重试。
生产者端配置重试次数
在生产者端,可以设置发送失败时的重试次数:
public void retryProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("retry_producer_group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
// 设置同步发送失败时的重试次数
producer.setRetryTimesWhenSendFailed(2);
// 设置异步发送失败时的重试次数
producer.setRetryTimesWhenSendAsyncFailed(2);
String key = UUID.randomUUID().toString();
Message message = new Message("retryTopic", "vip1", key, "我是vip1000的文章".getBytes());
producer.send(message);
System.out.println("消息发送成功");
producer.shutdown();
}
消费者端配置重试次数
在消费者端,可以设置最大重试次数:
public void retryConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("retryTopic", "*");
// 设置最大重试次数
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
MessageExt messageExt = list.get(0);
System.out.println(new Date());
System.out.println("重试次数: " + messageExt.getReconsumeTimes());
System.out.println(new String(messageExt.getBody()));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.in.read();
}
死信消息处理
当消息重试达到最大次数仍然失败时,消息会被放入死信队列(Dead Letter Queue,DLQ)。在RocketMQ中,死信队列的主题名为:%DLQ% + 消费者组名
。
监听死信队列
可以创建一个消费者来监听死信队列,并将死信消息记录下来,以便进行人工干预处理:
public void retryDeadConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("%DLQ%retry-consumer-group", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
MessageExt messageExt = list.get(0);
System.out.println(new Date());
System.out.println(new String(messageExt.getBody()));
System.out.println("记录到特别的位置,通知人工处理");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
综合方案
在实际应用中,我们可以结合上述两种方法,对消息重试和死信处理进行更细致的控制。例如,在消费者处理消息时,如果消息重试次数超过设定值,可以直接将其记录下来,避免再次重试:
public void retryDeadConsumer2() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("retryTopic", "*");
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
MessageExt messageExt = list.get(0);
System.out.println(new Date());
try {
//业务代码
handleDb();
} catch (Exception e) {
int reconsumeTimes = messageExt.getReconsumeTimes();
if (reconsumeTimes >= 3) {
System.out.println("记录到特别的位置,通知人工处理");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
通过以上方法,可以确保消息处理的可靠性,并且在发生异常时能够及时处理死信消息,从而保证系统的稳定性和数据的一致性。