004 死信(限制队列最大长度)

时间:2024-04-29 07:31:57

文章目录

    • 消息ttl过期成为死信
    • 队列达到最大长度成为死信
      • MyOrder.java
      • RabbitMQDirectConfig.java
      • OrderProducer.java
      • PayConsumer.java
      • DeadOrderConsumer.java
    • application.yaml

死信就是无法被消费的消息。一般来说,producer 将消息投递到 broker 或者直接到 queue 中,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。 还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的原因:

消息 TTL (Time To Live ) : x-message-ttl
队列达到最大长度(队列满了无法再添加数据到 mq 中) : x-max-length
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

消息ttl过期成为死信

map.put("x-message-ttl",2000); // 消息存活时间1s

rabbitmq中,设置了死信队列。a消息设置了ttl,a消息已经被消费,但消费者未给通知,a消息ttl过期后不会被送到死信队列

在RabbitMQ中
死信队列的概念:
RabbitMQ的死信队列是一种用于处理失败或无法路由的消息的机制。当消息处理失败、过期、被拒绝或无法路由时,这些消息可以被发送到死信队列。

消息的TTL(Time To Live):
TTL表示消息的过期时间。在RabbitMQ中,可以对消息设置TTL,意味着消息在一定时间内如果没有被消费,则会被认为是死信。但这里的关键是,TTL的判断通常发生在消息即将被投递给消费者之前。

消息被消费的情况:
如果a消息已经被消费,但消费者未给出确认通知(即未发送ack确认),那么这条消息在RabbitMQ内部的状态仍然是未确认的。然而,这并不影响消息的TTL判断。一旦消息被成功地从队列中取出并传递给消费者,TTL机制就不再对其起作用,因为此时消息已经处于消费者的控制之下。

死信队列与TTL的关联:
当消息的TTL过期时,如果这条消息还在队列中等待消费,那么它会被标记为死信并发送到死信队列(如果配置了死信队列的话)。但是,如果消息已经被消费,即使消费者没有发送确认通知,它也不会因为TTL过期而被送到死信队列。

综上所述:

a消息设置了TTL,并且已经被消费,但消费者未给出确认通知。
在这种情况下,即使a消息的TTL过期,它不会被送到死信队列。
原因是消息已经被消费,RabbitMQ认为该消息已经不在其控制之下,因此TTL机制不再适用。

a消息在已经被消费的情况下,不会因为TTL过期而被送到死信队列。

队列达到最大长度成为死信

MyOrder.java


package com.example.direct;

import java.io.Serializable;

public class MyOrder implements Serializable {
    private String orderId;
    private String orderNumber;
    private String customerName;
    private Integer productId;
    private String productName;
    private Float productPrice;
    private Integer productCount;
    private Float orderPrice;

    public MyOrder(){}


