7.RabbitMQ可靠性投递
为了保证信息不丢失, 可靠抵达,引入确认机制
消息从生产者传递到消费者的过程中, 不同的阶段使用不同的确认方式.
7.0.准备请求
一次性发送10 个消息 通过 new.exchange.direct交换机
接收消息, 使用 new.admin路由键
向 new.admin队列
发送消息.
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sender/test2")
public String test2(){
String msg = "Mode Batch Confirm, Rabbit MQ";
for (int i = 0; i < 10; i++) {
String cd = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(cd);
// 交换机, 路由键, 消息, 消息id
rabbitTemplate.convertAndSend("new.exchange.direct","new.admin", msg +":" + i , correlationData);
}
return msg;
}
7.1.消息发送到borker消息代理
从 Producer 生产者到 borker消息代理时, 有两种方式: Transaction(事务)模式
和 Confirm(确认)模式
其中 Transaction(事务)模式
是使用阻塞模式, 效率低, 官方说法是性能下降 270 倍. 所以通常使用的是 Confirm(确认)模式
7.1.1.配置启动代理borker确认
# 启动 代理borker 确认
##spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-confirms=true 是旧版的写法
spring.rabbitmq.publisher-confirm-type
NONE : 禁用发布确认模式,是默认值
CORRELATED : 发布消息成功到交换器后会触发回调方法
SIMPLE : 经测试有两种效果,
其一效果和CORRELATED值一样会触发回调方法,
其二在发布消息成功后使用rabbitTemplate调用waitForConfirms()或waitForConfirmsOrDie()方法等待broker节点返回发送结果,
根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie()方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
7.1.2.增加 回调函数
建立配置类, 并为 RabbitTemplate增加回调函数 ConfirmCallback()
RabbitTemplate只允许设置一个callback方法,可以将RabbitTemplate的bean设为单例然后设置回调,但是这样有个缺点是使用RabbitTemplate的地方都会执行这个回调,如果直接在别的地方设置,会报如下错误
only one ConfirmCallback is supported by each RabbitTemplate
可以通过将RabbitTemplate的作用域设为@Scope,每次bean都是新的,来解决这个问题
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
public class RabbitMessageConfig {
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置代理borker接收确认回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 消息id
* @param ack 消息是否成功收到, 投递到代理borker 返回 true , 失败返回 false
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ack = " + ack);
if (!ack) {
System.out.println("发送消息到 [代理borker] 失败{ correlationData : " + correlationData + ", cause : " + cause + "}");
}else{
System.out.println("发送消息到 [代理borker] 成功{ correlationData : " + correlationData + "}");
}
}
});
return rabbitTemplate;
}
}
这样当 代理borker接收到消息时, 会自动调用方法
7.1.3.发请求测试
在 控制台输出
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=fc39288a-86e3-4122-9c68-50d084e483f5]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=09d765f9-8317-4c87-8e7f-4e66ef382bca]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=0d9a414c-b5ae-4cf5-a825-6c14f4030643]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=55c0fe51-18d3-4d4c-b221-114341a69ad3]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=1d45b2e0-6bec-40d4-90ab-da52a6252d28]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=fb4142a4-7f98-4517-bc74-b8ed0d048741]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=97ee4871-735d-4264-be62-bc7749804504]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=637e57a7-a440-45a9-bd90-87963e4108f9]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=e6260609-a118-4ad1-a864-a6777583826a]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=b9e2db4f-c259-42dd-81a9-3dbfc33755d7]}
rabbit控制台, 收到10条消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4wtFbj8g-1679806873943)(RabbitMQ.assets/image-20230323104854513.png)]
7.2.消息发送到queue队列
第二个环节就是从交换机Exchange路由到队列queue。
消息无法路由到正确的队列的原因有 1)、路由键错误 2)、队列不存在。
有两种方式处理无法路由的消息,一种是让服务器重发给生产者,一种是让交换机路由到另一个备份的交换机。
7.2.1.配置启动队列queue确认
# 启动 队列queue 确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列, 以异步发送优先回调我们这个returnconfirm
spring.rabbitmq.template.mandatory=true
7.2.2.增加 回调函数
依然在 RabbitTemplate增加回调函数 ReturnCallback()
这个回调只有在队列接收失败时才会被调用
同时注意要 加上 rabbitTemplate.setMandatory(true); 的设置
//将消息退回给 producer 。并执行回调函数returnedMessage。
rabbitTemplate.setMandatory(true);
// 设置Queue队列接收确认回调函数
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
/**
* 这个方法在投递到队列queue [失败] 时才会执行
* @param message 投递失败的消息的详细信息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 交换机信息
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){
System.out.print("发送消息到 [队列Queue] 失败, 回发的消息:{ ");
System.out.print("replyCode: "+replyCode);
System.out.print(", replyText: "+replyText);
System.out.print(", exchange: "+exchange);
System.out.print(", routingKey: "+routingKey);
System.out.println( " }");
}
});
7.2.3.测试
7.2.3.1.修改请求
加入一个不存在请求
@RequestMapping("/sender/test2")
public String test2(){
String msg = "Mode Batch Confirm, Rabbit MQ";
for (int i = 0; i < 10; i++) {
String cd = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(cd);
if(i%2==0){
// 交换机, 路由键, 消息, 消息id
rabbitTemplate.convertAndSend("new.exchange.direct","new.admin", msg +":" + i , correlationData);
}else{
// 写了一个不存在的路由键
rabbitTemplate.convertAndSend("new.exchange.direct","1234567890", msg +":" + i , correlationData);
}
}
return msg;
}
7.2.3.2.控制台输出
其中一部分, 消息成功发到broker代理, 但由于错误的路由键, 所以不能发到队列里
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=7310b457-a6b2-4c76-b28b-d854fcccaefd]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=06478d12-9e5d-432b-ad8e-e244e543b0ee]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=24c8fed0-617b-4a00-8dec-98d71dace4a2]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=bc565b4f-c98c-4412-ad49-7fce3b233521]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=fd87a581-931b-44e7-9bbf-d3879e26da20]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=b45113af-1a04-4e2b-8d1b-b1bfd78e8404]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=3ef73907-f217-4676-aac8-cb3dfe30eea0]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=1e4ce655-2f4c-43f3-8f2e-c7d2d7d46b68]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=5ebb5f26-6cb0-4952-a00b-de218b1a144a]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=9a930da2-12a0-4911-b219-2c5ff9407fac]}
7.3.消费者接收消息
如果 消息不被消费会一直存储在MQ里 , 直到被消费
但自动消费模式下, 如果 多条消息只有一条被消费, 其它的消息也被从队列中清除, 所以要改为手动消费
7.3.1.配置启动手动消费
# 将 消息消费确认 修改为手动模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
7.3.2.增加消息消费手动确认
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiverServiceImpl {
// 监听队列
@RabbitListener(queues = "new.admin")
@RabbitHandler
public void goodsProcess(Message message, Channel channel) {
System.out.println("new.admin 队列 接收消息 : " + message);
// DeliveryTag 是 channel 内顺序号 , 自增形式
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag = " + deliveryTag);
try {
if (deliveryTag%2 == 0) {
// 确认消费, 第二个参数是 是否批量
channel.basicAck(deliveryTag, false);
System.out.println("签收了消息>>> = " + deliveryTag);
}else {
// long deliveryTag(序号), boolean multiple(是否批量签收), boolean requeue(是否重新入队)
// 重新入队的消息会 , 重新从队列投递给消费者
channel.basicNack(deliveryTag, false, true);
System.out.println("没有签收到消息<<< = " + deliveryTag);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
这里使用
deliveryTag%2==0
来 模拟 有的消息 确认消费, 有的消息不能确认消费通过 channel.basicNack(deliveryTag, false, true); 来处理 不能确认消费的消息
其中第三个参数, 代表是否重新进入队列, true为重新进入, 这样消息会重新发送到消费者这里