此处假设已经安装好了RabbitMQ,主要讲述使用Spring Boot如何集成RabbitMQ。
添加依赖
在Maven的pom.xml添加rabbitmq的starter依赖,内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sivalabs</groupId>
<artifactId>springboot-rabbitmq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RC1</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>
RabbitMQ配置
RabbitMQ需要配置Queue,Exchange和Binding响应的Bean。
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig
{
public static final String QUEUE_ORDERS = "orders-queue";
public static final String EXCHANGE_ORDERS = "orders-exchange";
@Bean
Queue ordersQueue() {
return QueueBuilder.durable(QUEUE_ORDERS).build();
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();
}
@Bean
Exchange ordersExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build();
}
@Bean
Binding binding(Queue ordersQueue, TopicExchange ordersExchange) {
return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS);
}
}
例子里定义了名为orders-queue的队列和名为orders-exchange的Exchange,以及定义了orders-queue与orders-exchange的绑定,其中设定router key为orders-queue。任何以orders-queue为router key都会发送到队列orders-queue。
在application.properties配置RabbitMQ服务器信息:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
环境已经配置好,接着是应用。应用分为两部分:发送方OrderMessageSender 和接收方OrderMessageListener 。
消息发送OrderMessageSender
这里新建一个Bean OrderMessageSender作为发送方,它将用来发送消息到orders-exchange。
模型Order
public class Order implements Serializable {
private String orderNumber;
private String productId;
private double amount;
//setters & getters
}
用于传递数据的数据模型需要序列化。
RabbitTemplate
OrderMessageSender主要是使用RabbitTemplate来发送消息。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderMessageSender {
private final RabbitTemplate rabbitTemplate;
@Autowired
public OrderMessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendOrder(Order order) {
this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order);
}
}
调用rabbitTemplate.convertAndSend()发送消息,它接收两个参数:
- routerKey:第一个参数路由的键,此router key为配置里定义绑定时设置的值。
- data:第二个参数为发送消息载体
默认情况下,Spring Boot使用org.springframework.amqp.support.converter.SimpleMessageConverter来序列化数据为字节码。
监听消息OrderMessageListener
接着是在消息消费端实现消息的监听,以便接收消息。这里订单消息监听类为OrderMessageListener 。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class OrderMessageListener {
static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);
@RabbitListener(queues = RabbitConfig.QUEUE_ORDERS)
public void processOrder(Order order) {
logger.info("Order Received: "+order);
}
}
在相应的处理方法上添加注解@RabbitListener,并指定监听的队列。
至此就完成了一个简单消息的发送和接收。