文章目录
- 消息ttl过期成为死信
- 队列达到最大长度成为死信
- MyOrder.java
- RabbitMQDirectConfig.java
- OrderProducer.java
- PayConsumer.java
- DeadOrderConsumer.java
- application.yaml
死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。 还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的原因:
消息 TTL (Time To Live ) : x-message-ttl
队列达到最大长度(队列满了无法再添加数据到 mq 中) : x-max-length
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
消息ttl过期成为死信
map.put("x-message-ttl",2000); // 消息存活时间1s
rabbitmq中,设置了死信队列。a消息设置了ttl,a消息已经被消费,但消费者未给通知,a消息ttl过期后不会被送到死信队列
在RabbitMQ中
死信队列的概念:
RabbitMQ的死信队列是一种用于处理失败或无法路由的消息的机制。当消息处理失败、过期、被拒绝或无法路由时,这些消息可以被发送到死信队列。
消息的TTL(Time To Live):
TTL表示消息的过期时间。在RabbitMQ中,可以对消息设置TTL,意味着消息在一定时间内如果没有被消费,则会被认为是死信。但这里的关键是,TTL的判断通常发生在消息即将被投递给消费者之前。
消息被消费的情况:
如果a消息已经被消费,但消费者未给出确认通知(即未发送ack确认),那么这条消息在RabbitMQ内部的状态仍然是未确认的。然而,这并不影响消息的TTL判断。一旦消息被成功地从队列中取出并传递给消费者,TTL机制就不再对其起作用,因为此时消息已经处于消费者的控制之下。
死信队列与TTL的关联:
当消息的TTL过期时,如果这条消息还在队列中等待消费,那么它会被标记为死信并发送到死信队列(如果配置了死信队列的话)。但是,如果消息已经被消费,即使消费者没有发送确认通知,它也不会因为TTL过期而被送到死信队列。
综上所述:
a消息设置了TTL,并且已经被消费,但消费者未给出确认通知。
在这种情况下,即使a消息的TTL过期,它不会被送到死信队列。
原因是消息已经被消费,RabbitMQ认为该消息已经不在其控制之下,因此TTL机制不再适用。
a消息在已经被消费的情况下,不会因为TTL过期而被送到死信队列。
队列达到最大长度成为死信
MyOrder.java
package com.example.direct;
import java.io.Serializable;
public class MyOrder implements Serializable {
private String orderId;
private String orderNumber;
private String customerName;
private Integer productId;
private String productName;
private Float productPrice;
private Integer productCount;
private Float orderPrice;
public MyOrder(){}
public MyOrder(String orderId, String orderNumber, String customerName, Integer productId, String productName, Float productPrice, Integer productCount, Float orderPrice) {
this.orderId = orderId;
this.orderNumber = orderNumber;
this.customerName = customerName;
this.productId = productId;
this.productName = productName;
this.productPrice = productPrice;
this.productCount = productCount;
this.orderPrice = orderPrice;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getOrderNumber() {
return orderNumber;
}
public Integer getProductId() {
return productId;
}
public void setProductId(Integer productId) {
this.productId = productId;
}
public void setOrderNumber(String orderNumber) {
this.orderNumber = orderNumber;
}
public String getCustomerName() {
return customerName;
}
public void setCustomerName(String customerName) {
this.customerName = customerName;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public Float getProductPrice() {
return productPrice;
}
public void setProductPrice(Float productPrice) {
this.productPrice = productPrice;
}
public Integer getProductCount() {
return productCount;
}
public void setProductCount(Integer productCount) {
this.productCount = productCount;
}
public Float getOrderPrice() {
return orderPrice;
}
public void setOrderPrice(Float orderPrice) {
this.orderPrice = orderPrice;
}
@Override
public String toString() {
return "MyOrder{" +
"orderId=" + orderId +
", orderNumber='" + orderNumber + '\'' +
", customerName='" + customerName + '\'' +
", productName='" + productName + '\'' +
", productPrice=" + productPrice +
", productCount=" + productCount +
", orderPrice=" + orderPrice +
'}';
}
}
RabbitMQDirectConfig.java
package com.example.direct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQDirectConfig {
// 1. 创建交换机
// @Bean
// public DirectExchange newDirectExchange(){
// return new DirectExchange("myDirectExchangeAAA",true,false);
// }
//2. 创建队列
// @Bean
// public Queue newQueueA(){
// return new Queue("queueAAA",true);
// }
//3. 绑定队列到交换机中
// @Bean
// public Binding bindingA(){
// return BindingBuilder.bind(newQueueA()).to(newDirectExchange()).with("keyAAA");
// }
//==================死信
//1. 创建交换机
@Bean
public DirectExchange newExchange(){
return new DirectExchange("normalExchange",true,false);
}
//2. 创建队列
@Bean
public Queue newQueue(){
Map<String ,Object> map = new HashMap<>();
//map.put("x-message-ttl",2000); // 消息存活时间1s
map.put("x-max-length",6); // 队列达到最大长度 为6
map.put("x-dead-letter-exchange","deadExchange");// 设置死信交换机 的名称
map.put("x-dead-letter-routing-key","key2") ;//设置死信路由键名字
return new Queue("normalQueueA",true,false,false,map);
}
//3. 绑定
@Bean
public Binding binding(){
return BindingBuilder.bind(newQueue()).to(newExchange()).with("key1");
}
//4. 创建死信交换机
@Bean
public DirectExchange newDeadExchange(){
return new DirectExchange("deadExchange",true,false);
}
//5. 创建死信队列
@Bean
public Queue newDeadQueue(){
return new Queue("deadQueueA",true,false,false);
}
//6. 绑定
@Bean
public Binding bindingDead(){
return BindingBuilder.bind(newDeadQueue()).to(newDeadExchange()).with("key2");
}
}
OrderProducer.java
package com.example.direct;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
@RestController
@RequestMapping("a")
public class OrderProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
@GetMapping("/submitOrder")
public String submitOrder(){
Map<String,Object> map = new HashMap<>();
map.put("orderNumber","2222");//String
map.put("productId",1111);//Integer
for(int i=0;i<=130;i++){
String orderId = UUID.randomUUID().toString().replace("-","");
map.put("orderId",orderId);
rabbitTemplate.convertAndSend("normalExchange", "key1", map);
}
return "生产者下单成功";
}
}
PayConsumer.java
package com.example.direct;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class PayConsumer {
@RabbitHandler
@RabbitListener(queues = "normalQueueA")
public void process(Map map, Channel channel, Message message) throws IOException {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("支付服务接收到的消息:" + map);
String orderId = (String)map.get("orderId");//String
Integer productId = (Integer)map.get("productId");//Integer
String orderNum = (String)map.get("orderNumber");//String
System.out.println("支付服务接收到的orderId:" + orderId);
System.out.println("支付服务接收到的productId:" + productId);
System.out.println("支付服务接收到的orderNum:" + orderNum);
//告诉broker,消息已经被确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
DeadOrderConsumer.java
package com.example.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class DeadOrderConsumer {
// 获得死信队列中的消息
@RabbitHandler
@RabbitListener(queues = "deadQueueA")
public void process(Map map){
System.out.println("订单取消支付后,从死信队列中接收到的消息:" + map);
String orderId = (String)map.get("orderId");//String
Integer productId = (Integer)map.get("productId");//Integer
String orderNum = (String)map.get("orderNumber");//String
System.out.println("取消支付后,从死信队列中接收到的orderId:" + orderId);
System.out.println("取消支付后,从死信队列中接收到的productId:" + productId);
System.out.println("取消支付后,从死信队列中接收到的orderNum:" + orderNum);
}
}
application.yaml
server:
servlet:
context-path: /app
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated # 确认交换机已经接收到生产者的消息了
publisher-returns: true # 消息已经到了队列(交换机与队列绑定成功的)
listener:
simple:
acknowledge-mode: manual # 手动消息确认
concurrency: 1 #消费者数量
max-concurrency: 1 #消费者最大数量
prefetch: 1 #消费者每次从队列中取几个消息