前言
MQ,即消息队列Message Queue的缩写。
RabbitMQ 是MQ的一种,就像招商银行是银行的一种一样。主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC的调用等等。
安装
springboot集成
简单的(一个生产者,一个消费者,使用TopicExchange)
MAVEN依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
application.properties
根据自己的配置调整
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
配置Queue,Exchange及其绑定关系
@Configuration
public class RabbitConfig {
@Bean
public Queue commonQueue() {
return new Queue("commonQueue");
}
@Bean
public TopicExchange commonExchange() {
return new TopicExchange("topicExchange");
}
@Bean
public Binding commonBinding() {
return BindingBuilder.bind(commonQueue()).to(commonExchange()).with("topic.unique.key");
}
}
这里我只配了一个队列,并将其绑定到了指定的TopicExchange上,用于生产者和消费者建立联系。
生产者(发送消息)
springboot已经为我们封装好了Rabbit模板类RabbitTemplate,它可以用来发送和接收消息,直接注入即可使用。
这里我们发送一条日期字符串到服务器。
@Component
public class Producer {
private final RabbitTemplate rabbitTemplate;
@Autowired
public Producer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send() {
String msg = new Date().toString();
rabbitTemplate.convertAndSend("topicExchange", "topic.unique.key", msg);
LoggerFactory.getLogger(this.getClass()).info("消息{}已发送!", msg);
}
}
注意:send()方法是自定义的,用来供外部调用,可以带参数。
如果有外部程序调用了生成者的send方法,根据exchange和queue可以在服务器上直接查看。
测试类:
@SpringBootTest
@RunWith(SpringRunner.class)
public class SimpleTest {
@Autowired
private Producer producer;
@Test
public void testSimple() {
producer.send();
}
}
服务器效果:
说明我们的消息发送成功了,那么如何获取并处理呢?
消费者
在springboot接收消息非常容易,只需要在方法上添加注解@RabbitListener
并通过queues
参数绑定监听的队列即可,由于上例中我们只有一个队列commonQueue
,那么我们绑定它并把接收到的消息打印出来:
@Component
public class Consumer extends RabbitProperties.Listener {
private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
private static final String COMMON_QUEUE = "commonQueue";
@RabbitListener(queues = COMMON_QUEUE)
public void dealMessageOfCommonQueue(String msg) {
LOGGER.info("接收到来自队列{}的消息:{}", COMMON_QUEUE, msg);
}
}
启动服务看看会不会从服务器中拿到刚才发送的字符串:
看到打印出来了消息说明成功消费了它,此时如果再去服务器查看,会发现消息为空,因为它已经被消费掉了。
注意:
- 处理消息的方法中,传入了一个String类型参数,是因为很清楚生产者发送的消息是字符串类型。也可以是其他类型,对的上就行。
- 用Object类型接收的话,会传入除了发送者发送的消息外还有一些配置信息。故不建议使用Object来入参。
- 此处为了方便演示,将生产者和消费者写在一个服务中了,它们可以在不同的(微)服务中,此处应该想到前言中的
解耦
二字。