Spring Boot 集成RabbitMQ

时间:2022-06-01 18:47:46

此处假设已经安装好了RabbitMQ,主要讲述使用Spring Boot如何集成RabbitMQ。

添加依赖

在Maven的pom.xml添加rabbitmq的starter依赖,内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sivalabs</groupId>
    <artifactId>springboot-rabbitmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RC1</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

</project>

RabbitMQ配置

RabbitMQ需要配置Queue,Exchange和Binding响应的Bean。

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig  
{
    public static final String QUEUE_ORDERS = "orders-queue";
    public static final String EXCHANGE_ORDERS = "orders-exchange";

    @Bean
    Queue ordersQueue() {
        return QueueBuilder.durable(QUEUE_ORDERS).build();
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();
    }

    @Bean
    Exchange ordersExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build();
    }

    @Bean
    Binding binding(Queue ordersQueue, TopicExchange ordersExchange) {
        return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS);
    }
}

例子里定义了名为orders-queue的队列和名为orders-exchange的Exchange,以及定义了orders-queue与orders-exchange的绑定,其中设定router key为orders-queue。任何以orders-queue为router key都会发送到队列orders-queue。

在application.properties配置RabbitMQ服务器信息:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

环境已经配置好,接着是应用。应用分为两部分:发送方OrderMessageSender 和接收方OrderMessageListener 。

消息发送OrderMessageSender

这里新建一个Bean OrderMessageSender作为发送方,它将用来发送消息到orders-exchange。

模型Order

public class Order implements Serializable {
    private String orderNumber;
    private String productId;
    private double amount;

    //setters & getters
}

用于传递数据的数据模型需要序列化。

RabbitTemplate 

OrderMessageSender主要是使用RabbitTemplate来发送消息。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderMessageSender {
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public OrderMessageSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendOrder(Order order) {
        this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order);
    }
}

调用rabbitTemplate.convertAndSend()发送消息,它接收两个参数:

  • routerKey:第一个参数路由的键,此router key为配置里定义绑定时设置的值。
  • data:第二个参数为发送消息载体

默认情况下,Spring Boot使用org.springframework.amqp.support.converter.SimpleMessageConverter来序列化数据为字节码。

监听消息OrderMessageListener

接着是在消息消费端实现消息的监听,以便接收消息。这里订单消息监听类为OrderMessageListener 。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderMessageListener {

    static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);

    @RabbitListener(queues = RabbitConfig.QUEUE_ORDERS)
    public void processOrder(Order order) {
        logger.info("Order Received: "+order);
    }
}

在相应的处理方法上添加注解@RabbitListener,并指定监听的队列。

至此就完成了一个简单消息的发送和接收。