SpringAMQP

时间:2024-02-16 18:59:19

文章目录

  • SpringAMQP
    • 简单队列模型(BasicQueue)
    • 工作队列模型(WorkQueue)
    • 发布(Public)订阅(Subsrcibe)
      • 1.Fanout Exchange
      • 2.DirectExchage
      • 3.TopicExchange
      • 交换机队列创建绑定方式2-注解
      • 生产者确认
        • 1、 添加配置:
        • 2、 创建ProducerAckConfig
        • 3、 测试
      • 消费者确认
        • 1、简介
        • 2、 确认模式测试
          • 2.1、AUTO-自动确认模式
          • 2.2、NONE-不确认模式
          • 2.3、MANUAL-不确认模式
        • 3、手动ack
    • 死信队列
    • 延迟队列
    • 消息转换器

SpringAMQP

AMQP:应用间消息通信的一种协议,与语言和平台无关

简单队列模型(BasicQueue)

利用SpringAMQP实现HellowWorld中的基础消息队列功能

一、编写生产消息逻辑:

1.引入AMQP依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.在publisher服务中编写application.yml,添加mq连接信息

spring:
  rabbitmq:
    host: 192.168.242.66
    port: 5672
    virtual-host: /myhost
    username: admin
    password: admin

3.在publisher服务新建一个测试类,作为消息生产者(发送消息)

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage(){
        String queueName="simple.queue";
        String message="hello,spring amqp";
        rabbitTemplate.convertAndSend(queueName,message);
    }

二、编写消费逻辑
创建另一个模块(消息队列通常用与多个模块之间的通信)

1.引入依赖

2.编写application.yml,添加mq连接信息

3.在consumer服务中新建一个类,编写消费逻辑

@Component
public class SpringRabbitLister {
    @RabbitListener(queues = "simple.queue")
    public  void listernRabbit(String message){
        System.out.println("消费者接受到的消息是:"+message);
    }
}

工作队列模型(WorkQueue)

模拟WorkQueue,实现一个队列绑定多个消费者

@Component
public class SpringRabbitLister {
    @RabbitListener(queues = "simple.queue")
    public  void listernRabbit(String message) throws InterruptedException {
        System.out.println("消费者1接受到的消息是:"+message+LocalTime.now());
        Thread.sleep(20);
    }
    @RabbitListener(queues = "simple.queue")
    public  void listernRabbit2(String message) throws InterruptedException {
        System.err.println("消费者2接受到的消息是:"+message+LocalTime.now());
        Thread.sleep(200);
    }
}

发送消息

@Test
    public void sendMessage() throws InterruptedException {
        String queueName="simple.queue";
        String msg="hello RabbitMQ";
        for (int i = 1; i <=50; i++) {
            rabbitTemplate.convertAndSend(queueName,msg+i);
            Thread.sleep(20);
        }
    }

消息预取限制:

就能实现消费能力强的能抢到更多(能者多劳)

修改application.yml文件,设置profetch这个值

spring:
  rabbitmq:
    host: 192.168.242.66
    port: 5672
    virtual-host: /myhost
    username: admin
    password: admin
    publisher-returns: true
    publisher-confirm-type: correlated # SIMPLE-同步确认(阻塞) CORRELATED-异步确认
    listener:
      type: simple # simple-listener容器使用一个额外线程处理消息  direct-listener(监听器)容器直接使用consumer线程
      simple:
        prefetch: 1 # 能者多劳
        acknowledge-mode: manual #手动确认消息
      #能者多劳+多线程=>避免消息堆积
        concurrency: 3 # 避免消息堆积,初始化多个消费者线程
        max-concurrency: 5 #最大允许并发处理消息的线程数

发布(Public)订阅(Subsrcibe)

发布订阅模式与之前的区别就是允许将同一消息发送给多个消费者。实现方式就是加入了交换机(exchange)

交换机的作用:

  • 接受publisher发送的消息
  • 将消息按照规则路由与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

常见的exchange类型包括

  • fanout:广播
  • Direct:路由
  • Topic:话题

1.Fanout Exchange

Fanout Exchange 会将接受到的消息路由到每一个跟其绑定的queue

实现:

1、在consumer服务中声明Exchange、Queue、Binding(绑定关系对象)

@Configuration
public class FanoutConfig {
    /**
     * 交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return ExchangeBuilder.fanoutExchange("demo.fanout")
                .durable(true)//配置持久化
                .ignoreDeclarationExceptions() //忽略声明时的异常
                .build();
    }

    /**
     * 队列
     * @return
     */
    @Bean
    public Queue fanoutQueue1(){
        return QueueBuilder.durable("demo.queue1")//创建持久化的队列
                .build();
    }
    @Bean
    public Queue fanoutQueue2(){
        return QueueBuilder.durable("demo.queue2")//创建持久化的队列
                .build();
    }

