JMS和ActiveMQ介绍(3)_ActiveMQ
JMS和ActiveMQ介绍(3)_ActiveMQ

首先简单介绍一下ActiveMQ。ActiveMQ是由Apache软件基金会提供的开源免费消息服务器,目前版本是5.8.0。

ActiveMQ具有以下特点:

基于JMS 1.1和J2EE 1.4规范;

支持多种连接协议:HTTP/S,IP组播,SSL,STOMP,TCP,UDP,XMPP等;

支持多种消息持久化机制:文件形式持久化(KahaDB),关系数据库形式持久化(JDBC);

插件化的安全机制:ActiveMQ支持插件开发,并且它的安全机制就是以插件形式实现灵活配置的;

支持嵌入Java应用,ActiveMQ就可以作为一个单独服务,也可以直接嵌入到其他Java应用中;

支持与应用服务器进行集成:支持Apache Tomcat,Jetty,ApacheGeronimo,Jboss;

多种语言的客户端API:支持C/C++,.NET,Perl,PHP,Python,Ruby等;

集群;

动态化的简单管理。

JMS和ActiveMQ介绍(3)_ActiveMQ

ActiveMQ使用Java开发,使用Spring配置bean,使用Maven构建。

ActiveMQ 5.6.0的源代码目录结构如图所示,从pom文件中可以看出,共包含23个模块,比如核心模块activemq-core,web控制台模块activemq-web-console,文件形式的消息持久化模块kahadb等。

使用Maven编译、打包后,在assembly/target下ActiveMQ的生成可执行包。

ActiveMQ的执行包目录结构如图所示:

bin目录下是启动脚本;

conf目录下是配置文件;

data目录下是消息持久化的文件;

webapps是web控制台目录。

JMS和ActiveMQ介绍(3)_ActiveMQ

通过bin目录下的脚本可以启动ActiveMQ服务,从配置文件可以看出,ActiveMQ服务也是通过Spring来进行配置的,整个配置文件实质就是对一个broker实例进行相关配置,从命名空间可以看出,ActiveMQ broker实例其实就是一个org.apache.activemq.xbean. XBeanBrokerService类的实例,而XBeanBrokerService继承自org.apache.activemq.broker.BrokerService,BrokerService是ActiveMQ中的一个核心类,用于对外提供消息服务。

ActiveMQbroker的配置包含以下部分:

目的地策略:用来对目的地(队列、主题)的相关策略进行配置,包括流量控制、分发策略等;

JMX管理:用来配置JMX;

持久化设置:

存储设置:

连接器设置:

Web控制台设置:ActiveMQ通过内嵌Jetty,用来提供Web控制台,能够对连接、主题、队列、消费者等进行可视化的管理。

JMS和ActiveMQ介绍(3)_ActiveMQ

在ActiveMQ中,队列或主题的地址名称可以使用“.”号分割的方式来表示,例如queue.news.subject0-10。

并且ActiveMQ支持使用通配符表示某一类队列和主题的地址,通配符包括:

.,用于分割地址中的名称;

*,用于匹配“.”之间的任意字符;

>,用于匹配任意字符。

JMS和ActiveMQ介绍(3)_ActiveMQ

在目的地策略中,包含多个策略实体,策略实体既可以描述某一个队列或主题的策略,也可以通过通配符描述某一类队列或主题的策略。

目的地策略配置示例如上所示,该策略实体对所有主题进行配置,对每个主题进行流量控制,内存限制为1M,将消息引用保存在内存中。

以下列出了策略实体的部分属性,全部属性可以从ActiveMQ官网查看到。

JMS和ActiveMQ介绍(3)_ActiveMQ

策略实体中还包含一些子标签用于设置一些具体的策略,以下列出部分子标签:

消息分发策略;

死信队列策略:当消息重复发送多次仍未成功时,ActiveMQ将向死信队列发送一个消息,死信队列策略对该操作进行配置,比如设置使用共享的死信队列或单独的死信队列;

消息撤销策略:当消息消费较慢需要删除消息时使用,比如撤销最久的消息;

