RabbitMQ实现延迟发送消息

时间:2023-03-12 19:05:48

前言

最近在做一个可以根据用户选择的时间,实现微信推送订阅消息的功能,突然想到rabbitmq好像可以实现这个功能,本着试试的心态开始研究,第一个想到的就是使用死信队列

死信队列

何为死信队列,其实rabbitmq本身并不能实现延迟发送消息的功能,不过因为本身有着队列ttl+死信exchange的机制,可以借助这个机制实现延迟发送消息

原理就是,用户发送消息到一个队列上并设置过期时间,但是这个队列没有消费者,到了过期时间,就由该队列绑定的死信exchange根据路由key的方式发送到另一个队列上,并消费,从而实现延迟发送

注:消息不仅只有超时才会成为死信,还有可能消息被消费者reject或者返回nack

理论上很简单,开始实践

导入依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

创建配置类

1.声明死信队列,死信交换机,并绑定

2.声明普通队列,普通交换机,并绑定

3.普通队列需设置死信交换机,以及死信路由key

前两步很简单,就不贴代码了,需要注意第三步

Map<String, Object> args = new HashMap<>();
//设置死信交换机
args.put("x-dead-letter-exchange", "死信交换机名称");
//设置死信 routing_key
args.put("x-dead-letter-routing-key", "死信路由key");
return new Queue("普通队列", true, false, false, args);

创建之后,就需要一个生产者,发送消息

 String id = UUID.randomUUID().toString();
logger.info("传递id消息========",id);
rabbitTemplate.convertAndSend(
"普通交换机名称","绑定普通队列路由key",id,
message -> {
//设置过期时间(毫秒)
message.getMessageProperties().setExpiration("过期时间");
return message;
});

生产消息后,创建消费者配置类

@RabbitListener(queues = "死信队列名称")
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
logger.info("接收到消息:{}", msg);
}

至此死信队列完成,测试一下,果然能实现延迟发送,不出意外的话马上要出意外了,我的最初需求是可以根据客户选择的时间推送消息,死信队列不就只能固定时间吗,我要是有别的时间,还能重新创建一个队列发送消息,不然可能会出现一种情况,比如第一个用户发送了一个10秒的消息,第二个用户发送一个5秒的消息,那么在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信,所以只能一个交换机绑定的普通队列对应一个时间,但是又不想创建那么多,非常麻烦不说,关键是解决不了我的需求;

于是在网上找资料,看到rabbitmq推出的一个插件 rabbitmq-delayed-message-exchange,这时候才明白,死信队列设计的初衷只是为了存储那些没有被正常消费的消息,便于重新发送,不至于出现消息丢失等情况,而 rabbitmq-delayed-message-exchange是专门用于发送延迟消息的,于是开始研究插件

 插件:rabbitmq-delayed-message-exchange

安装

要想使用插件肯定是先下载啦,网上教程有很多,我用的比较方便简单的方式下载,因为我是linux系统,所以直接使用命令下载

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez

注意:需要安装自己rabbitmq对应版本的插件

安装完毕启动

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

这时候执行命令就可以看到有这个插件了

rabbitmq-plugins list

RabbitMQ实现延迟发送消息


能看到这个插件,恭喜你安装成功,真聪明,接着往下走

代码配置

1.首先需声明队列,交换机根据路由key绑定

2.使用延迟消息交换器需要声明一个 x-delayed-message 类型的交换器

类似这样

  Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//自定义交换机
return new CustomExchange("延迟消息交换机", "x-delayed-message", false, false, args);

然后就可以定义生产者发送消息了

rabbitTemplate.convertAndSend("延迟消息交换机", "路由Key", "消息", message -> {
// 设置过期时间
message.getMessageProperties().setDelay("过期时间");
return message;
});

消费者同上

实验一下发现,不同时间也可以正常消费,这是因为这个插件可以让消息延迟性绑定到消息本身上,使的每个消息有自己的过期时间,原以为问题解决了,没想到又出问题了,他这个时间并不是随意的(网上说不能超过1个月),是有一个限制时间,而且性能要比原生的要差一点,不能忍,于是又想了个办法,定时任务

定时任务

定时任务的原理就很简单了,因为我的发送时间是存在数据库的,所以每过一段时间查询一下是否有需要发送的消息,有则发送,并消费

实现很简单

启动类添加注解

@EnableScheduling

还是老样子,声明队列,绑定交换机

定时任务的语法可以在网上找,需要用到@Scheduled注解,放在需要定时的方法上,可以实现定时执行

总结

至此问题基本解决了,不过可能会造成比用户指定的时间晚几秒推送的情况,不过我的功能并不需要这么精准,所以使用这种方法无非是最好的方式,三种方法都有使用的场景,没有好与不好,根据自己的业务选择就好