RabbitMQ延时队列应用场景

时间:2024-02-25 09:10:22

应用场景

我们系统未付款的订单,超过一定时间后,需要系统自动取消订单并释放占有物品

image-20211020212315320

常用的方案

就是利用Spring schedule定时任务,轮询检查数据库

但是会消耗系统内存,增加了数据库的压力、还存在较大的时间误差

image-20211020212617711

解决:rabbitmq的消息TTL和死信Exchange结合

介绍

1.何为消息TTL、死信

死信:对消息设置的过期时间到了,这个消息还没有被消费就认为这个消息死了,死了的消息会进入死信交换机(Dead Letter Exchanges)

成为死信的三种条件:

  • 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
  • 上面的消息的TTL到了,消息过期了。
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上

消息TTL:消息的TTL就是消息的存活时间

RabbitMQ可以对队列和消息都设置过期时间,但代表的都是一个意思,只要消息在设置时间内没有消费,消息就死了,就被称为死信

如果队列和消息都设置了过期时间,那么就取时间最小的,单个消息的过期时间才是延时队列的关键

2.如何运作

设置队列过期时间

image-20211020213655496

消费者P会通过一个路由键deal.message发送消息给X交换机,然后继续发送给delay queau队列,这个队列比较特殊,设置了过期时间5分钟过期,还设置了x-dead-letter-exchange用于指定下一个接收的交换机,消息过期之后会成为死信直接进入delay.exchange交换机,利用x-dead-letter-routing-key绑定的路由键找到下一个队列,这时候只需要有人监听这个队列。

设置消息过期时间

image-20211020214756629

消费者发送一个消息,设置了5分钟过期时间,最后交给了延时队列,延时队列说消息死了不要乱放,指定了一个死信路由,用于找到下一个队列的路由键,等到五分钟后服务器会自动检查是否过期,过期的话会交给delay.exchange路由,最后再交给delay.message

代码模拟

image-20211020215722888

下订单成功先发动给order-event-exchangeorder-event-exchange绑定了两个路由键order.create.orderorder.release.order,根据order.create.order路由键找到order.delay.queue队列,这是一个特殊的队列,上图所诉,消息的存活时间为一分钟,消息在order.delay.queue队列中没人使用变成死信了,交给order-event-exchange交换机,最后通过order.release.order绑定关系找到了order.release.order.queue队列

@Configuration
public class MyMQConfig {

    //监听最后一个队列,获取那些过期的订单消息
    @RabbitListener(queues = "order.release.order.queue")
    public  void  listerner(OrderEntity orderEntity,Channel channel,Message message) throws IOException {
        System.out.println("收到过期订单信息"+orderEntity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

    //特殊队列
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000);
        Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
        return orderDelayQueue;
    }

    //最后接收死信消息的队列
    @Bean
    public Queue orderReleaseQueue() {
        return new Queue("order.release.order.queue", true, false, false);
    }

   //事件交换机
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange("order-event-exchange", true, false);
    }

    //绑定order.delay.queue队列和的order-event-exchange交换机的路由键
    @Bean
    public Binding orderCreateBingding() {
        return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
    }

    //绑定order.release.order.queue队列和的order-event-exchange交换机的路由键
    @Bean
    public Binding orderReleaseBingding() {
        return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
    }
 

}

测试

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @ResponseBody
    @GetMapping("/test/createOrder")
    public String createOrderTest(){
        OrderEntity entity = new OrderEntity();
        entity.setOrderSn(UUID.randomUUID().toString());
        entity.setModifyTime(new Date());
        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);
        return "ok";
    }

image-20211020221215759

库存解锁实际场景

在库存服务有个stock-event-exchange交换机,如果我们想要解锁库存,

1、首先订单下成功、库存锁定成功

2、锁定成功就要通过stock.locked路由键发送一个消息给交换机stock-event-exchange,消息内容包括哪个订单、哪些商品、多少库存等等

3、交换机通过绑定关系再发送给延时队列stock.delay.queue

4、订单可能需要30分钟才会自动关闭,50分钟之后来检查库存,就会知道订单支付没有

5、50分钟消息没有被消息,就变为死信,通过stock.release路由键绑定关系交给stock-event-exchange交换机

6、stock-event-exchange交换机通过stock.release路由键绑定关系找到strock.relelase.stock.queue队列

7、所有的解锁库存服务就监听这个队列里的消息,只要这个队列里消息能够到达的都是超时没有支付订单的

下单远程锁定库存,然后将仓库锁定库存的数据发给订单,当在订单下单失败时,由于不是分布式事务,订单回滚,但仓库不回滚,所以订单一失败,就需要通过订单拿到mq中仓库传来的数据通知仓库解锁库存

库存解锁场景:

1、下订单成功,订单过期没有支付被系统自动取消或者用户手动取消,都要解锁库存

2、下订单成功、库存锁定成功,但是业务调用失败导致订单回滚,之前锁定的库存就自动解锁,Seata分布式事务太慢,就要用一段时间后自动解决库存。

3、订单失败,因为锁库存失败有一个商品没有锁成功,导致整个锁库存服务都回滚,

消息队列收到库存消息场景

消息队列收到消息之后

  • 如果没有查到数据库有锁定成功的数据,说明库存锁失败了,锁库存自动回滚,数据库查不到记录无需解锁
  • 如果查到有数据,就说明库存锁定成功了
    • 没有这个订单必须解锁库存
    • 有订单,订单没人支付失效了才能解锁库存

定时关闭订单实际场景

image-20211021201122194

同上原理类似也是利用死信路由,订单创建后,默认放入延时队列,也就是订单的有效时间,超过这个时间没有支付或者用户主动取消都会导致订单信息进入order.release.order.queue队列,最后被释放