消息数量限制策略:用于设置持久化消息数量最大值;

订阅恢复策略:当订阅者重新连接时使用,比如只恢复最后一个消息;

持久订阅这的消息引用策略,队列的消息引用策略,订阅者的消息引用策略:用于设置消息应用存储位置,比如将消息引用存储于内存中;

慢消费者策略:当消息消费较慢时使用,比如直接退出。

JMS和ActiveMQ介绍(3)_ActiveMQ

下面具体再介绍一下分发策略。

首先介绍一下预读取机制,为了提高消息消费速度,在一次消息接收过程中,ActiveMQ通过预读取机制将消息尽可能多地推送给消费者,在消费者客户端缓存。

但为了防止消费者缓存溢出,ActiveMQ通过prefetchlimit控制当前推送给消费者且未收到确认的消息数量。

prefetch limit的默认值是:持久化消息队列为1000,非持久化消息队列为1000,持久化消息主题为100,非持久化消息主题为-1。

若prefetch limit=1,则消费者每次只会接收一个消息,相当于关闭预读取机制。

若prefetchlimit=0,则消息只有在消费者主动拉取时才会被接收,而不会被推送给消费者。

对于分发策略,有如下具体的策略,其中roundRobin策略是当有多个消费者时,将消息平均地发给各个消费者,而不是采用预读取机制先将消息全部发往某个消费者直至达到prefetch limit。而strictOrder策略相反,是将消息先全部发往某个消费者,但该策略可以保证当主题中有多个消息生产者,且有多个消息消费者时,每个消息消费者接收到的消息顺序是一致的。

JMS和ActiveMQ介绍(3)_ActiveMQ

流量控制是为了防止消息生产较快,而消费较慢,导致队列或主题堵塞。

在ActiveMQ5.0之前,ActiveMQ使用TCP协议本身的流量控制机制,这种方法的不足是只能对整个连接进行流量控制,而不能对单个生产者进行流量控制,而且当多个消息生产者和消费者使用同一个连接时可能会造成死锁。

从ActiveMQ5.0开始,ActiveMQ支持对单个生产者进行流量控制。流量控制可在生产者客户端和服务器端进行配置。

如果生产者客户端异步发送消息(useAsyncSend置为true),发送消息时线程不会阻塞等待消息服务器返回确认,此时就需要在生产者客户端配置流量控制,通过setProducerWindowSize设置一个最大值,即生产者发送的未接收到确认的消息不能超过该最大值,若超过,则等待。

服务器端的流量控制配置在两个地方:

1)首先在目的地策略中,通过producerFlowControl可以对每个目的地设置是否进行流量控制,memoryLimit表示消息存储在内存中的最大量,vmCursor表示在内存中仅保存消息的游标,这样可以在内存中存储尽可能多的消息。

2)在存储设置中,可以对整个消息服务器的存储用度进行配置,memoryUsage表示ActiveMQ使用的内存,storeUsage表示持久化消息存储文件的大小,tempUsage表示非持久化消息存储的文件大小。在存储配置中还可以设置当存储用度不足时系统如何处理,除默认等待外还支持sendFailIfNoSpace,sendFailIfNoSpaceAfterTimeout。

JMS和ActiveMQ介绍(3)_ActiveMQ

ActiveMQ支持JMX,在ActiveMQ中配置JMX的示例如下所示。在启动后,可以通过Java工具jconsole连接ActiveMQ,对其进行监控。

JMS和ActiveMQ介绍(3)_ActiveMQ

对于消息持久化,消息以先进先出的方式存储于队列中。只有接收到消费者的确认,消息才会从队列中出队。

对于具有持久订阅者的主题,主题中只保存一份消息。每个持久订阅者保存一个指向最后一个接收消息的指针。只有接收到所有持久订阅者对于消息的确认,消息才会从主题中删除。

JMS和ActiveMQ介绍(3)_ActiveMQ

下面将介绍4种消息存储机制。

KahaDB是ActiveMQ推荐的消息存储机制,它基于文件,是最快的一种消息存储机制。