    /**
     * 绑定关系(注意bean的名称,Spring会直接从容器中找)
     */
    @Bean
    public Binding fanoutBind1(FanoutExchange fanoutExchange, Queue fanoutQueue1){
        return BindingBuilder
                .bind(fanoutQueue1)//队列
                .to(fanoutExchange);//交换机
    }

    @Bean
    public Binding fanoutBind2(FanoutExchange fanoutExchange, Queue fanoutQueue2){
        return BindingBuilder
                .bind(fanoutQueue2)//队列
                .to(fanoutExchange);//交换机
    }

}

设置消费者监听队列:

@Configuration
public class FanoutConsumer {
    @RabbitListener(queues = "demo.queue1")
    public void consumer1(String msg){
        System.out.println("消费者1接收消息"+msg);
    }

    @RabbitListener(queues = "demo.queue2")
    public void consumer2(String msg){
        System.out.println("消费者2接收消息"+msg);
    }
}

消息发送

    @Test
    public void testFanout(){
        String msg="hello";
        //由于是fanout模式,routerkey设置为""
        rabbitTemplate.convertAndSend("demo.fanout","",msg);
    }

两个消费者都能接收到消息
在这里插入图片描述

2.DirectExchage

DirectExchage会将接受到的消息根据路由规则到指定的Queue、因此称为路由模式

  • 每一个Queue都与Exchange设置一个BingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchage将消息路由到BingKey与消息RoutingKey一致的队列

1.配置交换机和队列

@Configuration
public class DirectConfig {
    /**
     * 交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("demo.direct")
                .durable(true)//配置持久化
                .ignoreDeclarationExceptions() //忽略声明时的异常
                .build();
    }

    /**
     * 队列
     * @return
     */
    @Bean
    public Queue directQueue1(){
        return QueueBuilder.durable("direct.queue1")//创建持久化的队列
                .build();
    }
    @Bean
    public Queue directQueue2(){
        return QueueBuilder.durable("direct.queue2")//创建持久化的队列
                .build();
    }

    /**
     * 绑定关系(注意bean的名称,Spring会直接从容器中找)
     */
    @Bean
    public Binding directBind1(DirectExchange directExchange, Queue directQueue1){
        return BindingBuilder
                .bind(directQueue1)//队列
                .to(directExchange)//交换机
                .with("direct.queue1");//routerKey
    }

    @Bean
    public Binding directBind2(DirectExchange directExchange, Queue directQueue2){
        return BindingBuilder
                .bind(directQueue2)//队列
                .to(directExchange)//交换机
                .with("direct.queue2");//routerKey
    }
}


2、消费者绑定队列

    @RabbitListener(queues = "direct.queue1")
    public void consumer1(String msg){
        System.out.println("消费者1接收消息"+msg);
    }

    @RabbitListener(queues = "direct.queue2")
    public void consumer2(String msg){
        System.out.println("消费者2接收消息"+msg);
    }

3、发送消息

    @Test
    public void testFanout(){
        String msg="hello queue1";
        rabbitTemplate.convertAndSend("demo.direct","direct.queue1",msg);
    }

只有队列1(routerKey为"direct.queue1")收到消息

Direct交换机和Fanout交换机的差异:

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给那个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以*.*分割

Queue与Exchange指定的BindingKey时可以使用通配符

#:表示0个或多个单词

*:表示一个单词

实现:

1、设置交换机和队列
为队列2的routerKey设置通配符

@Configuration
public class TopicConfig {
    /**
     * 交换机
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){

        return ExchangeBuilder.topicExchange("demo.topic")
                .durable(true)//配置持久化
                .ignoreDeclarationExceptions() //忽略声明时的异常
                .build();
    }

    /**
     * 队列
     * @return
     */
    @Bean
    public Queue topicQueue1(){
        return QueueBuilder.durable("topic.queue1")//创建持久化的队列
                .build();
    }
    @Bean
    public Queue topicQueue2(){
        return QueueBuilder.durable("topic.queue2")//创建持久化的队列
                .build();
    }

    /**
     * 绑定关系(注意bean的名称,Spring会直接从容器中找)
     */
    @Bean
    public Binding fanoutBind1(TopicExchange topicExchange, Queue topicQueue1){
        return BindingBuilder
                .bind(topicQueue1)//队列
                .to(topicExchange)
                .with("topic.queue1");//交换机
    }

    @Bean
    public Binding fanoutBind2(TopicExchange topicExchange, Queue topicQueue2){
        return BindingBuilder
                .bind(topicQueue2)//队列
                .to(topicExchange)//交换机
                .with("topic.*");//设置通配符,只要是topic.*都会发送到该队列
    }
}

2、消费者接收

    @RabbitListener(queues = "topic.queue1")
    public void consumer1(String msg){
        System.out.println("消费者1接收消息"+msg);
    }

    @RabbitListener(queues = "topic.queue2")
    public void consumer2(String msg){
        System.out.println("消费者2接收消息"+msg);
    }

3.消息发送

    @Test
    public void testFanout(){
        String msg="hello queue";
        rabbitTemplate.convertAndSend("demo.topic","topic.queue1",msg);
    }

