【总结】消息服务中间件(ActvieMQ)
1,使用消息中间件场景
- 解耦和扩展性
- 系统之间集成只需要基于消息队列的数据接口层,这允许系统各自独立的扩展或修改各自的实现,只需要确保遵守同样的接口约束
- 异步通信/处理
- 一些事务性很强,耗时久的请求,如下订单,注册发邮件等,接受请求后马上告诉用户后台已经接受到了请求并在处理请等待,前台不停的检查处理情况,防止了用户一直挂在当前页面,提升了交互体验
- 有些请求伴随着很多后台的附加处理,比如:记录日志,发通知邮件,相关计算处理更新通知等,都可以通过发消息的方式处理,不影响用户请求的主流程
- 处理突发高并发/高峰值的负载情况
- 为这种少见的突发高并发/高峰值的负载情况花费大量的费用提升硬件性能是不值得了
- 用消息队列作为一个缓冲承接用户请求,后台服务器按照自己处理的能力从消息队列拿任务进行处理,不至于让系统超出负荷而崩溃
- 消息中间件需要确保
- 可恢复性
- 送达保证
- 如何确保幂等性
- 发送仅发送一次
- 排序保证
- 安全保证
2,ActiveMQ
- 消息类型
- TextMessage(文本消息)
- MapMessage(映射消息)
- BytesMessage(字节消息)
- StreamMessage(流消息)
- ObjectMessage(对象消息)
- JMS可靠性机制
- 确认JMS消息
- 持久性
- 优先级
- 消息过期
- 临时目的地
- 持久订阅
- 本地事务
- JMS规范
- 点对点域:QueueConnectionFactory,QueueConnection,Queue,QueueSession,QueueSender,QueueReceiver
- 发布/订阅域:TopicConnectionFactory,TopicConnection,Topic ,Topic Session,TopicPublisher,TopicSubscriber
- Broker监控
- JMX
- Web Console:Web监控后台
- Advisory Message(通知消息):允许你通过标准的JMS 消息来监控系统
- Command Agent
- XMPP 是一种基于XML的即时通信协议
- 启用了Command Agent 的broker 上会有一个来自Command Agent的连接,ActiveMQ 提供了ActiveMQ messages和XMPP之间的双向桥接
- 如果客户加入了一个聊天室,那么这个聊天室的名字会被映射到一个JMS topic
- 尝试在聊天室内发送消息会导致一个JMS消息被发送到这个topic
- 呆在一个聊天室中意味着这将保持一个对相应JMS topic 的订阅。因此发送到这个topic 的JMS 消息也会被发送到聊天室
- Visualization Plugin
- 应用协议
- MQTT
- (Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议
- 该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议
- MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合
- 对负载内容屏蔽的消息传输
- 使用TCP/IP提供网络连接
- 有三种消息发布服务质量
- “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送
- “至少一次”,确保消息到达,但消息重复可能会发生
- “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果
- 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量
- 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制
- WS
- Openwire:ActiveMQ自身设计的协议
- Stomp
- Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM (Message Oriented Middleware,面向消息的中间件)设计的简单文本协议
- 它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互,类似于OpenWire(一种二进制协议)
- STOMP协议工作于TCP协议之上
- AMQP
- 即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
- 一套确定的消息交换功能,也就是“高级消息交换协议模型”。AMQP模型包括一套用于路由和存储消息的功能模块,以及一套在这些模块之间交换消息的规则
- 传输(Transport)协议
- VM
- VM transport 允许在VM 内部通信,从而避免了网络传输的开销
- 采用的连接不是socket 连接,而是直接地方法调用
- TCP
- 以包交换进行主机通信的可靠的传输协议
- SSL
- SSL协议在TCP协议的基础上传输加密的安全数据
- 它使用一组密钥(公钥和私钥)确保传输通道的安全性
- Failover
- Failover Transport 是一种重新连接的机制,它工作于其它transport 的上层,用于建立可靠的传输
- 它的配置语法允许制定任意多个复合的URI。Failover transport 会自动选择其中的一个URI 来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接
- Peer
- Discovery
- Discovery transport 是可靠的tranport。它使用Discovery transport来定位用来连接的URI列表
- Zeroconf
- HTTP
- 防火墙仅允许使用一些基础的协议访问。这是HTTP协议出现的原因
- ActiveMQ实现了HTTP传输连接器,能够提供基于xml的消息传输。可以使用HTTP协议绕过防火墙
- UDP
-UDP是一种无连接的协议,不能保证数据报的完整传输
-UDP不能保证数据包的有序接收,也不能保证数据包的重复发送
- Multicast
- 持久化
- Kaha Message Store
- 一种基于文件的消息存储,并且联合Journal事务,可稳定存储消息并且恢复消息
- KahaDB消息存储使用事务Log作为它的索引,并且对所有的Destination仅仅使用一个索引文件,该索引文件是事务日志文件组中消息ID的索引
- KahaDB消息存储联合使用快速的事务处理:Journal以及数据日志文件,该日志文件是消息ID的索引,并且在内存中缓存消息
- AMQ Persistence
- 类似于KahaDB。它使用了journal事务来确保稳定的持久、恢复、高性能的索引
- 主要用于大数据(消息)量的存储
- 对每个索引文件,有两个单独的文件;一个用于每个Destination
- 如果每个Broker有上千个Queue,此时使用AMQ消息存储是不合适的,并且恢复数据也很慢,因为所有的索引文件需要rebuid,需要Broker扫描所有的Data logs去再次构建索引
- Data Logs: 作为消息journal
- Cache: 在消息写入data log后,在内存中保持消息用于快速恢复
- Reference Store:在journal保存消息的引用,按照消息ID索引
- JDBC Persistence
- 可以将消息存储到数据库中,例如:Mysql、SQL Server、Oracle、DB2
- LevelDB
- 这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库储存形式,但是它提供比KahaDB更快的持久性
- 与KahaDB不同的是,它不是使用传统的B-树来实现对日志数据的提前写,而是使用基于索引的LevelDB
- 安全机制
- Simple Authentication Plugin
- JAAS Authentication Plugin
- Custom Authentication Implementation
- Authorization Plugin
- 更多功能
- Exclusive Consumer
- ActiveMQ 从4.x 版本起开始支持Exclusive Consumer (或者说Exclusive Queues)。Broker 会从多个consumers 中挑选一个consumer 来处理queue 中所有的消息,从而保证了消息的有序处理
- Message Groups
- Exclusive Consumer 功能的增强。逻辑上,Message Groups 可以看成是一种并发的Exclusive Consumer
- 跟所有的消息都由唯一的consumer 处理不同,JMS 消息属性JMSXGroupID 被用来区分message group
- JMS Selectors
- JMS Selectors 用于在订阅中,基于消息属性对消息进行过滤
- Pending Message Limit Strategy
- ActiveMQ 通过prefetch 机制来提高性能,这意味这客户端的内存里可能会缓存一定数量的消息
- 缓存消息的数量由prefetch limit 来控制
- 当某个consumer 的prefetch buffer 已经达到上限,那么broker 不会再向consumer分发消息,直到consumer 向broker 发送消息的确认
- Composite Destinations
- ActiveMQ 支持composite destinations。它允许用一个虚拟的destination 代表多个destinations
- 例如你可以通过composite destinations 在一个操作中同时向12 个queue 发送消息。在composite destinations 中,多个destination之间采用","分割
- Mirrored Queues
- Wildcards
- Wildcards 用来支持联合的名字分层体系(federated name hierarchies)
- Async Sends
- ActiveMQ 支持以同步(sync)方式或者异步(async)方式向broker 发送消息
- ActiveMQ 缺省使用异步传输方式
- 按照JMS 规范,当在事务外发送持久化消息的时候,ActiveMQ 会强制使用同步发送方式,保证了broker 已经成功地将消息持久化,而且不会丢失。但是这样作也严重地影响了性能
- Dispatch Policies
- ActiveMQ 的缺省参数是针对处理大量消息时的高性能和高吞吐量而设置的
- Message Cursors
- Optimized Acknowledgement
- Producer Flow Control
- Message Transformtion
3,高可用(HA)和集群
- 高可用(HA,防单点故障):Master-Slave(主从)部署方式
- Shared filesystem Master-Slave主从
- 通过共享存储目录来实现master和slave的热备
- ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master
- 性能较好
- Shared database Master-Slave主从
- 通过共享数据库来实现master和slave的热备
- 性能比共享文件方式差
- Replicated LevelDB Store方式
- ActiveMQ5.9以后才新增的特性
- 使用ZooKeeper协调选择一个node作为master,被选择的master broker node开启并接受客户端连接
- 其他node转入slave模式,连接master并同步他们的存储状态。slave不接受客户端连接。所有的存储操作都将被复制到连接至Master的slaves
- 性能较好
- Broker集群(负载均衡)
- Static Broker-Cluster部署
- Dynamic Broker-Cluster部署