一、 什么是Broker云
Apathe Qpid 支持Broker Federation ,也就是Broker联盟或者叫做Broker云。Broker Federation可以通过配置消息路由,创建一个可以让特定的消息可以从源broker自动路由到目标broker的消息系统网络。Broker Federation通常在一个包含大量broker的大网络环境中使用。如果网络互联允许,完全可以在单独的一个网络位置配置整个Broker云消息系统网络。当Broker服务主机发生变化时,Broker云内部的路由规则可以根据时间、拓扑等等条件动态地变化和适应。
Broker Federation可以适用于各种各样的场景,只要能够依据一定的规则来组织不同的broker逻辑区域。Broker Federation的应用场景举例:
地理位置:路由客户的请求到最近的工作服务站点。
服务类型:高端客户的请求需要路由到响应更迅速的服务站点。
负载均衡:Brokers之间的路由可以动态适应当前或预期的负载情况。
WAN连通性:不同地域的Brokers之间路由通过广域网连接,客户端只需要连接到本地Broker,分散各地域的brokers同样可以提供消息分发和存储。
功能性组织:根据不同软件系统之间的消息流可反映出一个分布式系统的逻辑结构。
可堆叠Exchange:高性能交换器比如xml exchange,可以通过堆叠来提高性能。
跨科跨部门工作流:根据不同Brokers之间的消息流可反映出跨科跨部门组织之间的工作流。
下图是一个根据地理位置划分区域后部署的Broker云,各个城市之间的主机完全可以*交换消息。
二、 消息路由
一个Broker Federation需要通过创建消息路由才能实现。每条消息路由是单向的,如果需要双向,可以通过在两个方向各创建一条路由来实现。路由的目的地址总是目标Broker的exchange,源地址可以是源Broker的exchange,或queue。一般情况下,在目标Broker内部创建路由,向源Broker订阅相应的地址,这种路由是拉式路由。相反,在源Broke创建路由,向目标Broker推送消息到指定的地址,这是推式路由。多个源Broker指定同一个目标Broker时,推方式路由效率更高。路由也支持持久化。
有三种路由,Queue路由,Exchange路由和动态Exchange路由,其中前面两种属于静态路由。
Queue路由:Broker A的某个queue地址中的所有消息路由到Broker B的某个exchange。
Exchange路由:Broker A 某个Exchange中匹配指定Binding Key(Routing key,可理解为消息主题)的所有消息路由到Broker B 的同名Exchange中去。当然,对于fanout型的Exchange,binding key是忽略的。
动态Exchange路由:客户端订阅Broker A指定Binding key的某个Exchange地址,若Broker A通过动态Exchange路由连接到Broker B,则客户端不仅可以接收到Broker A的该Exchange地址上匹配Binding Key的消息,同时也可以接收Broker B 的相同名字的Exchange地址上匹配Binding Key的消息。这里的Binding是可以动态变更的。动态路由接收两个Broker之间同名Exchange之间的所有Binding key。动态路由其实是一种特殊的Exchange路由,区别是所有Binding Key都可以动态自动指定,无需手动显式指定,但它只能是拉式路由。
三、 拓扑结构
使用双向路由,可以实现星形、树形或线形的拓扑。如果在环形的拓扑使用,只能使用单向路由。
星形拓扑:
树形拓扑:
线形拓扑:
环形拓扑:
消息传递过程总是需要一定的时间的,因此在设计拓扑结构时,一条消息的完整路径所经过的broker节点尽可能要少。大多数情况下,树形或星形拓扑可以满足大多数需求,同时能够很好的防止消息环路的发生。
在一个拓扑中的任何两个Broker节点A、B,应该有且只有一条路由路径。如果有多于一条路径,会导致消息环路,出现消息重复甚至消息风暴。
四、 Broker Federation跟高可用集群结合
Broker云同样可以应用在部署了HA集群的网络环境,这时,每一个HA集群看作是单个Broker节点。建立消息路由时,可以连接一个HA集群里的任意一个Broker到另一个HA集群里的任意一个Broker。在同一个集群内部,每个Broker都可以共享同一个集群内部所建立的其他消息路由。
五、 qpid-route命令行工具的使用
操作系统:CentOS 6.5 i386
Apache Qpid版本:qpid-cpp 0.32
qpid-route需要安装qpid-qmf-0.32.tar.gz。
主机名 |
IP地址 |
node1.example.com |
192.168.15.16 |
node2.example.com |
192.168.15.69 |
Queue路由
首先,在node2创建一个名为public 的queue;
[root@node1 ~]# qpid-config -b node2.example.com add queue public
[root@node1 ~]# qpid-route queue add node1.example.com node2.example.com amq.fanout public
查看路由拓扑
[root@node1 ~]# qpid-route route map node1.example.com
Finding Linked Brokers:
node1.example.com:5672... Ok
node2.example.com:5672... Ok
Dynamic Routes:
none found
Static Routes:
node1.example.com:5672(ex=amq.fanout) <= node2.example.com:5672(queue=public)
[root@node1 ~]#
查看连接状态
[root@node1 ~]# qpid-route link list node1.example.com
Host Port Transport Durable State Last Error
===================================================================
node2.example.com5672 tcp N Operational
[root@node1 ~]#
订阅node1的amq.fanout地址
[root@node1 messaging]# ./drain -b node1.example.com -t 60 amq.fanout
然后向node2的public地址发送消息
[root@node1 messaging]# ./spout -b node2.example.com --content 123456 public
Message(properties={spout-id:13812f41-d342-4fb9-9254-863fcd1e8fe5:0}, content='123456')
意料之内,node2上的public的消息路由到node2的amq.fanout地址
[root@node1 messaging]# ./drain -b node1.example.com -t 60 amq.fanout
Message(properties={spout-id:13812f41-d342-4fb9-9254-863fcd1e8fe5:0, x-amqp-0-10.routing-key:public}, content='123456')
清除queue 路由
[root@node1 messaging]# qpid-route queue del node1.example.com node2.example.com amq.fanout public
Exchange路由
[root@node1 messaging]# qpid-route route add node1.example.com node2.example.com amq.topic global.#
[root@node1 messaging]# qpid-route route map node1.example.com
Finding Linked Brokers:
node1.example.com:5672... Ok
node2.example.com:5672... Ok
Dynamic Routes:
none found
Static Routes:
node1.example.com:5672(ex=amq.topic) <= node2.example.com:5672(ex=amq.topic) key=global.#
[root@node1 messaging]#
向node2的amq.topic地址发送主题带global的消息,观察该消息是否正确路由到node1的amq.topic地址
首先订阅node1地址amq.topic
[root@node1 messaging]# ./drain -b node1.example.com -t 60 amq.topic
然后,向node2的地址amq.topic发送带global主题的消息
[root@node1 messaging]# ./spout -b node2.example.com --content 123456 amq.topic/global.aaa.bbb
Message(properties={spout-id:2fb5de3d-b3a0-449b-8810-c8b9cb617011:0}, content='123456')
可以观察到,消息正确被路由了
[root@node1 messaging]# ./drain -b node1.example.com -t 60 amq.topic
Message(properties={qpid.subject:global.aaa.bbb, spout-id:2fb5de3d-b3a0-449b-8810-c8b9cb617011:0, x-amqp-0-10.routing-key:global.aaa.bbb, x-qpid.trace:cad71590-80e1-4dcc-81eb-2ee49df2a1b3}, subject='global.aaa.bbb', content='123456')
清除路由
[root@node1 messaging]# qpid-route route del node1.example.com node2.example.com amq.topic global.#
动态Exchange路由
首先,分别在node1,node2创建一个topic 的exchange fed.topic
[root@node1 messaging]# qpid-config -b node1.example.com add exchange topic fed.topic
[root@node1 messaging]# qpid-config -b node2.example.com add exchange topic fed.topic
创建两条动态消息路由,实现双向传递
[root@node1 messaging]# qpid-route dynamic add node1.example.com node2.example.com fed.topic
[root@node1 messaging]# qpid-route dynamic add node2.example.com node1.example.com fed.topic
查看路由
[root@node1 messaging]# qpid-route route map node1.example.com
Finding Linked Brokers:
node1.example.com:5672... Ok
node2.example.com:5672... Ok
Dynamic Routes:
Exchange fed.topic:
node2.example.com:5672 <=> node1.example.com:5672
Static Routes:
none found
[root@node1 messaging]#
在node1订阅地址fed.topic
[root@node1 messaging]# ./drain -b node1.example.com -t 60 fed.topic
向node2地址fed.topic发送任意主题消息
[root@node1 messaging]# ./spout -b node2.example.com --content 123456 fed.topic/global
Message(properties={spout-id:bec400b9-21ba-4b44-9a64-8c78a0003e7f:0}, content='123456')
[root@node1 messaging]# ./spout -b node2.example.com --content 123456 fed.topic/xxxxx
Message(properties={spout-id:bfa28743-4208-4d4d-96a3-e21c9af482a7:0}, content='123456')
可以观察到,node1上接收了所有node2上fed.topic所有消息
[root@node1 messaging]# ./drain -b node1.example.com -t 60 fed.topic
Message(properties={qpid.subject:global, spout-id:bec400b9-21ba-4b44-9a64-8c78a0003e7f:0, x-amqp-0-10.routing-key:global, x-qpid.trace:cad71590-80e1-4dcc-81eb-2ee49df2a1b3}, subject='global', content='123456')
Message(properties={qpid.subject:xxxxx, spout-id:bfa28743-4208-4d4d-96a3-e21c9af482a7:0, x-amqp-0-10.routing-key:xxxxx, x-qpid.trace:cad71590-80e1-4dcc-81eb-2ee49df2a1b3}, subject='xxxxx', content='123456')
同样,反过来测试,结果也一样。
[root@node1 messaging]# ./spout -b node1.example.com --content 123456 fed.topic/global
Message(properties={spout-id:00aaf56e-6282-436a-81ed-638baaa48737:0}, content='123456')
[root@node1 messaging]# ./spout -b node1.example.com --content 123456 fed.topic/xxxxx
Message(properties={spout-id:ceca4150-e2a0-4d3b-b04a-d81deffc9709:0}, content='123456')
[root@node1 messaging]# ./drain -b node2.example.com -t 60 fed.topic
Message(properties={qpid.subject:global, spout-id:00aaf56e-6282-436a-81ed-638baaa48737:0, x-amqp-0-10.routing-key:global, x-qpid.trace:aae222c1-d458-4aeb-a8f0-045211bf1e8d}, subject='global', content='123456')
Message(properties={qpid.subject:xxxxx, spout-id:ceca4150-e2a0-4d3b-b04a-d81deffc9709:0, x-amqp-0-10.routing-key:xxxxx, x-qpid.trace:aae222c1-d458-4aeb-a8f0-045211bf1e8d}, subject='xxxxx', content='123456')
可以这样理解,node1和node2的fed.topic是可以看作是一体的,任意一个节点的fed.topic收到的消息,都会路由到另一个节点的fed.topic那里,并且binding key保持一致。
清除路由
[root@node1 messaging]# qpid-route dynamic del node2.example.com node1.example.com fed.topic
[root@node1 messaging]# qpid-route dynamic del node1.example.com node2.example.com fed.topi
PS:在需要应用流控的场景,exchange路由并不能很好的触发流控,而且会出现路由自动断开并自动重建的抖动现象,而queue路由则能很好工作,估计这是一个bug。