RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,在RabbitMQ成功收到消息后会返回确认消息给生产者
(1)RabbitMQ返回的结果情况
- 消息投递到了RabbitMQ,但是路由失败。此时会通过Publisher Return返回路由异常原因,然后返回ACK,告知投递成功(造成原因:代码有问题或者交换机的配置有问题)
- 临时消息(非持久化)投递到了RabbitMQ,并且成功入队,返回ACK,告知投递成功
- 持久消息投递到了RabbitMQ,并且入队完成持久化,返回ACK,告知投递成功
- 其他情况都会返回NACK,告知投递失败
(2)生产者确认代码实现
添加配置文件信息
# Spring配置信息
spring:
# Rabbitmq配置
rabbitmq:
# 开启publisher confirm机制 none:关闭publisher confirm机制 simple:同步阻塞等待MQ的回执消息 correlated:MQ异步回调方式返回回执消息
publisher-confirm-type: correlated
# 开启publisher return机制
publisher-returns: true
编写Publisher Return回调函数(注:每个RabbitTemplate只能配置一个ReturnCallback)
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息返回:{},{},{},{},{}",
message, replyCode, replyText, exchange, routingKey);
});
}
}
编写ConfirmCallback(注:在每一个消息发送时候单独指定)
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Slf4j
@Component
public class Producer {
@Autowired
RabbitTemplate rabbitTemplate;
public void sendMessage(Object message){
// 1.创建CorrelationData对象
CorrelationData correlationData = new CorrelationData(randomUUID().toString());
// 2.给Future添加ConfirmCallback
correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// Future发生异常时的处理逻辑,基本不会触发
log.error("handle message ack fail",ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ // result.isAck(),boolean类型,true代表收到ack,false代表收到nack
log.debug("发送消息成功,收到 ack!");
}else{// result.getReason(),String类型,返回nack时的异常原因
log.error("发送消息失败,收到 nack,reason : {}" , result.getReason());
}
}
});
// 发送消息
rabbitTemplate.convertAndSend("交换机名称", "routingKey值", message,correlationData);
}
}