apach-rocketmq 笔记

时间:2024-12-13 06:59:49

RocketMq

rocketmq介绍

RocketMQ 是一款分布式消息中间件,由阿里巴巴开源并捐赠给 Apache 基金会,它借鉴了 Kafka 的设计并进行了很多改进和优化,具有高性能、高可靠、易扩展等特点。RocketMQ 支持多种消息模型,包括发布/订阅、点对点、事务消息等,适用于各种分布式系统中的消息传递需求。

优点 缺点 适合场景
apache kafka 吞吐量大,性能好,集群高可用 存在数据丢失,功能单一 日志分析,大数据采集
rabittmq 消息可靠性高,功能全面 吞吐量低,生态差 小规模服务调用
apache pulsar 消息可靠性高 生态不完善 大规模服务调用
apache rocketmq 高吞吐,高并发,高性能,协议丰富 服务加载慢 几乎全场景,特别适合金融业务(电商,支付等)
官方地址:
https://rocketmq.apache.org/

下载地址:
https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

rocketmq的特点

  1. 高性能:RocketMQ 采用零拷贝技术,减少了消息传递过程中的内存拷贝,提高了消息传递的效率。
  2. 高可靠性:RocketMQ 支持消息持久化,确保消息不会丢失。同时,RocketMQ 还支持消息重试和死信队列,提高了消息的可靠性。
  3. 高扩展性:RocketMQ 支持集群部署,可以水平扩展,提高系统的处理能力。
  4. 多种消息模型:RocketMQ 支持发布/订阅、点对点、事务消息等多种消息模型,满足各种分布式系统中的消息传递需求。
  5. 丰富的协议:RocketMQ 支持多种协议,包括 TCP、HTTP、MQTT 等,可以满足各种应用场景的需求。
  6. 丰富的功能:RocketMQ 支持消息过滤、消息轨迹、消息重试、死信队列等功能,提高了消息处理的灵活性和可靠性。

rocketmq的架构