KahaDB的实现机制如图所示,首先所有消息数据追加写入log文件,log文件的大小有限制,若达到限制,则创建一个新文件,若log文件中的消息已全部发送出去,则该文件被删除。队列、主题使用B树数据结构存储,这样能够快速查询到其中的消息,B树中的消息实际存储对log文件中数据的引用。同时队列、主题的消息还保存在缓存中,以提高访问速度。

与KahaDB类似,基于文件,但与KahaDB不同的是,每个队列有独立的索引文件,多用于消息量大的场景,但不适用于队列多的场景。

JMS和ActiveMQ介绍(3)_ActiveMQ
JMS和ActiveMQ介绍(3)_ActiveMQ

在客户端连接服务器或服务器之间互连时,ActiveMQ支持多种连接协议,以下是这些协议以及使用说明,其中,TCP是ActiveMQ默认使用的网络协议,STOMP是一种面向简单文本的消息协议,主要用于多语言支持,实践中,我们在PHP和Python的客户端连接服务器时,使用了该协议,VM主要用于访问在同一个JVM中运行的服务器。

在客户端使用vm连接消息服务器时,如vm://brokerName,若同一个JVM内存在以该brokerName命名的消息服务器实例,则连接至该实例,若不存在,则创建一个以该brokerName命名的消息服务器实例,并连接。

在连接时采用的URI格式如下所示,第一种是单一URI,表示一个消息服务器连接地址,第二种是组合URI,即将多个消息服务器地址组合起来。

在消息服务器配置中,连接有两种配置:

transportconnector,用于配置客户端与服务器之间的连接,向客户端提供连接端口,ActiveMQ通常占有61616端口对外提供tcp连接;

networkconnector,用于配置服务器与服务器之间的连接,实现服务器网络,下面将具体介绍ActiveMQ的两种服务器网络。

JMS和ActiveMQ介绍(3)_ActiveMQ

首先是静态网络。

在网络中服务器配置已知的情况下,可以使用static创建静态网络,例如已知网络中已有一消息服务器BrokerB,则在BrokerA的配置中可以添加以上配置,在启动BrokerA时,BrokerA会创建与BrokerB的连接,当生产者将消息发送至BrokerA,且消费者从BrokerB接收该消息时,BrokerA会自动将消息转发至BrokerB。

在这种配置下,BrokerA至BrokerB的连接是单向的,即消息只能从BrokerA转发至BrokerB。如果需要将连接配置成双向的,可以将duplex属性置成true,这样,BrokerA和BrokerB即可以向对方转发消息,也可以从对方接收消息。

JMS和ActiveMQ介绍(3)_ActiveMQ

静态网络需要已知服务配置情况,且不易于进行后期扩展,通过使用动态网络可解决以上问题。在动态网络中,每个消息服务器需要进行以上配置。在服务启动后,会自动使用IP组播在网络中寻找其他消息服务器实例,并创建连接。

JMS和ActiveMQ介绍(3)_ActiveMQ

在客户端连接消息服务器时,既可以使用单一URI连接单个服务器,也可以使用组合URI从多个服务器中选择一个进行连接。

在组合URL中,failover是一种比较常用的客户端连接方式,使用failover时,客户端会从多个服务器地址中随机选择一个进行连接,当连接失效时,会尝试连接其他的服务器。

如果只连接一个服务器,也建议在服务器地址前再加上failover,这样可以建立重连机制,提高系统健壮性。

discovery与failover类似,但是通过组播从动态网络中查询可用的服务器,并从中随机选择一个进行连接,当连接失效时,也会尝试连接其他的服务器。

peer与vm类似,在使用peer连接时,会自动在JVM内创建服务器,另外,还会在建立此服务器与网络中同组服务器的连接。

peer的应用场景是客户端与服务器经常会有连接失效发生,但又需要在连接失效时,客户端仍可以正常工作。使用peer,客户端可以在本地JVM内创建服务器并与其通信,当与远程服务器连接正常时,本地服务器会再与远程服务器进行通信。

fanout可以向静态网络或动态网络发送消息,网络中的每个服务器都会接收到消息。