结果:两个消费者都收到消息
在这里插入图片描述

交换机队列创建绑定方式2-注解

除了上述演示的申明bean设置队列和交换机
还可以通过@RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:

  • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
    • value:这个消费者关联的队列。值是@Queue,代表一个队列
    • exchange:队列所绑定的交换机,值是@Exchange类型
    • key:队列和交换机绑定的RoutingKey
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "stock.queue",durable = "true"),
                    exchange =@Exchange(value = "pay.exchange",type = "topic",durable = "true",ignoreDeclarationExceptions = "true"),
                    key = "pay.#"
            )
    )
    public void stockConsumer(String msg){
        System.out.println("库存服务获取到消息"+msg);

    }

生产者确认

为了确保生产者成功发送消息,RabbitTemplate提供了生产者确认回调,消息发送失败可以调用设置的回调方法进行处理,步骤如下:

1、 添加配置:
server:
  port: 8081
spring:
  rabbitmq:
    host: 192.168.242.66
    port: 5672
    virtual-host: /myhost
    username: admin
    password: admin
    publisher-returns: true
    publisher-confirm-type: correlated # SIMPLE-同步确认(阻塞) CORRELATED-异步确认

2、 创建ProducerAckConfig

内容如下:

@Configuration
@Slf4j
public class RabbitConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        // 确认消息是否到达交换机
        this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack){
                log.warn("消息没有到达交换机:" + cause);
            }
        });

        // 确认消息是否到达队列,到达队列该方法不执行
        this.rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.warn("消息没有到达队列,来自于交换机:{},路由键:{},消息内容:{}", exchange, routingKey, new String(message.getBody()));
        });
    }
}
3、 测试

测试1:消息正常发送,正常消费

在这里插入图片描述

测试2:消息到达交换机,没有达到队列(交换机存在,但是路由key和绑定的队列不一致)

 amqpTemplate.convertAndSend("demo.exchange","c.a.b" , "hehe");

在这里插入图片描述

在这里插入图片描述

测试3:消息不能到达交换机(交换机不存在)

 amqpTemplate.convertAndSend("demo.exchange2","a.b" , "hehe");

在这里插入图片描述

在这里插入图片描述

消费者确认

1、简介

消费者接受到消息使用时的确认机制:ack,默认消费者接受到消息后自动确认

springboot-rabbit提供了三种消息确认模式:

  • AcknowledgeMode.NONE:不确认模式(不管程序是否异常只要执行了监听方法,消息即被消费。相当于rabbitmq中的自动确认,这种方式不推荐使用)
  • AcknowledgeMode.AUTO:自动确认模式(默认,消费者没有异常会自动确认,有异常则不确认,无限重试,导致程序死循环。不要和rabbit中的自动确认混淆)
  • AcknowledgeMode.MANUAL:手动确认模式(需要手动调用channel.basicAck确认,可以捕获异常控制重试次数,甚至可以控制失败消息的处理方式)

全局配置方法:

# rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # manual-手动  auto-自动(无异常直接确认,有异常无限重试) none-不重试

或者

以下只针对设置的消费者

@RabbitListener(queues = "demo.queue",
                ackMode = "MANUAL" )// NONE,AUTO
2、 确认模式测试
2.1、AUTO-自动确认模式

消费者中制造一个异常:然后重启消费者服务

@RabbitListener(queues = "demo.queue")
public void consumer1(String msg){//String类型的形参表示获取到的队列中的消息
    System.out.println("获取到消息:"+ msg);
    int i =  1/0;
}

运行生产者测试代码发送消息到demo.exchange交换机测试:

@Test
void contextLoads() {
    amqpTemplate.convertAndSend("demo.exchange","a.b" , "hehe");
}

测试结果:

在这里插入图片描述

可以看到mq将无限重试,消费消息:(默认AUTO确认模式,消息消费有异常,设置消息重新归队到mq消息队列中,然后消费者监听器又可以重新消费消息)

消息将无法消费:

停掉应用消息回到Ready状态,消息不会丢失!

2.2、NONE-不确认模式

修改消费者确认模式为NONE:重启消费者服务

再次执行生产者测试代码:

在这里插入图片描述

所有的消息都被消费:

在这里插入图片描述

2.3、MANUAL-不确认模式

修改消费者确认模式为:MANUAL

再次执行生产者测试代码:

测试结果:消息接收到了但是队列中消息等待确认,如果停掉程序会重新进入ready状态

在这里插入图片描述

程序停止运行:

在这里插入图片描述

修改消费者代码:确认消息

3、手动ack
    @RabbitListener(queues = "demo.queue")
public  void consumer1(String msg, Channel channel , Message message) throws Exception {
    try {
        System.out.println("接收到消息:" + msg);
        int i = 1 / 0;
        // 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        if (message.getMessageProperties().getRedelivered()) {
            System.out.println("消息重试后依然失败,拒绝再次接收");
            // 拒绝消息,不再重新入队(如果绑定了死信队列消息会进入死信队列,没有绑定