一、MQ
在介绍RabbitMq之前,先来说一下MQ。什么是MQ?MQ全称为Message Queue即消息队列,就是一个消息的容器, MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka。根据自己项目的业务场景和需求来选择相应的MQ框架(MQ框架比较)。为什么要使用MQ呢?在项目中,一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。比如在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。
二、RabbitMQ
在MQ众多框架中RabbitMQ仍然是首选,RabbitMQ用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现的可复用的企业消息系统。如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难。所以我先学习一下AMQP协议。
三、AMQP协议
AMQP协议参考和转载自https://blog.csdn.net/yinwenjie/article/details/50820369写的非常好。
1、AMQP协议的各个组成部分
(1)AMQP协议的各个组成部分:AMQP协议中的元素包括:Message(消息体)、Producer(消息生产者)、Consumer(消息消费者)、Virtual Host(虚拟节点)、Exchange(交换机)、Queue(队列)等
(2)由Producer(消息生产者)和Consumer(消息消费者)构成了AMQP的客户端,他们是发送消息和接收消息的主体。AMQP服务端称为Broker,一个Broker中一定包含完整的Virtual Host(虚拟主机)、 Exchange(交换机)、Queue(队列)定义。
(3)一个Broker可以创建多个Virtual Host(虚拟主机),Virtual Host的工作元素有Exchange和Queue。注意,如果AMQP是由多个Broker构成的集群提供服务,那么一个Virtual Host也可以由多个Broker共同构成。
(4)Connection是由Producer(消息生产者)和Consumer(消息消费者)创建的连接,连接到Broker物理节点上。但是有了Connection后客户端还不能和服务器通信,在Connection之上客户端会创建Channel,连接到Virtual Host或者Queue上,这样客户端才能向Exchange发送消息或者从Queue接受消息。一个Connection上允许存在多个Channel,只有Channel中能够发送/接受消息。
(5)Exchange元素是AMQP协议中的交换机,Exchange可以绑定多个Queue也可以同时绑定其他Exchange。消息通过Exchange时,会按照Exchange中设置的Routing(路由)规则,将消息发送到符合的Queue或者Exchange中。
2、AMQP消息在这个结构中是如何通过Producer发出,又经过Broker最后到达Consumer的呢
(1)在Producer(消息生产者)客户端建立了Channel后,就建立了到Broker上Virtual Host的连接。接下来Producer就可以向这个Virtual Host中的Exchange发送消息了。
(2)Exchange(交换机)能够处理消息的前提是:它至少已经和某个Queue或者另外的Exchange形成了绑定关系,并设置好了到这些Queue和Excahnge的Routing(路由规则)。Excahnge中的Routing有三种模式。在Exchange收到消息后,会根据设置的Routing(路由规则),将消息发送到符合要求的Queue或者Exchange中(路由规则还会和Message中的Routing Key属性配合使用)。
(3)Queue收到消息后,可能会进行如下的处理:如果当前没有Consumer的Channel连接到这个Queue,那么Queue将会把这条消息进行存储直到有Channel被创建(AMQP协议的不同实现产品中,存储方式又不尽相同);如果已经有Channel连接到这个Queue,那么消息将会按顺序被发送给这个Channel。
(4)Consumer收到消息后,就可以进行消息的处理了。但是整个消息传递的过程还没有完成:视设置情况,Consumer在完成某一条消息的处理后,将需要手动的发送一条ACK消息给对应的Queue(当然您可以设置为自动发送,或者无需发送)。Queue在收到这条ACK信息后,才会认为这条消息处理成功,并将这条消息从Queue中移除;如果在对应的Channel断开后,Queue都没有这条消息的ACK信息,这条消息将会重新被发送给另外的Channel。当然,您还可以发送NACK信息,这样这条消息将会立即归队,并发送给另外的Channel。
3、Excahnge中Routing的三种模式
Exchange交换机在AMQP协议中主要负责按照一定的规则,将收到的消息转发到已经和它事先绑定好的Queue或者另外的Exchange中。Excahnge交换机的这个处理过程称之为Routing(路由)。目前流行的AMQP路由实现一共有三种:分别是Direct Exchange、Fanout Exchange和Topic Exchange。需要特别注意的是:Exhange需要具备怎样的‘路由’规则,并没有在AMQP标准协议进行强行规定,目前流行的AMQP转发规则都是AMQP实现产品自行开发的(这也是为什么AMQP消息中和路由、过滤规则相关的属性是存放在application-properties区域的原因)。
(1)Direct路由
direct模式从字面上的理解应该是‘引导’、‘直接’的含义。direct模式下Exchange将使用AMQP消息中所携带的Routing-Key和Queue中的Routing Key进行比较。如果两者完全匹配,就会将这条消息发送到这个Queue中。如下图所示:
(2)Fanout路由
Fanout路由模式不需要Routing Key。当设置为Fanout模式的Exchange收到AMQP消息后,将会将这个AMQP消息复制多份,分别发送到和自己绑定的各个Queue中。如下图所示:
(3)Topic路由
Topic模式是Routing Key的匹配模式。Exchange将支持使用‘#’和‘ * ’通配符进行Routing Key的匹配查找(‘#’表示0个或若干个关键词,‘ * ’表示一个关键词,注意是关键词不是字母)。如下图所示:
为了方便各位读者的理解,这里我们再举几个通配符匹配的示例:
(1)“param.#”,可以匹配“param”、“param.test”、“param.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能匹配诸如“param1.test”、“param2.test”以为param这个关键词和param1这个关键词不相同。
(2)“param.*.* ”,可以匹配“param.test.test”、“param.test.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能匹配诸如“param”、“param.test”、“parm.child”等等Routing Key。
(3)“param.*.value”,可以匹配“param.value.value”、“param.test.value”等Routing Key;但是不能匹配诸如“param.value”、“param.value.child”等Routing Key。
注意,以上介绍的Direct 路由模式和Topic 路由模式中,如果Exchange交换机没有找到任何匹配Routing Key的Queue,那么这条AMQP消息会被丢弃。(只有Queue有保存消息的功能,但是Exchange并不负责保存消息)