RocketMQ 的架构主要包括以下几个部分:

  1. NameServer:NameServer 是 RocketMQ 的注册中心,负责管理 Broker 的元数据信息,包括 Broker 的地址、状态等。NameServer 提供了 Broker 的注册和发现功能,客户端可以通过 NameServer 获取 Broker 的地址信息。
  2. Broker:Broker 是 RocketMQ 的消息存储和转发节点,负责接收和存储消息,并将消息转发给消费者。Broker 支持消息持久化,确保消息不会丢失。同时,Broker 还支持消息重试和死信队列,提高了消息的可靠性。
  3. Producer:Producer 是消息的生产者,负责发送消息到 Broker。Producer 可以通过 NameServer 获取 Broker 的地址信息,并将消息发送到 Broker。
  4. Consumer:Consumer 是消息的消费者,负责从 Broker 拉取或订阅消息进行处理。Consumer 可以通过 NameServer 获取 Broker 的地址信息,并从 Broker 拉取或订阅消息。
  5. Topic:Topic 是消息的分类,用于将消息进行分组。Producer 和 Consumer 可以通过 Topic 来指定要发送或接收的消息。
  6. Message:Message 是 RocketMQ 的消息载体,包含了消息的元数据(如消息 ID、消息类型、消息内容等)和消息体。
  7. Offset:Offset 是消息的偏移量,用于标识消息在 Broker 中的位置。Consumer 可以通过 Offset 来指定要拉取或订阅的消息位置。
  8. Queue:Queue 是消息的队列,用于将消息进行分片存储。Broker 可以将消息存储在多个 Queue 中,以提高消息的并发处理能力。
  9. Transaction:Transaction 是 RocketMQ 的分布式事务支持,用于保证消息的可靠性和一致性。Producer 可以通过 Transaction 发送事务消息,并在事务提交或回滚时更新消息的状态。
  10. Filter:Filter 是 RocketMQ 的消息过滤功能,用于根据消息的属性或内容进行消息过滤。Producer 和 Consumer 可以通过 Filter 来指定要发送或接收的消息。
  11. Trace:Trace 是 RocketMQ 的消息追踪功能,用于记录消息的发送、接收和处理过程。Producer 和 Consumer 可以通过 Trace 来查看消息的轨迹,以便进行问题排查和性能优化。
  12. Cluster:Cluster 是 RocketMQ 的集群模式,用于将多个 Broker 组成一个集群,以提高系统的处理能力和可靠性。集群中的 Broker 可以通过选举机制选择 Master 和 Slave,实现数据的备份和恢复。
  13. HA:HA 是 RocketMQ 的高可用性支持,用于保证 Broker 的可靠性和可用性。HA 通过主从复制和故障转移机制,确保 Broker 在发生故障时能够自动切换,保证系统的持续运行。
  14. Security:Security 是 RocketMQ 的安全支持,用于保护消息的传输和存储安全。RocketMQ 支持多种安全机制,包括 SSL/TLS 加密、身份验证等,确保消息的机密性和完整性。
  15. Monitoring:Monitoring 是 RocketMQ 的监控功能,用于实时监控系统的运行状态和性能指标。RocketMQ 提供了丰富的监控指标,包括消息吞吐量、延迟、队列长度等,帮助用户了解系统的运行状态并进行性能优化。
  16. Tools:Tools 是 RocketMQ 的管理工具,用于管理和监控 RocketMQ 集群。RocketMQ 提供了丰富的管理工具,包括命令行工具、Web 管理界面等,帮助用户进行集群管理和性能优化。
  17. Integration:Integration 是 RocketMQ 的集成功能,用于与其他系统集成。RocketMQ 支持与多种系统集成,包括大数据平台、流处理平台、微服务等,以满足各种应用场景的需求。
  18. Extensibility:Extensibility 是 RocketMQ 的可扩展性支持,用于支持自定义扩展。RocketMQ 提供了丰富的扩展接口,包括消息存储、消息转发、消息过滤等,帮助用户进行自定义扩展和优化。
  19. Scalability:Scalability 是 RocketMQ 的可扩展性支持,用于支持水平扩展。RocketMQ 支持集群部署,可以水平扩展,提高系统的处理能力。
  20. High Availability:High Availability 是 RocketMQ 的高可用性支持,用于保证系统的可靠性和可用性。RocketMQ 支持集群部署,可以水平扩展,提高系统的处理能力。
  21. High Performance:High Performance 是 RocketMQ 的高性能支持,用于提高系统的处理能力。RocketMQ 采用零拷贝技术,减少了消息传递过程中的内存拷贝,提高了消息传递的效率。
  22. Rich Protocols:Rich Protocols 是 RocketMQ 的协议支持,用于支持多种协议。RocketMQ 支持多种协议,包括 TCP、HTTP、MQTT 等,可以满足各种应用场景的需求。
  23. Rich Features:Rich Features 是 RocketMQ 的功能支持,用于支持丰富的消息处理功能。RocketMQ 支持消息过滤、消息轨迹、消息重试、死信队列等功能,提高了消息处理的灵活性和可靠性。

rocketmq的安装

下载RocketMQ安装包

https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

解压安装包

unzip rocketmq-all-4.9.4-bin-release.zip

启动NameServer

nohup sh bin/mqnamesrv &

启动Broker

nohup sh bin/mqbroker -n localhost:9876 &

验证启动是否成功

 ps -ef|grep java  
  501  5434  5394   0 10:46PM ttys001    0:02.85 /Library/Java/JavaVirtualMachines/jdk-23.jdk/Contents/Home/bin/java -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -Xlog:gc*:file=/Volumes/RAMDisk/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M -XX:-OmitStackTraceInFastThrow -XX:-UseLargePages -cp .:/Users/hhl/Downloads/rocketmq-all-4.9.4-bin-release/bin/../conf:/Users/hhl/Downloads/rocketmq-all-4.9.4-bin-release/bin/../lib/*: org.apache.rocketmq.namesrv.NamesrvStartup
  501  5994  4718   0 11:06PM ttys001    0:00.01 grep java
  501  5930  5920   0 11:04PM ttys002    0:06.76 /Library/Java/JavaVirtualMachines/jdk-23.jdk/Contents/Home/bin/java -server -Xms8g -Xmx8g -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -Xlog:gc*:file=/Volumes/RAMDisk/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M -XX:-OmitStackTraceInFastThrow -XX:+AlwaysPreTouch -XX:MaxDirectMemorySize=15g -XX:-UseLargePages -cp .:/Users/hhl/Downloads/rocketmq-all-4.9.4-bin-release/bin/../conf:/Users/hhl/Downloads/rocketmq-all-4.9.4-bin-release/bin/../lib/*: org.apache.rocketmq.broker.BrokerStartup -n localhost:9876

测试RocketMQ

export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

停止RocketMQ

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

spirngboot整合

参考项目:https://gitee.com/naseng/springboot-rocketmq.git