RabbitMQ
异步通信
异步通信的优点
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰
异步通信的缺点
- 依赖于Broker的可靠性、安全性、吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
RabbitMQ安装
RabbitMQ中的几个概念:
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息的队列
- virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
默认15672端口是
RabbitMQ模型
1.基本消息队列
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.239.131");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.239.131");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列的消息接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()5.利用channel将消费者与队列绑定
SpringAMQP
1.什么是SpringAMQP
-
什么是AMQP?
应用间消息通信的一种协议,与语言和平台无关。AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
-
springAMQP如何发送消息?
- 引入amqp的starter依赖
- 配置RabbitMQ地址
- 利用RabbitTemplate的convertAndSend方法
2.用SpringAMQP简化基本消息队列
在父工程中引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.springAMQP发送消息
1. 配置RabbitMQ地址
spring:
rabbitmq:
host: 192.168.239.131
port: 5672
username: itcast
password: 123321
virtual-host: /
2. 利用RabbitTemplate的convertAndSend方法
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend2SimpleQueue(){
String queueName = "simple.queue";
String message = "hello!,SpringAMQP!";
rabbitTemplate.convertAndSend(queueName,message);
}
}
在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列。
2.springAMQP接收消息
在consumer服务中编写消费逻辑,绑定simple.queue这个队列
1. 配置RabbitMQ地址
2.写一个监听类监听消息队列
@Component
public class RabbitMQListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到了【" + msg + "】");
}
}
3.工作队列
生产者生产50个消息:
@Test
public void testSend2WorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello!,message__!";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message+i);
Thread.sleep(20);
}
}
模拟两个消费能力不同的消费者:一个1s消费50个,一个1s消费10个。让这两个消费者绑定到同一个队列:
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者接收到了[" + msg + "]"+LocalDateTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者接收到了[" + msg + "]"+ LocalDateTime.now());
Thread.sleep(200);
}
work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
4.发布订阅队列模型-fanoutExchange
1.在consumer服务声明Exchange、Queue、Binding
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("kxy.fanoutExchange");
}
@Bean
public Queue fanoutQueue1(){
return new Queue("kxy.fanoutQueue1");
}
@Bean
public Queue fanoutQueue2(){
return new Queue("kxy.fanoutQueue2");
}
@Bean
public Binding bindingQueue1(FanoutExchange fanoutExchange,Queue fanoutQueue1){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Binding bindingQueue2(FanoutExchange fanoutExchange,Queue fanoutQueue2){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
2.在consumer服务声明两个消费者
@Component
public class RabbitMQListener {
@RabbitListener(queues = "kxy.fanoutQueue1")
public void listenFanoutQueue1(String msg) {
System.err.println("消费者1接收到了[" + msg + "]");
}
@RabbitListener(queues = "kxy.fanoutQueue2")
public void listenFanoutQueue2(String msg) {
System.err.println("消费者2接收到了[" + msg + "]");
}
}
3.在publisher服务发送消息到FanoutExchange
@Test
public void testSend2fanoutQueue(){
String exchangeName = "kxy.fanoutExchange";
String message = "hello!,everyone!";
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
总结:
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列·不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
- Queue
- FanoutExchange
- Binding
5.发布订阅队列模型–DirectExchange
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式( routes)。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
1.在消费者服务里声明两个队列,以及队列的key
@Component
public class RabbitMQListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("kxy.directQueue1"),
exchange = @Exchange(name = "kxy.directExchange",type = ExchangeTypes.DIRECT),
key = {"blue","red"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到了[" + msg + "]");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("kxy.directQueue2"),
exchange = @Exchange(name = "kxy.directExchange",type = ExchangeTypes.DIRECT),
key = {"yellow","red"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到了[" + msg + "]");
}
}
2.生产者发布消息到交换机,并指定相应的key,到不同的路由
public void testSend2DirectQueue(){
String exchangeName = "kxy.directExchange";
String message = "hello!,blue!";
rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}
描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
- @Queue
- @Exchange
6.发布订阅队列模型-TopicExchange
#代表多个或0个
*代表恰好一个
1.在消费者服务声明TopicExchange,以及绑定的两个队列,队列的key
@RabbitListener(bindings = @QueueBinding(
value = @Queue("kxy.topicQueue1"),
exchange = @Exchange(name = "kxy.topicExchange",type = ExchangeTypes.TOPIC),
key = {"china.#"}
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到了[" + msg + "]");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("kxy.topicQueue2"),
exchange = @Exchange(name = "kxy.topicExchange",type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到了[" + msg + "]");
}
2.生产者服务发布消息,可以通过通配符来决定发送给哪个消费者:
@Test
public void testSend2TopicQueue(){
String exchangeName = "kxy.topicExchange";
String message = "哈哈哈哈哈哈";
rabbitTemplate.convertAndSend(exchangeName,"china.weather",message);
}
7.消息转换器
需要用到Jackson来做json的序列化:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
SpringAMQP中消息的序列化和反序列化是怎么实现的?
- 利用MessageConverter实现的,默认是JDK的序列化
- 注意发送方与接收方必须使用相同的MessageConverter