最近遇到了关于 RabbitMQ 的问题,打比方说:某个微服务模块中,RabbitMQ 的大部分消费者需要重试两次,而小部分消费者由于特殊原因并不需要进行重试。这就涉及到自定义重试次数的话题了,但在网上找了一圈没发现相关的,但是功夫不负有心人,最后还是解决了这个问题,接下来给大家分享一下~
目录
1 默认配置重试次数
2 自定义重试次数
2.1 消费者
① 配置文件
② 配置队列,绑定交换机
③ 消费者文件
2.2 生产者
① 配置文件
② 生产者文件
③ 测试文件
2.3 启动测试文件
1 默认配置重试次数
一般来说,关于 RabbitMQ 的重试次数是直接在配置文件中进行定义(比如 ),那么所有的消费者都将遵循这个配置条件,比如 ????
=spirng-boot-rabbitmq
=127.0.0.1
=5672
=admin
=admin
=true # 开启消费者重试机制
-attempts=3 # 最大重试次数
-interval=3000 # 重试时间间隔
该配置中的 max-attempts 决定了消费者的重试次数,不过有一点需求注意:max-attempts 指的是尝试次数,就是说最开始消费的那一次也是计算在内的,那么 max-attempts: 3 便是重试两次,另外一次是正常消费~
同时消费者遵循的是本模块的 RabbitMQ 配置,并不会读取生产者的配置。打比方说,生产者模块配置重试 3 次,而消费者模块配置重试 1 次,那么生产者给消费者发送消息,消费者进行消费,如果触发了重试,消费者也只会重试一次,它只遵循消费者模块的配置!!
如上,默认配置重试次数就算完成了,但是并没有实现针对不同消费者的自定义重试功能,请继续看第二章内容。
2 自定义重试次数
以应用广泛的订阅模式为例,由于消费者和生产者配置不一,注意消费者和生产者不在同一模块!因此分开阐述:
2.1 消费者
主要配置是在消费者这!!
① 配置文件
对于消费者来说,该配置不仅起到了连接作用,同时也启动了重试机制,默认重试 2 次。
=127.0.0.1
=5672
=admin
=admin
=true # 开启消费者重试机制
-attempts=3 # 最大重试次数
-interval=3000 # 重试时间间隔
② 配置队列,绑定交换机
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
@Configuration
public class FanoutRabbitConfig {
@Autowired
private ConnectionFactory connectionFactory;
//自定义工厂
@Bean
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
(connectionFactory);
(jsonMessageConverter());
(3);
(10);
(retries());
return factory;
}
@Bean
public RetryOperationsInterceptor retries() {
return ()
.maxAttempts(1) //设置最大尝试次数为1(不重试)
.backOffOptions(1000, 3.0, 10000)
.recoverer(new RejectAndDontRequeueRecoverer()).build();
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue retryTest1() {
return new Queue("yinyu.retryTest1");
}
@Bean
public Queue retryTest2() {
return new Queue("yinyu.retryTest2");
}
@Bean
public Exchange topicExchange() {
return new TopicExchange("yinyu");//交换机命名
}
//队列绑定交换机
@Bean
public List<Binding> allActivateBinding() {
return ((
(retryTest1()).to(topicExchange()).with("yinyu.retryTest1").noargs(),
(retryTest2()).to(topicExchange()).with("yinyu.retryTest2").noargs());
}
}
③ 消费者文件
用于接收消息,设置了一个对照组,一个自定义配置,一个默认配置
package ;
import ;
import ;
import ;
@Component
public class ReceiverA {
@RabbitListener(queues = "yinyu.retryTest1", containerFactory = "listenerContainerFactory")
public void retryReceiver1(Map<String,String> map) {
("retryTest 自定义配置开始, key: {}", ("key"));
if (!(("key"), "yinyu")){
throw new RuntimeException("value 值匹配不准确,请重新进行请求!!");
}
("retryTest 结束");
}
@RabbitListener(queues = "yinyu.retryTest2")
public void retryReceiver2(Map<String,String> map) {
("retryTest 默认配置开始, key: {}", ("key"));
if (!(("key"), "yinyu")){
throw new RuntimeException("value 值匹配不准确,请重新进行请求!!");
}
("retryTest 结束");
}
}
2.2 生产者
生产者不需要过多的配置,它的作用是发送消息
① 配置文件
写在 中,对于生产者来说奇起的是连接 rabbitmq 作用,如果它是调用其他模块的消费者,那么这个重试配置是不起作用的。
=127.0.0.1
=5672
=admin
=admin
=true # 开启消费者重试机制
-attempts=5 # 最大重试次数
-interval=3000 # 重试时间间隔
② 生产者文件
用于发送消息,
package ;
import ;
import ;
import ;
import ;
import ;
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void retryProducer1(){
Map<String,String> map = new HashMap<>();
("key","yinyu自定义");
("yinyu", "yinyu.retryTest1", map);
}
public void retryProducer2(){
Map<String,String> map = new HashMap<>();
("key","yinyu默认");
("yinyu", "yinyu.retryTest2", map);
}
}
③ 测试文件
package ;
import ;
import ;
import ;
import ;
import .;
@RunWith()
@SpringBootTest
public class SenderTest {
@Autowired
private Sender sender;
//测试自定义配置
@Test
public void testCustomConfig() {
sender.retryProducer1();
}
//测试默认配置
@Test
public void testDefaultConfig() {
sender.retryProducer2();
}
}
2.3 启动测试文件
有条件的各位可以启动一下生产者的测试文件中这两个方法,最终结果:
- retryProducer1 发送消息后,retryReceiver1 消费消息,虽然报错,但没有重试(遵循自定义配置)
- retryProducer2 发送消息后,retryReceiver2 消费消息,报错且重试 4 次(遵循默认配置)
完美实现自定义重试次数的需求!!