通过RabbitMQ的DIRECT模式以及死信队列实现延时任务

时间:2025-02-19 08:17:05
@Configuration public class RabbitMQConfig { //交换机 public static final String EXCHANGE_NAME = "exchange"; public static final String DEAD_EXCHANGE_NAME = ""; //队列 public static final String QUEUE_NAME1 = "queue1"; public static final String QUEUE_NAME2 = "queue2"; public static final String QUEUE_NAME3 = "queue3"; public static final String DEAD_QUEUE_NAME1 = "delay.queue1"; public static final String DEAD_QUEUE_NAME2 = "delay.queue2"; public static final String DEAD_QUEUE_NAME3 = "delay.queue3"; //各队列绑定的路由 public static final String QUEUE1_ROUTINGKEY = "key.notify1"; public static final String QUEUE2_ROUTINGKEY = "key.notify2"; public static final String QUEUE3_ROUTINGKEY = "key.notify3"; public static final String DEAD_QUEUE1_ROUTINGKEY = "key.delay1"; public static final String DEAD_QUEUE2_ROUTINGKEY = "key.delay2"; public static final String DEAD_QUEUE3_ROUTINGKEY = "key.delay3"; //各死信队列的ttl 未消费消息的过期时间 @Value("${.ttl1}") private int DEAD_QUEUE_TTL1; @Value("${.ttl2}") private int DEAD_QUEUE_TTL2; //声明业务EXchange @Bean("exchange") public DirectExchange exchange(){ return new DirectExchange(EXCHANGE_NAME); } //声明私信Exchange @Bean("deadExchange") public DirectExchange deadExchange(){ return new DirectExchange(DEAD_EXCHANGE_NAME); } //声明支付信息队列1 @Bean("queue1") public Queue queue1(){ Map<String,Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key",DEAD_QUEUE1_ROUTINGKEY); return QueueBuilder.durable(QUEUE_NAME1).withArguments(args).build(); } //声明支付信息队列2 @Bean("queue2") public Queue queue2(){ Map<String,Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key",DEAD_QUEUE2_ROUTINGKEY); return QueueBuilder.durable(QUEUE_NAME2).withArguments(args).build(); } //声明支付信息队列3 @Bean("queue3") public Queue queue3(){ Map<String,Object> args = new HashMap<>(1); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME); // // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key",DEAD_QUEUE3_ROUTINGKEY); return QueueBuilder.durable(QUEUE_NAME3).withArguments(args).build(); } //声明死信队列1 @Bean("deadQueue1") public Queue deadQueue1(){ Map<String,Object> args = new HashMap<>(3); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange",EXCHANGE_NAME); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key",QUEUE2_ROUTINGKEY); // x-message-ttl 这里声明消息未被消费的过期时间 args.put("x-message-ttl",DEAD_QUEUE_TTL1); return QueueBuilder.durable(DEAD_QUEUE_NAME1).withArguments(args).build(); } //声明死信队列2 @Bean("deadQueue2") public Queue deadQueue2(){ Map<String,Object> args = new HashMap<>(3); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange",EXCHANGE_NAME); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key",QUEUE3_ROUTINGKEY); // x-message-ttl 这里声明消息未被消费的过期时间 args.put("x-message-ttl",DEAD_QUEUE_TTL2); return QueueBuilder.durable(DEAD_QUEUE_NAME2).withArguments(args).build(); } //声明死信队列3 @Bean("deadQueue3") public Queue deadQueue3(){ Map<String,Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange",EXCHANGE_NAME); // x-dead-letter-routing-key 这里声明当前队列的死信路由key // ("x-dead-letter-routing-key",QUEUE4_ROUTINGKEY); return QueueBuilder.durable(DEAD_QUEUE_NAME3).withArguments(args).build(); } //声明支付信息序列1绑定的关系 @Bean public Binding infoBinding1(@Qualifier("queue1") Queue queue, @Qualifier("exchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(QUEUE1_ROUTINGKEY); } //声明支付信息序列2绑定的关系 @Bean public Binding infoBinding2(@Qualifier("queue2") Queue queue, @Qualifier("exchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(QUEUE2_ROUTINGKEY); } //声明支付信息序列3绑定的关系 @Bean public Binding infoBinding3(@Qualifier("queue3") Queue queue, @Qualifier("exchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(QUEUE3_ROUTINGKEY); } //声明死信队列1绑定的关系 @Bean public Binding deadInfoBinding1(@Qualifier("deadQueue1") Queue queue, @Qualifier("deadExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE1_ROUTINGKEY); } //声明死信队列2绑定的关系 @Bean public Binding deadInfoBinding2(@Qualifier("deadQueue2") Queue queue, @Qualifier("deadExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE2_ROUTINGKEY); } //声明死信队列3绑定的关系 @Bean public Binding deadInfoBinding3(@Qualifier("deadQueue3") Queue queue, @Qualifier("deadExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE3_ROUTINGKEY); } }

相关文章