1 RabbitMQ简介
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,官网地址:http://www.rabbitmq.com。RabbitMQ作为一个消息代理,主要负责接收、存储和转发消息,它提供了可靠的消息机制和灵活的消息路由,并支持消息集群和分布式部署,常用于应用解耦,耗时任务队列,流量削锋等场景。本系列文章将系统介绍RabbitMQ的工作机制,代码驱动和集群配置,本篇主要介绍RabbitMQ中一些基本概念,常用的RabbitMQ Control命令,最后写一个C#驱动的简单栗子。先看一下RabbitMQ的基本结构:
上图是RabbitMQ的一个基本结构,生产者Producer和消费者Consumer都是RabbitMQ的客户端,Producer负责发送消息,Consumer负责消费消息。
接下来我们结合这张图来理解RabbitMQ的一些概念:
Broker(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程,我们可以把Broker叫做RabbitMQ服务器。
Virtual Host:一个虚拟概念,一个Virtual Host里面可以有若干个Exchange和Queue,主要用于权限控制,隔离应用。如应用程序A使用VhostA,应用程序B使用VhostB,那么我们在VhostA中只存放应用程序A的exchange,queue和消息,应用程序A的用户只能访问VhostA,不能访问VhostB中的数据。
Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四种,不同类型的Exchange路由规则是不一样的(这些以后会详细介绍)。
Queue:消息队列,用于存储还未被消费者消费的消息,队列是先进先出的,默认情况下先存储的消息先被处理。
Message:就是消息,由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等,Body是真正传输的数据,内容格式为byte[]。
Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
Channel:信道,仅仅创建了客户端到Broker之间的连接Connection后,客户端还是不能发送消息的。需要在Connection的基础上创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的。
2 RabbitMQ安装
因为RabbitMQ是用erlang语言开发的,所以我们在安装RabbitMQ前必须要安装erlang支持。
1 Windows平台安装
1 安装erlang
首先下载erlang,直接下载最新版本,当前下载的是 OTP 21.3 Windows -bit Binary File ,下载完成后一直下一步安装即可。
2 安装RabbitMQ
下载Windows平台的RabbtMQ,当前下载的是 rabbitmq-server-3.7..exe ,下载完成后一直下一步安装即可。
3 安装Web管理插件
打开RabbitMQ Command Prompt,执行命令 rabbitmq-plugins enable rabbitmq_management 即可完成Web监控插件的安装。
安装完成后,打开浏览器输入 http://127.0.0.1:15672/ 使用默认账号[ name:guest / password:guest ]登录后界面如下,使用这个UI插件我们可以轻松的查看RabbitMQ中的交换机(exchange),队列(queue)等内容,也可以对exchange,queue,用户等进行添加、修改、删除操作。
到这一步Windows平台安装RabbitMQ完成了。 打开服务管理器,RabbitMQ已经在正常运行了,如下:
2 Centos安装RabbitMQ
1 安装RabbitMQ
这里虚拟机系统为Centos7,采用的安装方式是yum安装,为了简单,这里直接使用官方提供的erlang和RabbitMQ-server的自动安装脚本(官方安装文档),逐行执行下边的代码就可以安装完成erlang和RabbitMQ。
#安装socat
yum install socat #安装erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum -y install erlang #安装rabbitmq-server
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum -y install rabbitmq-server #启动rabbitmq服务
systemctl start rabbitmq-server
#添加web管理插件
rabbitmq-plugins enable rabbitmq_management
补充:如果安装完成后,执行RabbitMQ执行命令特别慢,或者出现报错【rabbitmq unable to perform an operation on node xxx@xxx】,解决方法:
编辑hosts,执行命令 vim /etc/hosts ,添加本机IP(或者虚拟机IP)
命令执行结束后,使用浏览器访问 http://127.0.0.1:15672/ 也会出现web管理界面。通过上边的安装步骤安装的RabbitMQ会生成Unit文件,所以我们可以使用Systemd管理RabbitMQ服务,以下是几条常用的命令:
#启动RabbitMQ服务
systemctl start rabbitmq-server
#停止RabbitMQ服务
systemctl stop rabbitmq-server
#查看RabbitMQ运行状态
systemctl status rabbitmq-server
#重启RabbitMQ服务
systemctl restart rabbitmq-server
2 RabbitMQ Control工具
使用Web管理界面可以实现RabbitMQ的大部分常用功能,但是有些功能WebUI是做不到的,如:开启/关闭RabbitMQ应用程序和集群的管理等。RabbitMQ Control是RabbitMQ的命令行管理工具,可以调用所有的RabbitMQ内置功能,主命令是rabbitmqctl ,下边是一个查询用户列表的命令,注意需要切换到sbin目录下执行:
为了方便的使用RabbitMQ Control工具,我们最好添加一个环境变量,Windows默认安装时在PATH中添加一条: C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.\sbin ,不是默认安装的话找到对应的安装目录添加PATH。按照上的安装方法,Centos可以直接使用RabbitMQ Control工具,不需要多余的配置。如果想详细了解RabbitMQ Control工具,可以参考RabbitMQ Control的官方文档。
这里总结了一些最常用到的RabbitMQ Controll命令,有兴趣的小伙伴可以试着运行一下这些命令,如在命令行工具中使用命令 rabbitmqctl add_user <username> <password> 添加一个新用户。
1 基本控制命令
基本控制命令主要用于启动、停止应用程序、runtime等
#停止rabbitmq和runtime
rabbitmqctl shutdown
#停止erlang节点
rabbitmqctl stop
#启用rabbitmq
rabbitmqctl start_app
#停止rabbitmq
rabbitmqctl stop_app
#查看状态
rabbitmqctl status
#查看环境
rabbitmqctl environment
#rabbitmq恢复最初状态,内部的exchange和queue都清除
rabbitmqctl reset
2 服务状态管理
这些命令主要用于用于查看exchang、channel、binding、queue、consumers:
#返回queue的信息
list_queues [-p <vhostpath>] [<queueinfoitem> ...]
#返回exchange的信息
list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
#返回绑定信息
list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
#返回链接信息
list_connections [<connectioninfoitem> ...]
#返回目前所有的channels
list_channels [<channelinfoitem> ...]
#返回consumers
list_consumers [-p <vhostpath>]
3 用户管理命令
这些命令主要用于添加、修改、删除用户及管理用户权限
#在rabbitmq的内部数据库添加用户
add_user <username> <password>
#删除一个用户
delete_user <username>
#改变用户密码
change_password <username> <newpassword>
#清除用户密码,禁止用户登录
clear_password <username>
#设置用户tags,就是设置用户角色
set_user_tags <username> <tag>
# 查看用户列表
list_users
#创建一个vhost
add_vhost <vhostpath>
#删除一个vhosts
delete_vhost <vhostpath>
#列出vhosts
list_vhosts [<vhostinfoitem> ...]
#针对一个vhosts 给用户赋予相关权限
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
#清除一个用户对vhost的权限
clear_permissions [-p <vhostpath>] <username>
#列出所有用户对某一vhost的权限
list_permissions [-p <vhostpath>]
#列出某用户的访问权限
list_user_permissions <username>
4 集群管理命令
#clusternode表示node名称,--ram表示node以ram node加入集群中。默认node以disc node加入集群,在一个node加入cluster之前,必须先停止该node的rabbitmq应用,即先执行stop_app。
join_cluster <clusternode> [--ram]
#显示cluster中的所有node
cluster_status
#设置集群名字
set_cluster_name <clustername>
#修改集群名字
rename_cluster_node <oldname> <newname>
#改变一个cluster中node的模式,该节点在转换前必须先停止,不能把一个集群中唯一的disk node转化为ram node
change_cluster_node_type <disc | ram>
#远程删除一个节点,删除前必须该节点必须先停止
rabbitmqctl forget_cluster_node rabbit@rabbit1
#同步镜像队列
sync_queue <queuename>
#取消同步队列
cancel_sync_queue <queuename>
#清空队列中所有消息
purge_queue [-p vhost] <queuename>
这里列举的很多命令是现阶段用不到的,如集群控制相关的命令,这些命令的用法会在以后的章节中逐渐理解。
3 C#驱动RabbitMQ
1 一个简单的栗子
作为开发者,我们最在意的还是怎么在代码中使用RabbitMQ,可以通过官方RabbitMQ开发文档来学习RabbitMQ的使用,这里以.NET为例演示一下RabbitMQ的最基本用法。创建两个Console应用,一个作为发送消息的生产者(Producer),一个作为接受消息的消费者(Consumer),生产者向队列写入消息,消费者接受这条消息,结构如下:
两个控制台应用都要添加RabbitMQ.Client包,命令如下:
Install-Package RabbitMQ.Client
生产者(Producer)代码:
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
//rabbitmq-server所在设备ip,这里就是本机
HostName = "127.0.0.1",
UserName = "wyy",//用户名
Password = ""//密码
};
//第一步:创建连接connection
using (var connection = factory.CreateConnection())
{
//第二步:创建通道channel
using (var channel = connection.CreateModel())
{
//第三步:声明交换机exchang
channel.ExchangeDeclare(exchange: "myexchange",
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null);
//第四步:声明队列queue
channel.QueueDeclare(queue: "myqueue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
Console.WriteLine("生产者准备就绪....");
//第五步:绑定队列到交互机
channel.QueueBind(queue:"myqueue", exchange:"myexchange", routingKey:"mykey");
string message = "";
//第六步:发送消息
//在控制台输入消息,按enter键发送消息
while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
{
message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
//基本发布
channel.BasicPublish(exchange: "myexchange",
routingKey: "mykey",
basicProperties: null,
body: body);
Console.WriteLine($"消息【{message}】已发送到队列");
}
}
}
Console.ReadKey();
}
}
消费者(Consumer)代码:
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
//rabbitmq-server所在设备ip,这里就是本机
HostName = "127.0.0.1",
UserName = "wyy",//用户名
Password = ""//密码
};
//第一步:创建连接connection
using (var connection = factory.CreateConnection())
{
//第二步:创建通道channel
using (var channel = connection.CreateModel())
{
//第三步:声明队列queue
channel.QueueDeclare(queue: "myqueue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//第四步:定义消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"接受到消息【{message}】");
};
Console.WriteLine("消费者准备就绪....");
//第五步:处理消息
channel.BasicConsume(queue: "myqueue",
autoAck: true,
consumer: consumer);
Console.ReadLine();
}
}
}
}
依次运行Producer和Consumer两个应用程序,运行结果如下:
注意:上边的代码在生产者和消费者的代码中都声明了exchange和queue,这主要是为了让这两个程序可以按任意顺序启动,如:我们只在生产者代码中定义了exchange和queue,却先启动消费者,这会让造成消费者找不到自己需要的exhange和queue(出现404错误)。实际开发中创建exchange/queue、绑定队列以及设置routingKey这些工作,都可以通过WebUI管理界面或者使用Rabbitmq Control工具完成。
QueueDeclare方法用于声明队列,ExchangeDeclare用于声明交换机,我们在使用这两个方法声明时,可以设置队列和交换机的属性,如queue的名字,长度限制,exchange是否持久化、交换机类型等。
2 QueueDeclare方法详解
在上边的栗子中我们使用了声明队列的方法 QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments) ,该方法通过参数设置队列的特性。这里介绍一下该方法 中几个参数的作用,先看一个完整的声明队列的栗子:
//声明队列newsQueue
channel.QueueDeclare(queue: "myqueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>() {
//队列中消息的过期时间是8s
{ "x-message-ttl",* },
//队列60s没有被使用,则删除该队列
{"x-expires",* },
//队列最多保存100条消息
{"x-max-length", },
//队列中ready类型消息总共不能超过1000字节
{"x-max-length-bytes", },
//当队列消息满了时,丢弃传来后续消息
{"x-overflow","reject-publish" },
//丢弃的消息发送到deadExchange交换机
{"x-dead-letter-exchange","deadExchange" },
//丢弃的消息发送到deadExchange交换机时的RoutingKey
{"x-dead-letter-routing-key","deadKey" },
//队列中最大的优先级等级为10(在Publish消息时对每条消息设置优先级)
{"x-max-priority", },
//设置队列默认为lazy
{"x-queue-mode","lazy" }
});
QueueDeclare方法的参数如下:
queue:队列名字;
durable:是否持久化。设置为true时,队列信息保存在rabbitmq的内置数据库中,服务器重启时队列也会恢复(注意:重启后队列内部的消息不会恢复,怎么实现消息持久化以后会详细介绍);
exclusive:是否排外。设置为true时只有首次声明该队列的Connection可以访问,其他Connection不能访问该队列;且在Connection断开时,队列会被删除(即使durable设置为true也会被删除);
autoDelete:是否自动删除。设置为true时,表示在最后一条使用该队列的连接(Connection)断开时,将自动删除这个队列;
arguments:设置队列的一些其它属性,为Dictionary<string,object>类型,下表总结了arguments中可以设置的常用属性。
参数名 | 作用 | 示例 |
Message TTL | 设置队列中消息的有效时间 | { "x-message-ttl",1000*8 },设置队列中的所有消息的有效期为8s; |
Auto expire | 自动删除队列。一定的时间内队列没有被使用,则自动删除队列 | {"x-expires",1000*60 },设置队列的过期时长为60s,如果60s没有队列被访问,则删除队列; |
Max length | 队列能保存消息的最大条数 | {"x-max-length",100 },设置队列最多保存100条消息; |
Max length bytes | 队列中ready类型消息的总字节数 | {"x-max-length-bytes",1000 }, 设置队列中ready类型消息总共不能超过1000字节; |
Overflow behaviour | 当队列消息满了时,再接收消息时的处理方法。有两种处理方案:默认为"drop-head"模式,表示从队列头部丢弃消息;"reject-publish "表示不接收后续的消息 |
{"x-overflow","reject-publish" },设置当队列消息满了时,丢弃传来后续消息; |
Dead letter exchange | 用于存储被丢弃的消息的交换机名。Overflow behaviour 的两种处理方案中丢弃的消息都会发送到这个交换机 | {"x-dead-letter-exchange","beiyongExchange" },设置丢弃的消息发送到名字位beiyongExchange的交换机; |
Dead letter routing key | 被丢弃的消息发送到Dead letter exchange时的使用的routing Key | {"x-dead-letter-routing-key","deadKey" },设置丢弃的消息发送到beiyongExchange交换机时的RoutingKey值是"deadKey"; |
Maximum priority | 设置队列中消息优先级的最大等级,在publish消息时可以设置单条消息的优先级等级 | {"x-max-priority",10 },设置中消息优先级的最大等级为10; |
Lazy mode | 设置队列的模式,如果设置为Lazy表示队列中消息尽可能存放在磁盘中,以减少内存占用;不设置时消息都存放在队列中,用以尽可能快的处理消息 | {"x-queue-mode","lazy"},3.6以后版本可用,设置队列中消息尽可能存放在磁盘中,以减少内存占用。在消息拥堵时和消息持久化配置使用可以减少内存占用。 |
3 ExchangeDeclare方法详解
声明交换机的方法 void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments) 可以设置交换机的特性,这里简单介绍一下这个方法的几个参数:
channel.ExchangeDeclare(exchange: "myexchange",
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: new Dictionary<string, object> {
{"alternate-exchange","BeiyongExchange" }//如果消息不能路由到该交换机,就把消息路由到备用交换机BeiyongExchange上
});
exchange:交换机名字。
type:交换机类型。exchange有direct、fanout、topic、header四种类型,在下一篇会详细介绍;
durable:是否持久化。设置为true时,交换机信息保存在rabbitmq的内置数据库中,服务器重启时交换机信息也会恢复;
autoDelete:是否自动删除。设置为true时,表示在最后一条使用该交换机的连接(Connection)断开时,自动删除这个exchange;
arguments:其他的一些参数,类型为Dictionary<string,object> 。
小结
本节主要介绍了RabbitMQ的基本概念,在Windows和Centos上的安装方法,及RabbitMQ Control工具的基本使用,最后演示了一个C#驱动RabbitMQ的栗子,并详细介绍了声明queue和exchange的方法。通过这一节我们大概了解了RabbitMQ的基本使用。以后的章节会逐渐介绍RabbitMQ的四种exchange、两种Consumer的特点和使用场景,以及消息确认、优先级、持久化等,最后搭建一个高可用的RabbitMQ集群,如果文中有错误的话,希望大家可以指出,我会及时修改,谢谢!
参考文章