通过RabbitMQ的DIRECT模式以及死信队列实现延时任务
/**
 * RabbitMQ topology for a chained delayed-delivery flow built from two direct
 * exchanges and dead-lettering.
 *
 * <p>Routing declared by this class:
 * <pre>
 * exchange --key.notify1--> queue1 --(dead-letter, key.delay1)--> delay.exchange --> delay.queue1
 * delay.queue1 --(ttl1 expires, key.notify2)--> exchange --> queue2
 * queue2 --(dead-letter, key.delay2)--> delay.exchange --> delay.queue2
 * delay.queue2 --(ttl2 expires, key.notify3)--> exchange --> queue3
 * queue3 --(dead-letter, key.delay3)--> delay.exchange --> delay.queue3
 * </pre>
 *
 * <p>The {@code delay.queue*} queues are the waiting rooms: a message sits there
 * until its {@code x-message-ttl} elapses and is then dead-lettered back to the
 * business exchange under the next {@code key.notify*} routing key.
 */
@Configuration
public class RabbitMQConfig {

    // Exchanges
    public static final String EXCHANGE_NAME = "exchange";
    /**
     * Name of the exchange that receives dead-lettered messages.
     *
     * <p>Must not be the empty string: {@code ""} is the broker's default exchange,
     * which cannot be bound explicitly, so the {@code deadInfoBinding*} beans below
     * could never be declared against it.
     *
     * <p>NOTE(review): queues already declared on a broker with the previous value
     * carry a different {@code x-dead-letter-exchange} argument and presumably have
     * to be deleted before this configuration can redeclare them - confirm per environment.
     */
    public static final String DEAD_EXCHANGE_NAME = "delay.exchange";

    // Queues
    public static final String QUEUE_NAME1 = "queue1";
    public static final String QUEUE_NAME2 = "queue2";
    public static final String QUEUE_NAME3 = "queue3";
    public static final String DEAD_QUEUE_NAME1 = "delay.queue1";
    public static final String DEAD_QUEUE_NAME2 = "delay.queue2";
    public static final String DEAD_QUEUE_NAME3 = "delay.queue3";

    // Routing keys each queue is bound with
    public static final String QUEUE1_ROUTINGKEY = "key.notify1";
    public static final String QUEUE2_ROUTINGKEY = "key.notify2";
    public static final String QUEUE3_ROUTINGKEY = "key.notify3";
    public static final String DEAD_QUEUE1_ROUTINGKEY = "key.delay1";
    public static final String DEAD_QUEUE2_ROUTINGKEY = "key.delay2";
    public static final String DEAD_QUEUE3_ROUTINGKEY = "key.delay3";

    // Queue argument names understood by the broker
    private static final String ARG_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
    private static final String ARG_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
    private static final String ARG_MESSAGE_TTL = "x-message-ttl";

    // Per-queue TTL (milliseconds) for unconsumed messages in the delay queues.
    // NOTE(review): the property keys start with a dot, which looks like a missing
    // prefix - confirm the intended property names.
    @Value("${.ttl1}")
    private int deadQueueTtl1;
    @Value("${.ttl2}")
    private int deadQueueTtl2;

