RocketMQ
介绍与概念
在github上的说法来看: Apache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟,高性能和可靠性,万亿级容量和灵活的可扩展性。它提供了多种功能:
- 发布/订阅消息模型和点对点
- 预定的消息传递
- 消息追溯性按时间或偏移量
- 记录流媒体的中心
- 大数据集成
- 可靠的FIFO和严格的有序消息传递在同一队列中
- 高效的推拉消费模式
- 单个队列中的百万级消息累积容量
- 多种消息传递协议,如JMS和OpenMessaging
- 灵活的分布式横向扩展部署架构
- Lightning-fast批处理消息交换系统
- 各种消息过滤器机制,如SQL和Tag
- Docker图像用于隔离测试和云隔离集群
- 功能丰富的管理仪表板,用于配置,指标和监控
- 访问控制列表
- 消息跟踪
上面都是官方列举出来了,还有如下特点:
- 支持Broker和Consumer端消息过滤
- 支持拉pull和推push两种消费模式,也就是上面说的推拉消费模式
- 支持单master节点,多master节点,多master节点多slave节点
- 消息失败重试机制,支持特定level的定时消息
- 新版本底层采用Netty
官方网站: http://rocketmq.apache.org/docs/motivation/ 也对RocketMQ和ActiveMQ以及Kafka做了一个对比.
核心概念
参考: http://rocketmq.apache.org/docs/core-concept/ 本人英文阅读能力有点弱,大家请看官方文档.
- Producer: 消息生产者
- Producer Group: 消息生产者组,发送同类消息的一个消息生产组
- Consumer: 消息消费者
- Consumer Group: 消费同类消息的多个实例
- Tag: 标签.子主题对topic的进一步细化,用于区分同一个主题下的不同业务的消息.
- Topic: 主题,queue是消息的物理管理单位,而topic是逻辑管理单位,一个topic下可以有多个queue,默认自动创建是4个,手动创建是8个.
- Message: 消息,每个Message必须指定一个topic
- Broker: MQ程序,接受生产的消息,提供给消费者消费的程序
- Name Server: 给生产和消费者提供路由信息,提供轻量级的服务发现、路由、元数据信息,可以多个部署,互相独立(比zookeeper更轻量)
- Offset: 偏移量,可以理解为消息进度
- commit log: 消息存储会写在Commit log文件里面
云服务器安装RocketMQ
安装RocketMQ
- 下载: http://rocketmq.apache.org/docs/quick-start/, 直接下载源代码版本
- 上传到云服务器
- 开始操作(参考: http://rocketmq.apache.org/docs/quick-start/):
1. 安装unzip命令: yum install unzip
2. 解压: unzip rocketmq-all-4.4.0-source-release.zip
3. 重命名: mv rocketmq-all-4.4.0 rocketmq
4. 进入文件夹: cd rocketmq
5. 使用maven进行源码编译: mvn -Prelease-all -DskipTests clean install -U
6. 进入文件夹: cd distribution/target/apache-rocketmq
7. 启动服务: sh bin/mqnamesrv (注意如果内存小于4G可能会失败)
- 启动失败的原因与解决:
- 启动失败的原因与解决:
[root@iZwz94sw188z3yfl7lpmmsZ apache-rocketmq]# sh bin/mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew
young collector with the CMS collector is deprecated and will
likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning:
UseCMSCompactAtFullCollection is deprecated and will likely be
removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0x00000006ec800000, 2147483648, 0) failed;
error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment
to continue.
# Native memory allocation (mmap) failed to map 2147483648 bytes
for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/software/rocketmq-all-
4.4.0/distribution/target/apache-rocketmq/hs_err_pid8993.log
这种原因是内存不足导致的,默认是4G,解决办法是编辑 bin/runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
- 以守护进程的方式启动:
nohup sh bin/mqnamesrv & 这个命令可能会nohup: ignoring input and appending output to ‘nohup.out’,如果出现这个,执行:
tail -f nohup.out
这个时候ctrl+c就不会退出程序,仅仅是让程序后台运行
- 启动Broker
[root@wangzhi apache-rocketmq]# sh bin/mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/software/rocketmq/distribution/target/apache-rocketmq/hs_err_pid3012.log
出现了Broker内存不足的问题:
vim bin/runbroker.sh 修改 JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" 的内容为:
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"
完成之后以守护进程的方式进行启动:
nohup sh bin/mqbroker -n localhost:9876 &
tail -f nohup.out
这个时候可以使用jps查看进程,并且可以根据进程号来kill进程
- 进行测试:
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
- 远程连接需要开发9876和10911端口,在生产环境不要开放
源码安装RcoketMQ4.X控制台
- 上传源码包
- 解压: unzip rocketmq-externals-master.zip
- 进入文件夹: cd rocketmq-externals-master
- 开始操作:
先修改两个BUG,一个是修改pom.xml的版本号.
cd rocketmq-console/
vim pom.xml : 将里面的rocketmq的版本号后面的snashop删掉,只留个4.4.0就好
在上一个console的路径下继续:
cd src/main/resources/
vim application.properties : 修改nameserver地址 rocketmq.config.namesrvAddr=127.0.0.1:9876
返回到console目录,进行编译安装:
mvn clean package -Dmaven.test.skip
- 启动:
cd target
java -jar rocketmq-console-ng-1.0.0.jar
使用ctrl + c关闭程序,以守护进程的方式进行启动
nohup java -jar rocketmq-console-ng-1.0.0.jar &
tail -f nohup.out