什么是MQ?
MQ(Message Queue):翻译为消息队列
,通过典型的生产者
和消费者
模型,生产者不断向消息队列中生产消息,消费者不断地从队列中获取消息.因为消息的生产和消费都是异步的,而且只关心消息的发送和接受,没有业务逻辑的入侵,轻松的实现系统间解耦.别名为消息中间件
通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成
MQ种类
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ
,RabbitMQ
,Kafka
,RocketMQ
等
不同MQ的特点
# 1 ActiveMQ
ActiveMQ是Apache下的一个子项目。使用Java完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,少量代码就可以高效地实现高级应用场景。
丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎
# 2 Kafka
也是Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统,
一开始的目的就是用于日志收集和传输.0.8版本开始支持复制,不支持事务,对消息的重复,丢失,错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务
# 3 RocketMQ
阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq,3.0版本名称改为RocketMQ,是阿里参照kafka设计思想使用java实现的一套mq。
同时将阿里系内部多款mq产品(Notify、metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,
在此基础上配合阿里上述其他开源产品实现不同场景下mq的架构,目前主要多用于订单交易系统。
# 4 RabbitMQ
使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。
同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。
对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用
MQ的应用场景
异步处理
`场景说明:用户注册后,需要发送注册邮件和注册短信,传统的做法有两种 1. 串行 2. 并行`
-
串行方式:
将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端.这有一个问题,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要的东西
-
并行方式:
将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间
-
消息队列:
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用事件100ms.虽然并行已经提高了处理时间,但是,前面说过,邮件和短信对我们正常的使用网站并没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回.引入消息队列后,把发送邮件,短信等不是必须的业务逻辑异步处理
由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间,引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍
应用解耦
`场景:双11的时候,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口`
这种做法有一个缺点
当库存系统出现故障时,订单就会失败,订单系统和库存系统高耦合,引入消息队列后:
-
订单系统:
用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 -
库存系统:
订阅下单的消息,获取下单消息,进行库操作.就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失
流量削峰
`场景:`秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列
`作用:`
1. 可以控制活动人数,超过此一定阈值的订单直接丢弃
2. 可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单
RabbitMQ
基于
AMQP
协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一.
AMQP协议介绍
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
Broker
: 接收和分发消息的应用,RabbitMQ Server就是Message Broker。-
Virtual host
: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。类似于数据库中库的概念,一个应用可以分配一个Virtual Host
Connection
: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。Channel
: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。Exchange
: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。Queue
: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。Binding
: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
Linux下安装rabbitMQ
安装erlang
- Erlang在默认的YUM存储库中不可用,因此您将需要安装EPEL存储库。 运行以下命令相同。
yum -y install epel-release
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
yum install erlang
erl -version -- 查看版本
安装rabbitmq
访问https://www.rabbitmq.com/#getstarted,找到这个页面
选择自己需要的版本,我这里是centos7
可以通过:wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.12/rabbitmq-server-3.8.12-1.el7.noarch.rpm
在linux中把rpm包下载下来,
之后通过rpm -Uvh rabbitmq-server-3.8.12-1.el7.noarch.rpm
来安装
- 启动rabbitMQ
systemctl start rabbitmq-server
- 引导时自动启动rabbitmq,可以通过命令:
systemctl enable rabbitmq-server
- 查看rabbitmq服务器的状态
systemctl status rabbitmq-server
# 3 记得关闭防火墙
systemctl stop firewalld
- 阿里云服务器记得开放15672端口
# 4 访问web控制台
- 启动RabbitMQ Web管理控制台,方法是运行:
rabbitmq-plugins enable rabbitmq_management
- tips: 可以通过rabbitmq-plugins list查看有哪些plugins可以使用
- 通过运行以下命令,将RabbitMQ文件的所有权提供给RabbitMQ用户:
chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/
- 现在,您将需要为RabbitMQ Web管理控制台创建管理用户。 运行以下命令相同。
rabbitmqctl add_user admin StrongPassword
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin “.*” “.*” “.*”
将管理员更改为管理员用户的首选用户名。 确保将StrongPassword更改为非常强大的密码。
要访问RabbitMQ的管理面板,请使用您最喜爱的Web浏览器并打开以下URL。
http://Your_Server_IP:15672/
# 5 rabbitmq的卸载
卸载前先停止rabbitmq服务
systemctl stop rabbitmq-server
- 查看rabbitmq安装的相关列表
yum list | grep rabbitmq
- 卸载rabbitmq已安装的相关内容
yum -y remove rabbitmq-server.noarch
- 删除相关文件
rm -rf /var/lib/rabbitmq
rm -rf /usr/local/rabbitmq
输入用户名和密码后就会看到如下画面
rabbitmq的基本使用
- 首先得先创建一个 Virtual Host
- 点进去新创建的Virtual Host 可以设置用户权限
- 接着在maven工程中引入rabbitmq依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
- 写代码来测试
第一种模型(直连)
"P"是我们的生产者,"C"是我们的消费者。中间的框是一个队列,可以缓存消息 - RabbitMQ 代表使用者保留的消息缓冲区。
生产者测试
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION: 生产者初步使用
* @USER: 罗龙达
* @DATE: 2021/2/15 23:43
*/
public class Provider {
@Test
public void sendMessage() throws IOException, TimeoutException {
//创建连接mq的工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("39.99.236.216");
//设置端口号
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的通道对象
Channel channel = connection.createChannel();
//通道绑定对应消息队列(通道中传的就是消息队列的消息)
/**
* 参数1:队列名称 如果队列不存在则自动创建
* 参数2:是否持久化 如果为false 即不持久化 在重启rabbitmq的时候队列会消失
* 如果为true 即使队列持久化了 消息依然会消失
* 想要消息也持久化得在发布消息的参数3设置:
* MessageProperties.PERSISTENT_TEXT_PLAIN
* 参数3:是否独占队列
* 参数4:是否在消费完成后自动删除队列
* 参数5:附加参数
*/
channel.queueDeclare("hello",false,false,false,null);
//发布消息
/**
* 参数1:交换机名称
* 参数2:队列名称
* 参数3:传递消息额外设置
* 参数4:消息的具体内容
*/
channel.basicPublish("","hello",null,"helloWorld".getBytes());
//关闭通道
channel.close();
connection.close();
}
}
注意:用云服务器的话一定得把5672端口开放,防火墙关闭
- 测试
消费者测试
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION: 消费者初步使用
* @USER: 罗龙达
* @DATE: 2021/2/16 0:12
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("39.99.236.216");
//设置端口号
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//创建连接对象
Connection connection = connectionFactory.newConnection();
//创建通道
Channel channel = connection.createChannel();
//通道绑定对象
channel.queueDeclare("hello",false,false,false,null);
//消费消息
/**
* 参数1:消费哪个队列的消息 队列名称
* 参数2:开始消息的自动确认机制
* 参数3:消费消息时的回调接口
*/
channel.basicConsume("hello",true,new DefaultConsumer(channel){
/**
*
* @param consumerTag
* @param envelope
* @param properties
* @param body 消息队列中取出的消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body) = " + new String(body));
}
});
//Consumer中要一直监听队列中的消息,因此不建议关闭
// channel.close();
// connection.close();
}
}
当我们向队列中放入一个消息时,消费者就会立马取出
创建一个工具类封装创建连接和关闭连接的方法
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION: 封装创建连接和关闭连接的方法
* @USER: 罗龙达
* @DATE: 2021/2/16 0:37
*/
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
//重量级资源放到静态代码块中给执行,这样执行的时候只会在程序中new一次
static {
//创建连接mq的工厂
connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("39.99.236.216");
//设置端口号
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
}
//定义提供连接对象的方法
public static Connection getConnection() {
try {
//返回连接对象
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void closeConnAndChannel(Channel channel, Connection conn) {
try {
if (channel != null) channel.close();
if (conn != null) conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 改写provider
public class Provider {
@Test
public void sendMessage() throws IOException, TimeoutException {
//通过工具类获取连接
Connection connection = RabbitMQUtils.getConnection();
//获取连接中的通道对象
Channel channel = connection.createChannel();
//通道绑定对应消息队列(通道中传的就是消息队列的消息)
/**
* 参数1:队列名称 如果队列不存在则自动创建
* 参数2:是否持久化
* 参数3:是否独占队列
* 参数4:是否在消费完成后自动删除队列
* 参数5:附加参数
*/
channel.queueDeclare("hello",false,false,false,null);
//发布消息
/**
* 参数1:交换机名称
* 参数2:队列名称
* 参数3:传递消息额外设置
* 参数4:消息的具体内容
*/
channel.basicPublish("","hello",null,"helloWorld".getBytes());
//关闭通道
// channel.close();
// connection.close();
//通过工具类关闭通道
RabbitMQUtils.closeConnAndChannel(channel,connection);
}
}
- 改写consumer
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//通过工具类获取连接
Connection connection = RabbitMQUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//通道绑定对象
channel.queueDeclare("hello",false,false,false,null);
//消费消息
/**
* 参数1:消费哪个队列的消息 队列名称
* 参数2:开始消息的自动确认机制
* 参数3:消费消息时的回调接口
*/
channel.basicConsume("hello",true,new DefaultConsumer(channel){
/**
*
* @param consumerTag
* @param envelope
* @param properties
* @param body 消息队列中取出的消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body) = " + new String(body));
}
});
//Consumer中要一直监听队列中的消息,因此不建议关闭
// channel.close();
// connection.close();
}
}
第二种模型(工作队列)
work queues
,也被称为(Task queues
),任务模型.当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度.长此以往,消息就会堆积越来越多,无法及时处理,此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息,队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的.
- provider测试
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION: work queues 模型 生产者
* @USER: 罗龙达
* @DATE: 2021/2/16 18:39
*/
public class Provider {
public static void main(String[] args) throws IOException {
//获取链接对象
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("work",true,false,false,null);
for (int i = 0; i < 20; i++) {
//生产消息
channel.basicPublish("","work", null,(i+ "workQueue Test").getBytes());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
RabbitMQUtils.closeConnAndChannel(channel,connection);
}
}
- consumer-1测试
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION: work queues 模型 消费者1
* @USER: 罗龙达
* @DATE: 2021/2/16 18:46
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-1 = " + new String(body));
}
});
}
}
- consumer-2测试
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/16 18:50
*/
public class consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-2 = " + new String(body));
}
});
}
}
启动consumer-1和consumer-2监听5672端口,当provider发送20条消息时的结果:
consumer-1收到的消息:
consumer-1 = 1workQueue Test
consumer-1 = 3workQueue Test
consumer-1 = 5workQueue Test
consumer-1 = 7workQueue Test
consumer-1 = 9workQueue Test
consumer-1 = 11workQueue Test
consumer-1 = 13workQueue Test
consumer-1 = 15workQueue Test
consumer-1 = 17workQueue Test
consumer-1 = 19workQueue Test
consumer-2收到的消息:
consumer-2 = 0workQueue Test
consumer-2 = 2workQueue Test
consumer-2 = 4workQueue Test
consumer-2 = 6workQueue Test
consumer-2 = 8workQueue Test
consumer-2 = 10workQueue Test
consumer-2 = 12workQueue Test
consumer-2 = 14workQueue Test
consumer-2 = 16workQueue Test
consumer-2 = 18workQueue Test
总结:
默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者.平均而言,每个消费者都会受到相同数量的消息.这种分法消息的方式称为循环
官方文档:
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
消息确认机制
官方文档中描述如下:
执行任务可能需要几秒钟。你可能想知道,如果其中一个消费者开始一个漫长的任务,并死于它只有部分完成会发生什么。使用我们当前的代码,一旦 RabbitMQ 向使用者传递消息,它立即标记它用于删除。在这种情况下,如果你杀死一个工作人员,我们将失去消息,它只是处理。我们还将失去发送到此特定工作人员但尚未处理的所有消息。
但我们不想失去任何任务。如果一个工人死了,我们希望任务被送到另一个工人那里继续执行。
为了确保消息永远不会丢失,RabbitMQ 支持消息确认。消费者会发送一个确认信,告诉 RabbitMQ 已收到、处理过特定消息,并且 RabbitMQ 可以*删除它。
如果使用者死亡(其通道关闭,连接关闭,或 TCP 连接丢失),而不发送 ack,RabbitMQ 将了解消息没有完全处理,并将重新排队。如果同时有其他在线消费者,它会迅速重新递送给其他消费者。这样,您就可以确保不会丢失任何消息,即使工人偶尔会死亡。
没有任何消息超时;RabbitMQ 将在消费者死亡时重新传递消息。即使处理消息需要很长时间也很好.
默认情况下,手动消息确认处于打开状态。在前面的示例中,我们通过autoAck= true
标志显式关闭它们。是时候将此标志设置为false,并在完成任务后从工作人员发送适当的确认。
能者多劳模式设置
- consumer-1改造
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION: work queues 模型 消费者1
* @USER: 罗龙达
* @DATE: 2021/2/16 18:46
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
//设置每一次只能消费一个消息
channel.basicQos(1);
/**
* 参数1:消费哪个队列的消息 队列名称
* 参数2:开始消息的自动确认机制 关闭消息自动确认机制
* 参数3:消费消息时的回调接口
*/
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-1 = " + new String(body));
/**
* 开启手动确认
* 参数1:手动确认消息标识
* 参数2:是否开启多个消息同事确认 false 每次确认一个
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
- consumer-2改造
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/16 18:50
*/
public class consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
//设置每一次只能消费一个消息
channel.basicQos(1);
/**
* 参数1:消费哪个队列的消息 队列名称
* 参数2:开始消息的自动确认机制 关闭消息自动确认机制
* 参数3:消费消息时的回调接口
*/
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//为了使得消费者-2运行的比消费者-1慢,睡他个2s
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("consumer-2 = " + new String(body));
/**
* 手动确认
* 参数1:手动确认消息标识
* 参数2:是否开启多个消息同事确认 false 每次确认一个
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
测试
consumer-1收到的消息:
consumer-1 = 1workQueue Test
consumer-1 = 0workQueue Test
consumer-1 = 2workQueue Test
consumer-1 = 3workQueue Test
consumer-1 = 4workQueue Test
consumer-1 = 5workQueue Test
consumer-1 = 6workQueue Test
consumer-1 = 7workQueue Test
consumer-1 = 8workQueue Test
consumer-1 = 9workQueue Test
consumer-1 = 10workQueue Test
consumer-1 = 11workQueue Test
consumer-1 = 12workQueue Test
consumer-1 = 13workQueue Test
consumer-1 = 14workQueue Test
consumer-1 = 15workQueue Test
consumer-1 = 16workQueue Test
consumer-1 = 17workQueue Test
consumer-1 = 19workQueue Test
consumer-2收到的消息:
consumer-2 = 1workQueue Test
consumer-2 = 18workQueue Test
第三种模型(广播fanout)
Putting it all together
在广播模式下,消息发送流程是这样的:
可以有多个消费者
每个消费者有自己的queue(队列)
每个队列都要绑定到Exchange(交换机)
生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
交换机把消息发送给绑定过的所有队列
队列的消费者都能拿到消息,实现一条消息被多个消费者消费
provider开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION: 广播模型 生产者
* @USER: 罗龙达
* @DATE: 2021/2/16 23:36
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 声明交换机
* 参数1: 交换机名称
* 参数2: 交换机类型 fanout -- 广播类型
*/
channel.exchangeDeclare("logs","fanout");
//发送消息
channel.basicPublish("logs","",null,"fanout type message".getBytes());
RabbitMQUtils.closeConnAndChannel(channel,connection);
}
}
- consumer-1开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/16 23:42
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"logs","");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-1 = " + new String(body));
}
});
}
}
- consumer-2开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/16 23:42
*/
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"logs","");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-2 = " + new String(body));
}
});
}
}
- consumer-3开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/16 23:42
*/
public class Consumer3 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"logs","");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-3 = " + new String(body));
}
});
}
}
同时启动不同的consumer后,当provider发布消息后,三个consumer能同时接到消息
第四种模型(Routing路由)
Routing之订阅模型-Direct(直连)
在Fanout模式中,一条消息,会被所有订阅的队列都消费.但是,在某些场景下,我们希望不同的消息被不同的队列消费.这时就要用到Direct类型的Exchange.
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
- 消息的发送方在向Exchange发送消息时,也必须指定消息的
RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing key
进行判断,只有队列的RoutingKey
与消息的Routing key
完全一致,才会接受到消息
例如队列Q1与orange
绑定,而Q2与black
和green
绑定
只有当provider发送的消息的key为orange
时消息才会发送到Q1队列
只有当provider发送的消息的key为black
或者green
时消息才会发送到Q2队列
其他情况消息都会被丢弃
- provider开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/17 0:16
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 通过通道声明交换机
* 参数1: 交换机名称
* 参数2: 设置交换机模式 direct -- 路由模式
*/
channel.exchangeDeclare("logs_direct","direct");
//发送消息
String routingKey = "info";
channel.basicPublish("logs_direct",routingKey,
null,("这是direct模型发布的基于routing key: [" + routingKey + "]").getBytes());
RabbitMQUtils.closeConnAndChannel(channel,connection);
}
}
- consumer--1开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/17 0:22
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 通过通道声明交换机
* 参数1: 交换机名称
* 参数2: 设置交换机模式 direct -- 路由模式
*/
channel.exchangeDeclare("logs_direct","direct");
//创建一个临时队列
String queue = channel.queueDeclare().getQueue();
/**
* 绑定队列和交换机
* 参数1: 队列名称
* 参数2: 交换机名称
* 参数3: routing key的名称
*/
channel.queueBind(queue,"logs_direct","error");
//获取消费的消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-1 = " + new String(body));
}
});
}
}
- consumer-2开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/17 0:27
*/
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
/**
* 通过通道声明交换机
* 参数1: 交换机名称
* 参数2: 设置交换机模式 direct -- 路由模式
*/
channel.exchangeDeclare("logs_direct","direct");
//创建一个临时队列
String queue = channel.queueDeclare().getQueue();
//基于route key绑定队列和交换机
channel.queueBind(queue,"logs_direct","error");
channel.queueBind(queue,"logs_direct","info");
channel.queueBind(queue,"logs_direct","warning");
//获取消费的消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-2 = " + new String(body));
}
});
}
}
同时启动不同的consumer后,当provider发布消息后,两个consumer中只有绑定了routing key=info
的Consumer2能接收到消息
Routing之订阅模型
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列.只不过topic
类型的Exchange
可以让队列在绑定Routing key
的时候使用通配符!这种模型Routing key
一般都是由一个或多个单词组成,多个单词之间以.
分割,例如:item.insert
通配符
*
(star) can substitute for exactly one word. -- 匹配不多不少恰好一个单词#
(hash) can substitute for zero or more words. -- 匹配一个或多个词
举栗子
以上图的队列举例:
如果一条信息的routing key为 quick.orange.rabbit
,那么它两个队列都可以去
- 因为分别匹配
*.orange.*
和*.*.rabbit
如果一条信息的routing key为 lazy.orange.elephant
,那么它两个队列都可以去
- 因为分别匹配
*.orange.*
和lazy.#
但是如果一条信息的routing key为 quick.orange.fox
,那么它只能去队列1
- 与
*.orange.*
匹配,与队列2中没有匹配的
如果一条信息的routing key为 lazy.brown.fox
,那么它只能去队列2
- 与
lazy.#
匹配,与队列1中没有匹配的
如果一条信息的routing key为
orange
或者quick.orange.male.rabbit
,它将不能匹配任何一个队列,消息将会丢失
- provider开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/17 1:19
*/
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机机以及交换机类型 -- topic
channel.exchangeDeclare("topics","topic");
//发布消息
String routeKey = "service.user.black";
channel.basicPublish("topics",routeKey,null,("这里是topic动态路由模型,routekey:[" + routeKey + "]").getBytes());
RabbitMQUtils.closeConnAndChannel(channel,connection);
}
- consumer-1开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/17 1:24
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机机以及交换机类型 -- topic
channel.exchangeDeclare("topics","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//绑定队列和交换机 动态通配符形式route key
channel.queueBind(queue,"topics","*.user.*");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-1 = " + new String(body));
}
});
}
}
- consumer-2开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION:
* @USER: 罗龙达
* @DATE: 2021/2/17 1:28
*/
public class consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机机以及交换机类型 -- topic
channel.exchangeDeclare("topics","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//绑定队列和交换机 动态通配符形式route key
channel.queueBind(queue,"topics","user.#");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-2 = " + new String(body));
}
});
}
}
同时启动不同的consumer后,当provider发布消息后,两个consumer中只有通配符为*.user.*
的Consumer-1能接收到消息,因为consumer-2不符合规则
SpringBoot中使用RabbitMQ
- 引入依赖
<!--rabbitmq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置配置文件
#rabbitmq的配置
spring.rabbitmq.host=39.99.236.216
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/ems
RabbitTemplate
用来简化操作 使用的时候直接在项目中注入即可使用
HelloWorld模型测试
- consumer开发
/**
* @PROJECT_NAME: myTest
* @DESCRIPTION: 消费者
* @USER: 罗龙达
* @DATE: 2021/2/17 1:54
*/
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloConsumer {
@RabbitHandler
public void receive(String message){
System.out.println("message = " + message);
}
}
- provider开发
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
class RabbitmqSpringbootApplicationTests {
//注入rabbitmqTemplate对象
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testHello() {
rabbitTemplate.convertAndSend("hello","helloWorld");
}
}
work模型测试
- provider开发
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
class RabbitmqSpringbootApplicationTests {
//注入rabbitmqTemplate对象
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testWork(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","workTest");
}
}
}
- consumer开发
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
说明:默认在spring AMQP实现中work这种方式就是公平调度,如果需要实现能者多劳需要额外配置
fanout模型测试
- provider开发
//注入rabbitmqTemplate对象
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testFanout(){
rabbitTemplate.convertAndSend("logs","","Fanout模型发送消息");
}
- consumer开发
@Component
public class FanoutConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "logs",type = "fanout") //绑定交换机
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "logs",type = "fanout") //绑定交换机
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
route模型测试
- provider开发
//注入rabbitmqTemplate对象
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testRoute(){
rabbitTemplate.convertAndSend("directs","info","发送info的key的路由消息");
}
- consumer开发
@Component
public class DirectConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //创建临时队列
exchange = @Exchange(value = "directs",type = "direct"), //绑定交换机
key = {"info","warning","error"} //指定路由的key
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //创建临时队列
exchange = @Exchange(value = "directs",type = "direct"), //绑定交换机
key = {"error"} //指定路由的key
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
Topic模型测试
- provider开发
//注入rabbitmqTemplate对象
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("topics","delete.order","基于delete.order的路由消息");
}
- consumer开发
@Component
public class TopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name = "topics"),
key = {"order.#","user.*","user.save"}
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name = "topics"),
key = {"order.#","*.user.*","#.order.#"}
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}