    /** Declares the business exchange. */
    @Bean("exchange")
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    /** Declares the dead-letter exchange that feeds the delay queues. */
    @Bean("deadExchange")
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE_NAME);
    }

    /** Declares business queue 1; its dead letters go to delay queue 1. */
    @Bean("queue1")
    public Queue queue1() {
        return QueueBuilder.durable(QUEUE_NAME1)
                .withArguments(deadLetterArgs(DEAD_EXCHANGE_NAME, DEAD_QUEUE1_ROUTINGKEY))
                .build();
    }

    /** Declares business queue 2; its dead letters go to delay queue 2. */
    @Bean("queue2")
    public Queue queue2() {
        return QueueBuilder.durable(QUEUE_NAME2)
                .withArguments(deadLetterArgs(DEAD_EXCHANGE_NAME, DEAD_QUEUE2_ROUTINGKEY))
                .build();
    }

    /** Declares business queue 3; its dead letters go to delay queue 3. */
    @Bean("queue3")
    public Queue queue3() {
        return QueueBuilder.durable(QUEUE_NAME3)
                .withArguments(deadLetterArgs(DEAD_EXCHANGE_NAME, DEAD_QUEUE3_ROUTINGKEY))
                .build();
    }

    /**
     * Declares delay queue 1: messages wait for the first TTL, then are
     * dead-lettered to the business exchange with the queue 2 routing key.
     */
    @Bean("deadQueue1")
    public Queue deadQueue1() {
        Map<String, Object> args = deadLetterArgs(EXCHANGE_NAME, QUEUE2_ROUTINGKEY);
        args.put(ARG_MESSAGE_TTL, deadQueueTtl1);
        return QueueBuilder.durable(DEAD_QUEUE_NAME1).withArguments(args).build();
    }

    /**
     * Declares delay queue 2: messages wait for the second TTL, then are
     * dead-lettered to the business exchange with the queue 3 routing key.
     */
    @Bean("deadQueue2")
    public Queue deadQueue2() {
        Map<String, Object> args = deadLetterArgs(EXCHANGE_NAME, QUEUE3_ROUTINGKEY);
        args.put(ARG_MESSAGE_TTL, deadQueueTtl2);
        return QueueBuilder.durable(DEAD_QUEUE_NAME3.equals(DEAD_QUEUE_NAME2) ? DEAD_QUEUE_NAME3 : DEAD_QUEUE_NAME2)
                .withArguments(args)
                .build();
    }

    /**
     * Declares delay queue 3, the last stop of the chain. It has no TTL and no
     * dead-letter routing key override, only the business exchange as its
     * dead-letter exchange.
     *
     * <p>NOTE(review): nothing in this class binds the business exchange with
     * {@code key.delay3}, so a message dead-lettered from here would presumably be
     * unroutable - confirm this queue is meant to be consumed directly.
     */
    @Bean("deadQueue3")
    public Queue deadQueue3() {
        return QueueBuilder.durable(DEAD_QUEUE_NAME3)
                .withArguments(deadLetterArgs(EXCHANGE_NAME, null))
                .build();
    }

    /** Binds business queue 1 to the business exchange. */
    @Bean
    public Binding infoBinding1(@Qualifier("queue1") Queue queue,
                                @Qualifier("exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE1_ROUTINGKEY);
    }

    /** Binds business queue 2 to the business exchange. */
    @Bean
    public Binding infoBinding2(@Qualifier("queue2") Queue queue,
                                @Qualifier("exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE2_ROUTINGKEY);
    }

    /** Binds business queue 3 to the business exchange. */
    @Bean
    public Binding infoBinding3(@Qualifier("queue3") Queue queue,
                                @Qualifier("exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE3_ROUTINGKEY);
    }

    /** Binds delay queue 1 to the dead-letter exchange. */
    @Bean
    public Binding deadInfoBinding1(@Qualifier("deadQueue1") Queue queue,
                                    @Qualifier("deadExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE1_ROUTINGKEY);
    }

    /** Binds delay queue 2 to the dead-letter exchange. */
    @Bean
    public Binding deadInfoBinding2(@Qualifier("deadQueue2") Queue queue,
                                    @Qualifier("deadExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE2_ROUTINGKEY);
    }

    /** Binds delay queue 3 to the dead-letter exchange. */
    @Bean
    public Binding deadInfoBinding3(@Qualifier("deadQueue3") Queue queue,
                                    @Qualifier("deadExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE3_ROUTINGKEY);
    }

    /**
     * Builds the mutable argument map that points a queue at its dead-letter target.
     *
     * @param deadLetterExchange   exchange that receives the queue's dead letters
     * @param deadLetterRoutingKey routing key to republish with, or {@code null} to
     *                             leave the argument unset
     * @return a new map the caller may extend with further queue arguments
     */
    private static Map<String, Object> deadLetterArgs(String deadLetterExchange,
                                                      String deadLetterRoutingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put(ARG_DEAD_LETTER_EXCHANGE, deadLetterExchange);
        if (deadLetterRoutingKey != null) {
            args.put(ARG_DEAD_LETTER_ROUTING_KEY, deadLetterRoutingKey);
        }
        return args;
    }
}