RabbitMQ学习笔记六:RabbitMQ之消息确认

时间:2021-07-12 14:38:41

使用消息队列,必须要考虑的问题就是生产者消息发送失败和消费者消息处理失败,这两种情况怎么处理.

生产者发送消息,成功,则确认消息发送成功;失败,则返回消息发送失败信息,再做处理.

消费者处理消息,成功,则消息队列自动删除消息;失败,则消息重新返回队列,等待处理.

对于消费者处理失败的情况,如果仅仅只是让消息重新返回队列,等待处理,那么久有可能会出现很多消息一直无法处理的情况;因此,是否让消息返回队列,还有待商榷.

现在,我一步步来分析RabbitMQ的消息确认(这次的代码同样是在上次的代码基础上做修改,修改后的代码会上传到百度云,后面会有链接地址):

生产者端,修改spring-config.xml,rabbitTemplate增加消息确认和返回错误信息的监听器:

    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
confirm
-callback="confirmCallBackListener"
return-callback="returnCallBackListener"
mandatory
="true"
/>

消息确认监听器confirmCallBackListener:

@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements ConfirmCallback {

@Override
public void confirm(CorrelationData arg0, boolean arg1)
{
System.out.println(
"确认消息完成..."); // 只确生产者消息发送成功,消费者是否处理成功不做保证
}
}

消息发送失败返回监听器returnCallBackListener:

@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback {

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
{
System.out.println(
"消息返回处理中...");
}
}

消费者端,修改spring-config.xml,设置ack方式为手动,增加对应队列的监听器。

<!-- ========================================RabbitMQ========================================= -->
<!-- 连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" virtual-host="/" username="guest" password="guest" />
<!-- 监听器 设置acknowledge="manual" 则开启ack机制 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<!-- queues是队列名称,可填多个,用逗号隔开, method是ref指定的Bean调用Invoke方法执行的方法名称 -->
<rabbit:listener queues="test" ref="receiveConfirmTestListener" />
</rabbit:listener-container>
<!-- 队列声明 -->
<rabbit:queue name="test" durable="true" />
<!-- 测试监听处理器 -->
<bean id="receiveConfirmTestListener" class="com.aitongyi.customer.ReceiveConfirmTestListener" />

配置的receiveConfirmTestListener没有指定方法,是因为它实现了接口ChannelAwareMessageListener,代码如下:

public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception
{
try
{
System.out.println(
"consumer--:" + message.getMessageProperties() + ":" + new String(message.getBody()));

// deliveryTag是消息传送的次数,我这里是为了让消息队列的第一个消息到达的时候抛出异常,处理异常让消息重新回到队列,然后再次抛出异常,处理异常拒绝让消息重回队列
if (message.getMessageProperties().getDeliveryTag() == 1 || message.getMessageProperties().getDeliveryTag() == 2)
{
throw new Exception();
}

channel.basicAck(message.getMessageProperties().getDeliveryTag(),
false); // false只确认当前一个消息收到,true确认所有consumer获得的消息
}
catch (Exception e)
{
e.printStackTrace();

if (message.getMessageProperties().getRedelivered())
{
System.out.println(
"消息已重复处理失败,拒绝再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),
true); // 拒绝消息
}
else
{
System.out.println(
"消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false, true); // requeue为是否重新回到队列
}
}
}
}

启动项目,发送消息,查看测试结果:

打开发送消息页面,点击发送:

RabbitMQ学习笔记六:RabbitMQ之消息确认

在控制台查看结果(抛出的异常信息我没有粘贴出来):

2017-05-17 14:56:21 532 [INFO] c.a.p.c.RabbitController - rabbitmq--收到待发送消息: type[test]-msg[hello world test rabbit!]
2017-05-17 14:56:21 819 [INFO] c.a.p.s.RabbitServiceImpl - rabbitmq--发送消息完成: routingKey[test]-msg[hello world test rabbit!]
consumer
--:MessageProperties [headers={spring_return_correlation=776f57cd-1e22-44de-bc11-eb82bc043562}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0]:hello world test rabbit!
消息即将再次返回队列处理...
consumer
--:MessageProperties [headers={spring_return_correlation=776f57cd-1e22-44de-bc11-eb82bc043562}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=test, deliveryTag=2, messageCount=0]:hello world test rabbit!
确认消息完成...
消息已重复处理失败,拒绝再次接收...
consumer
--:MessageProperties [headers={spring_return_correlation=776f57cd-1e22-44de-bc11-eb82bc043562}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=test, deliveryTag=3, messageCount=0]:hello world test rabbit!

上面代码的处理方式是消费者接收到消息后处理消息,第一次处理失败,让消息重回队列,如果重回队列后仍然失败,则拒绝接收消息。这个做法只是一个参考,具体如何做,要根据业务来做改变。

消息发送失败情形展示:将发送的消息队列routingKey,也就是配置的队列名改成一个在RabbitMQ Server不存在的。然后打开发送消息页面,点击发送,结果如下:

2017-05-17 15:12:54 437 [INFO] c.a.p.c.RabbitController - rabbitmq--收到待发送消息: type[test]-msg[hello world test rabbit!]
2017-05-17 15:12:54 439 [INFO] c.a.p.s.RabbitServiceImpl - rabbitmq--发送消息完成: routingKey[test]-msg[hello world test rabbit!]
消息返回处理中...
确认消息完成...

至于为什么还是会打印"确认消息完成...",有兴趣的可以看下源码。

源代码已上传至百度云网盘,欢迎下载阅读,地址:http://pan.baidu.com/s/1eSL2w8M