文章目录
- SpringAMQP
- 简单队列模型(BasicQueue)
- 工作队列模型(WorkQueue)
- 发布(Public)订阅(Subsrcibe)
- 1.Fanout Exchange
- 2.DirectExchage
- 3.TopicExchange
- 交换机队列创建绑定方式2-注解
- 生产者确认
- 1、 添加配置:
- 2、 创建ProducerAckConfig
- 3、 测试
- 消费者确认
- 1、简介
- 2、 确认模式测试
- 2.1、AUTO-自动确认模式
- 2.2、NONE-不确认模式
- 2.3、MANUAL-不确认模式
- 3、手动ack
- 死信队列
- 延迟队列
- 消息转换器
SpringAMQP
AMQP:应用间消息通信的一种协议,与语言和平台无关
简单队列模型(BasicQueue)
利用SpringAMQP实现HellowWorld中的基础消息队列功能
一、编写生产消息逻辑:
1.引入AMQP依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在publisher服务中编写application.yml,添加mq连接信息
spring:
rabbitmq:
host: 192.168.242.66
port: 5672
virtual-host: /myhost
username: admin
password: admin
3.在publisher服务新建一个测试类,作为消息生产者(发送消息)
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage(){
String queueName="simple.queue";
String message="hello,spring amqp";
rabbitTemplate.convertAndSend(queueName,message);
}
二、编写消费逻辑
创建另一个模块(消息队列通常用与多个模块之间的通信)
1.引入依赖
2.编写application.yml,添加mq连接信息
3.在consumer服务中新建一个类,编写消费逻辑
@Component
public class SpringRabbitLister {
@RabbitListener(queues = "simple.queue")
public void listernRabbit(String message){
System.out.println("消费者接受到的消息是:"+message);
}
}
工作队列模型(WorkQueue)
模拟WorkQueue,实现一个队列绑定多个消费者
@Component
public class SpringRabbitLister {
@RabbitListener(queues = "simple.queue")
public void listernRabbit(String message) throws InterruptedException {
System.out.println("消费者1接受到的消息是:"+message+LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listernRabbit2(String message) throws InterruptedException {
System.err.println("消费者2接受到的消息是:"+message+LocalTime.now());
Thread.sleep(200);
}
}
发送消息
@Test
public void sendMessage() throws InterruptedException {
String queueName="simple.queue";
String msg="hello RabbitMQ";
for (int i = 1; i <=50; i++) {
rabbitTemplate.convertAndSend(queueName,msg+i);
Thread.sleep(20);
}
}
消息预取限制:
就能实现消费能力强的能抢到更多(能者多劳)
修改application.yml文件,设置profetch这个值
spring:
rabbitmq:
host: 192.168.242.66
port: 5672
virtual-host: /myhost
username: admin
password: admin
publisher-returns: true
publisher-confirm-type: correlated # SIMPLE-同步确认(阻塞) CORRELATED-异步确认
listener:
type: simple # simple-listener容器使用一个额外线程处理消息 direct-listener(监听器)容器直接使用consumer线程
simple:
prefetch: 1 # 能者多劳
acknowledge-mode: manual #手动确认消息
#能者多劳+多线程=>避免消息堆积
concurrency: 3 # 避免消息堆积,初始化多个消费者线程
max-concurrency: 5 #最大允许并发处理消息的线程数
发布(Public)订阅(Subsrcibe)
发布订阅模式与之前的区别就是允许将同一消息发送给多个消费者。实现方式就是加入了交换机(exchange)
交换机的作用:
- 接受publisher发送的消息
- 将消息按照规则路由与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
常见的exchange类型包括
- fanout:广播
- Direct:路由
- Topic:话题
1.Fanout Exchange
Fanout Exchange 会将接受到的消息路由到每一个跟其绑定的queue
实现:
1、在consumer服务中声明Exchange、Queue、Binding(绑定关系对象)
@Configuration
public class FanoutConfig {
/**
* 交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange("demo.fanout")
.durable(true)//配置持久化
.ignoreDeclarationExceptions() //忽略声明时的异常
.build();
}
/**
* 队列
* @return
*/
@Bean
public Queue fanoutQueue1(){
return QueueBuilder.durable("demo.queue1")//创建持久化的队列
.build();
}
@Bean
public Queue fanoutQueue2(){
return QueueBuilder.durable("demo.queue2")//创建持久化的队列
.build();
}
/**
* 绑定关系(注意bean的名称,Spring会直接从容器中找)
*/
@Bean
public Binding fanoutBind1(FanoutExchange fanoutExchange, Queue fanoutQueue1){
return BindingBuilder
.bind(fanoutQueue1)//队列
.to(fanoutExchange);//交换机
}
@Bean
public Binding fanoutBind2(FanoutExchange fanoutExchange, Queue fanoutQueue2){
return BindingBuilder
.bind(fanoutQueue2)//队列
.to(fanoutExchange);//交换机
}
}
设置消费者监听队列:
@Configuration
public class FanoutConsumer {
@RabbitListener(queues = "demo.queue1")
public void consumer1(String msg){
System.out.println("消费者1接收消息"+msg);
}
@RabbitListener(queues = "demo.queue2")
public void consumer2(String msg){
System.out.println("消费者2接收消息"+msg);
}
}
消息发送
@Test
public void testFanout(){
String msg="hello";
//由于是fanout模式,routerkey设置为""
rabbitTemplate.convertAndSend("demo.fanout","",msg);
}
两个消费者都能接收到消息
2.DirectExchage
DirectExchage会将接受到的消息根据路由规则到指定的Queue、因此称为路由模式
- 每一个Queue都与Exchange设置一个BingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchage将消息路由到BingKey与消息RoutingKey一致的队列
1.配置交换机和队列
@Configuration
public class DirectConfig {
/**
* 交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("demo.direct")
.durable(true)//配置持久化
.ignoreDeclarationExceptions() //忽略声明时的异常
.build();
}
/**
* 队列
* @return
*/
@Bean
public Queue directQueue1(){
return QueueBuilder.durable("direct.queue1")//创建持久化的队列
.build();
}
@Bean
public Queue directQueue2(){
return QueueBuilder.durable("direct.queue2")//创建持久化的队列
.build();
}
/**
* 绑定关系(注意bean的名称,Spring会直接从容器中找)
*/
@Bean
public Binding directBind1(DirectExchange directExchange, Queue directQueue1){
return BindingBuilder
.bind(directQueue1)//队列
.to(directExchange)//交换机
.with("direct.queue1");//routerKey
}
@Bean
public Binding directBind2(DirectExchange directExchange, Queue directQueue2){
return BindingBuilder
.bind(directQueue2)//队列
.to(directExchange)//交换机
.with("direct.queue2");//routerKey
}
}
2、消费者绑定队列
@RabbitListener(queues = "direct.queue1")
public void consumer1(String msg){
System.out.println("消费者1接收消息"+msg);
}
@RabbitListener(queues = "direct.queue2")
public void consumer2(String msg){
System.out.println("消费者2接收消息"+msg);
}
3、发送消息
@Test
public void testFanout(){
String msg="hello queue1";
rabbitTemplate.convertAndSend("demo.direct","direct.queue1",msg);
}
只有队列1(routerKey为"direct.queue1")收到消息
Direct交换机和Fanout交换机的差异:
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给那个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
3.TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以*.*分割
Queue与Exchange指定的BindingKey时可以使用通配符
#:表示0个或多个单词
*:表示一个单词
实现:
1、设置交换机和队列
为队列2的routerKey设置通配符
@Configuration
public class TopicConfig {
/**
* 交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange("demo.topic")
.durable(true)//配置持久化
.ignoreDeclarationExceptions() //忽略声明时的异常
.build();
}
/**
* 队列
* @return
*/
@Bean
public Queue topicQueue1(){
return QueueBuilder.durable("topic.queue1")//创建持久化的队列
.build();
}
@Bean
public Queue topicQueue2(){
return QueueBuilder.durable("topic.queue2")//创建持久化的队列
.build();
}
/**
* 绑定关系(注意bean的名称,Spring会直接从容器中找)
*/
@Bean
public Binding fanoutBind1(TopicExchange topicExchange, Queue topicQueue1){
return BindingBuilder
.bind(topicQueue1)//队列
.to(topicExchange)
.with("topic.queue1");//交换机
}
@Bean
public Binding fanoutBind2(TopicExchange topicExchange, Queue topicQueue2){
return BindingBuilder
.bind(topicQueue2)//队列
.to(topicExchange)//交换机
.with("topic.*");//设置通配符,只要是topic.*都会发送到该队列
}
}
2、消费者接收
@RabbitListener(queues = "topic.queue1")
public void consumer1(String msg){
System.out.println("消费者1接收消息"+msg);
}
@RabbitListener(queues = "topic.queue2")
public void consumer2(String msg){
System.out.println("消费者2接收消息"+msg);
}
3.消息发送
@Test
public void testFanout(){
String msg="hello queue";
rabbitTemplate.convertAndSend("demo.topic","topic.queue1",msg);
}
结果:两个消费者都收到消息
交换机队列创建绑定方式2-注解
除了上述演示的申明bean设置队列和交换机
还可以通过@RabbitListener
:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
-
bindings
:指定绑定关系,可以有多个。值是@QueueBinding
的数组。@QueueBinding
包含下面属性:-
value
:这个消费者关联的队列。值是@Queue
,代表一个队列 -
exchange
:队列所绑定的交换机,值是@Exchange
类型 -
key
:队列和交换机绑定的RoutingKey
-
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "stock.queue",durable = "true"),
exchange =@Exchange(value = "pay.exchange",type = "topic",durable = "true",ignoreDeclarationExceptions = "true"),
key = "pay.#"
)
)
public void stockConsumer(String msg){
System.out.println("库存服务获取到消息"+msg);
}
生产者确认
为了确保生产者成功发送消息,RabbitTemplate提供了生产者确认回调,消息发送失败可以调用设置的回调方法进行处理,步骤如下:
1、 添加配置:
server:
port: 8081
spring:
rabbitmq:
host: 192.168.242.66
port: 5672
virtual-host: /myhost
username: admin
password: admin
publisher-returns: true
publisher-confirm-type: correlated # SIMPLE-同步确认(阻塞) CORRELATED-异步确认
2、 创建ProducerAckConfig
内容如下:
@Configuration
@Slf4j
public class RabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
// 确认消息是否到达交换机
this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack){
log.warn("消息没有到达交换机:" + cause);
}
});
// 确认消息是否到达队列,到达队列该方法不执行
this.rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.warn("消息没有到达队列,来自于交换机:{},路由键:{},消息内容:{}", exchange, routingKey, new String(message.getBody()));
});
}
}
3、 测试
测试1:消息正常发送,正常消费
测试2:消息到达交换机,没有达到队列(交换机存在,但是路由key和绑定的队列不一致)
amqpTemplate.convertAndSend("demo.exchange","c.a.b" , "hehe");
测试3:消息不能到达交换机(交换机不存在)
amqpTemplate.convertAndSend("demo.exchange2","a.b" , "hehe");
消费者确认
1、简介
消费者接受到消息使用时的确认机制:ack,默认消费者接受到消息后自动确认
springboot-rabbit提供了三种消息确认模式:
- AcknowledgeMode.NONE:不确认模式(不管程序是否异常只要执行了监听方法,消息即被消费。相当于rabbitmq中的自动确认,这种方式不推荐使用)
- AcknowledgeMode.AUTO:自动确认模式(默认,消费者没有异常会自动确认,有异常则不确认,无限重试,导致程序死循环。不要和rabbit中的自动确认混淆)
- AcknowledgeMode.MANUAL:手动确认模式(需要手动调用channel.basicAck确认,可以捕获异常控制重试次数,甚至可以控制失败消息的处理方式)
全局配置方法:
# rabbitmq:
listener:
simple:
acknowledge-mode: manual # manual-手动 auto-自动(无异常直接确认,有异常无限重试) none-不重试
或者
以下只针对设置的消费者
@RabbitListener(queues = "demo.queue",
ackMode = "MANUAL" )// NONE,AUTO
2、 确认模式测试
2.1、AUTO-自动确认模式
在消费者中制造一个异常:然后重启消费者服务
@RabbitListener(queues = "demo.queue")
public void consumer1(String msg){//String类型的形参表示获取到的队列中的消息
System.out.println("获取到消息:"+ msg);
int i = 1/0;
}
运行生产者测试代码发送消息到demo.exchange交换机测试:
@Test
void contextLoads() {
amqpTemplate.convertAndSend("demo.exchange","a.b" , "hehe");
}
测试结果:
可以看到mq将无限重试,消费消息:(默认AUTO确认模式,消息消费有异常,设置消息重新归队到mq消息队列中,然后消费者监听器又可以重新消费消息)
消息将无法消费:
停掉应用消息回到Ready状态,消息不会丢失!
2.2、NONE-不确认模式
修改消费者确认模式为NONE:重启消费者服务
再次执行生产者测试代码:
所有的消息都被消费:
2.3、MANUAL-不确认模式
修改消费者确认模式为:MANUAL
再次执行生产者测试代码:
测试结果:消息接收到了但是队列中消息等待确认,如果停掉程序会重新进入ready状态
程序停止运行:
修改消费者代码:确认消息
3、手动ack
@RabbitListener(queues = "demo.queue")
public void consumer1(String msg, Channel channel , Message message) throws Exception {
try {
System.out.println("接收到消息:" + msg);
int i = 1 / 0;
// 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息重试后依然失败,拒绝再次接收");
// 拒绝消息,不再重新入队(如果绑定了死信队列消息会进入死信队列,没有绑定