    public MyOrder(String orderId, String orderNumber, String customerName, Integer productId, String productName, Float productPrice, Integer productCount, Float orderPrice) {
        this.orderId = orderId;
        this.orderNumber = orderNumber;
        this.customerName = customerName;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.productCount = productCount;
        this.orderPrice = orderPrice;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getOrderNumber() {
        return orderNumber;
    }

    public Integer getProductId() {
        return productId;
    }

    public void setProductId(Integer productId) {
        this.productId = productId;
    }

    public void setOrderNumber(String orderNumber) {
        this.orderNumber = orderNumber;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public Float getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(Float productPrice) {
        this.productPrice = productPrice;
    }

    public Integer getProductCount() {
        return productCount;
    }

    public void setProductCount(Integer productCount) {
        this.productCount = productCount;
    }

    public Float getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(Float orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        return "MyOrder{" +
                "orderId=" + orderId +
                ", orderNumber='" + orderNumber + '\'' +
                ", customerName='" + customerName + '\'' +
                ", productName='" + productName + '\'' +
                ", productPrice=" + productPrice +
                ", productCount=" + productCount +
                ", orderPrice=" + orderPrice +
                '}';
    }
}


RabbitMQDirectConfig.java


package com.example.direct;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQDirectConfig  {

//    1. 创建交换机
//    @Bean
//    public DirectExchange newDirectExchange(){
//        return new DirectExchange("myDirectExchangeAAA",true,false);
//    }

    //2. 创建队列
//    @Bean
//    public Queue newQueueA(){
//        return new Queue("queueAAA",true);
//    }

//3. 绑定队列到交换机中
//    @Bean
//    public Binding bindingA(){
//        return BindingBuilder.bind(newQueueA()).to(newDirectExchange()).with("keyAAA");
//    }








    //==================死信


    //1. 创建交换机
    @Bean
    public DirectExchange newExchange(){
        return new DirectExchange("normalExchange",true,false);
    }
    //2. 创建队列
    @Bean
    public Queue newQueue(){
        Map<String ,Object> map = new HashMap<>();
        //map.put("x-message-ttl",2000); // 消息存活时间1s
        map.put("x-max-length",6); // 队列达到最大长度 为6
        map.put("x-dead-letter-exchange","deadExchange");// 设置死信交换机 的名称
        map.put("x-dead-letter-routing-key","key2") ;//设置死信路由键名字
        return new Queue("normalQueueA",true,false,false,map);

    }
    //3. 绑定
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(newQueue()).to(newExchange()).with("key1");
    }


    //4. 创建死信交换机
    @Bean
    public DirectExchange newDeadExchange(){
        return new DirectExchange("deadExchange",true,false);
    }
    //5. 创建死信队列
    @Bean
    public Queue newDeadQueue(){

        return new Queue("deadQueueA",true,false,false);
    }
    //6. 绑定
    @Bean
    public Binding bindingDead(){
        return BindingBuilder.bind(newDeadQueue()).to(newDeadExchange()).with("key2");
    }








}



OrderProducer.java


package com.example.direct;


import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

@RestController
@RequestMapping("a")
public class OrderProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @GetMapping("/submitOrder")
    public String submitOrder(){

        Map<String,Object> map = new HashMap<>();
        map.put("orderNumber","2222");//String
        map.put("productId",1111);//Integer


        for(int i=0;i<=130;i++){

            String orderId = UUID.randomUUID().toString().replace("-","");


            map.put("orderId",orderId);


            rabbitTemplate.convertAndSend("normalExchange", "key1", map);
        }


        return "生产者下单成功";

    }


}



PayConsumer.java



package com.example.direct;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class PayConsumer {
    @RabbitHandler
    @RabbitListener(queues = "normalQueueA")
    public void process(Map map, Channel channel, Message message) throws IOException {

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("支付服务接收到的消息:" + map);
        String orderId = (String)map.get("orderId");//String
        Integer productId = (Integer)map.get("productId");//Integer
        String orderNum = (String)map.get("orderNumber");//String

        System.out.println("支付服务接收到的orderId:" + orderId);
        System.out.println("支付服务接收到的productId:" + productId);
        System.out.println("支付服务接收到的orderNum:" + orderNum);

        //告诉broker,消息已经被确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}


DeadOrderConsumer.java



package com.example.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class DeadOrderConsumer {

    // 获得死信队列中的消息
    @RabbitHandler
    @RabbitListener(queues = "deadQueueA")
    public void process(Map map){
        System.out.println("订单取消支付后,从死信队列中接收到的消息:" + map);
        String orderId = (String)map.get("orderId");//String
        Integer productId = (Integer)map.get("productId");//Integer
        String orderNum = (String)map.get("orderNumber");//String


        System.out.println("取消支付后,从死信队列中接收到的orderId:" + orderId);
        System.out.println("取消支付后,从死信队列中接收到的productId:" + productId);
        System.out.println("取消支付后,从死信队列中接收到的orderNum:" + orderNum);


    }
}


application.yaml


server:
  servlet:
    context-path: /app
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated  # 确认交换机已经接收到生产者的消息了
    publisher-returns: true   #  消息已经到了队列(交换机与队列绑定成功的)
    listener:
      simple:
        acknowledge-mode: manual # 手动消息确认
        concurrency: 1 #消费者数量
        max-concurrency: 1  #消费者最大数量
        prefetch: 1  #消费者每次从队列中取几个消息