1、环境搭建
准备:
rocketmq-all-4.3.0-bin-release.zip
必须配置:JAVA_HOME=/home/rui/jdk1.8.0_172
1 unzip rocketmq-all-4.3.0-bin-release.zip -d /home/rui/ 2 3 cd /home/rui/ 4 5 mv rocketmq-all-4.3.0-bin-release/ rocketmq-4.3.0 6 7 vim /etc/profile 8 9 export PATH=$PATH:/home/rui/rocketmq-4.3.0/bin 10 11 source /ect/profile
启动rocketmq,构建简单的producer-consumer model(PCM)
1)启动Name Server
1 cd /home/rui/rocketmq-4.3.0/bin 2 3 sh mqnamesrv &45 ps -ef|grep mqnamesrv
出现提示“There is insufficient memory for the Java Runtime Environment to continue.”,kafka也出现一样的问题,当时是在命令行临时指定了jvm的配置。
解决:修改bin/runserver.sh,runbroker.sh,tools.sh的JVM配置
vim runserver.sh
vim runbroker.sh,第二个注释的jvm配置还是不对劲,配置成第三个jvm参数能正常启动broker
vim tools.sh
重新启动:
2)启动broker
1 cd /home/rui/rocketmq-4.3.0/bin 2 3 sh ./mqbroker -n localhost:9876 & 4 5 ps -ef|grep mqbroker
note:
注意broker的地址!!!下面有用到它来创建一个TopicTest
3)发送和接收消息
在启动生产者时出现异常:no route info of this topic,TopicTest
解决:
使用命令mqadmin updateTopic手动创建一个TopicTest
1 ./mqadmin updateTopic -b 192.168.0.102:10911 -p 6 -t TopicTest
在同一个xshell界面下,先启动producer,再启动consumer
1 export NAMESRV_ADDR=localhost:9876 2 3 sh./tools.sh org.apache.rocketmq.example.quickstart.Producer 4 5 sh ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
producer
consumer
4)关闭broker,关闭name server
1 sh ./mqshutdown broker 2 3 sh ./mqshutdown namesrv
2、基本概念
集群 参考:http://rocketmq.apache.org/docs/rmq-arc/
1)NameServer cluster:提供服务发现和路由。每个名称服务器记录完整的路由信息,提供相应的读写服务,并支持快速存储扩展。
2)broker cluster:提供TOPIC和QUEUE机制来处理消息存储。它们支持Push和Pull模型,包含容错机制(2个副本或3个副本),并提供强大的峰值填充和以原始时间顺序累积数千亿条消息的能力。Brokers还提供灾难恢复,丰富的指标统计和警报机制。
3)producer cluster:生产者支持分布式部署。 Distributed Producers通过多种负载均衡模式向Broker集群发送消息。发送过程支持快速故障并具有低延迟。
4)consumer cluster:消费者也支持Push和Pull模型中的分布式部署。它还支持群集消费和消息广播。它提供实时消息订阅机制。
两个Server 参考:http://rocketmq.apache.org/docs/rmq-arc/
1、NameServer:Broker管理,NameServer接收来自Broker集群的注册,并且提供心跳机制来检查某个broker是否存活。路由管理,每个NameServer拥有整个broker集群的路由信息和客户端查询的队列信息。
2、Broker Server:Broker Server负责消息的存储和分发,消息的查询,高可用保证等等。Broker Server有几个重要的组成部分:Remoting Module,Client Manage,Store Service,HA Service,Index Service。
1)Remoting Module:broker的入口,负责处理来自客户端的请求。
2)Client Manager:管理客户端(Producer/Consumer),维持消费者的topic订阅。
3)Store Service:提供简单的API存储消息到物理磁盘或者从物理磁盘查询消息。
4)HA Service:提供主从broker数据同步的特性。
5)Index Service:通过指定特定的键为消息建立索引,以便快速查询消息。
核心概念,参考:http://rocketmq.apache.org/docs/core-concept/
Topic:生产者(Producer)传递消息和消费者(Consumer)提取消息的类别。它跟生产者和消费者之间的关系是松耦合的。
Tag:Topic的子类别。它能标识来自同一业务模块的具有不同目的的消息。
pullconsumer:pull类型的consumer主动从broker获取消息。一旦获取到批量的消息,用户应用将会启动消费进程。
pushconsumer:push类型的consumer内部封装了消息获取,消费进度并且维护其他内部工作。留有一个回调接口,它在消息到达时被执行。
producer group:具有相同角色的producer的集合。在事务方面,当某个producer实例崩溃了,producer组中的任一个producer实例去连接broker实现事务的提交或者回滚。
consumer group:具有相同角色的consumer的集合。可以用于实现消息消费方面的负载平衡和容错。每个consumer实例必须有相同的topic订阅。
mssage model:两种:一种是clusting,一种是broadcasting。
message order:两种:一种是orderly,一种是concurrently。当使用了DefaultMQPushConsumer这个类时,可以指定消费消息的顺序。
3、架构,图片来源:http://rocketmq.apache.org/docs/rmq-arc/
4、支持的协议和规范
Pull model, support TCP, JMS, OpenMessaging
5、admin 命令行操作
1 ./mqadmin topicList -n 192.168.0.102:9876 #列出对应name server所有topic 2 3 ./mqadmin topicClusterList -n 192.168.0.102:9876 -t TopicTest #查询某topic对应的cluster,默认是DefaultCluster 4 5 ./mqadmin deleteTopic -c DefaultCluster -n 192.168.0.102:9876 -t TopicTest #通过指定cluster删除某个topic67 ./mqadmin printMsg -n 192.168.0.102:9876 -t TopicTest #打印某个topic的消息细节8 9 ./mqadmin consumeMessage -n 192.168.0.102:9876 -t TopicTest #消费指定topic的消息
6、java操作
maven依赖
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>2.6.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <version>2.6.3</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> <!--<version>4.3.0</version>--> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-broker</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-store</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-namesrv</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-tools</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-remoting</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-logging</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-test</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-srvutil</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-filter</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-example</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.42.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.29</version> </dependency> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>3.20.0-GA</version> </dependency> <dependency> <groupId>net.java.dev.jna</groupId> <artifactId>jna</artifactId> <version>4.2.2</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>19.0</version> </dependency> <dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-api</artifactId> <version>0.3.1-alpha</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.7</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>2.7</version> </dependency> </dependencies>
1)simple:参考:http://rocketmq.apache.org/docs/simple-example/
可以使用三种方式生产消息,分别是sync,async和one-way。使用到的关键类:org.apache.rocketmq.client.producer.DefaultMQProducer;
1 DefaultMQProducer producer = new DefaultMQProducer("test"); // test是组名,必须唯一 2 3 producer.setVipChannelEnabled(false); //vip_port=109094 5 producer.setNamesrvAddr("localhost:9876"); //producer -> nameserver 6 7 producer.start();8
三种方法关键的实现代码分别如下:
1 //sync 2 SendResult sendResult = producer.send(msg); 3 4 //async 5 producer.send(msg, new SendCallback() { 6 @Override 7 public void onSuccess(SendResult sendResult) { } 8 @Override 9 public void onException(Throwable e) { } 10 }); 11 12 //one-way 13 producer.sendOneway(msg);14
注意:最后关闭producer。
消费消息:使用到的关键类:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test"); 2 3 consumer.setVipChannelEnabled(false); //vip_port=10909 4 3 consumer.setNamesrvAddr("localhost:9876"); // consumer -> nameserver 4 5 consumer.subscribe("TopicTest", "*"); // 订阅 TopicTest 6 7 consumer.registerMessageListener(new MessageListenerConcurrently() { // 指定顺序concurrently 8 9 @Override 10 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 11 12 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 13 } 14 });
注意:最后关闭consumer。
2)order
3)broadcast
广播模式的关键代码:在BroadcastConsumer.java加入设置消息模式为BROADCASTING
1 //set to broadcast mode 2 consumer.setMessageModel(MessageModel.BROADCASTING);
4)schedule
调度模式的关键代码:在ScheduledMessageProducer.java设置延迟时间级别为3
1 // This message will be delivered to consumer 10 seconds later. 2 message.setDelayTimeLevel(3);
5)batch
如果发送的消息不超过1MiB,使用List装载Message。Mesage的topic相同。
如果复杂性高,不确定超过1MiB,通过实现接口 Iterator<List<Message>>以分割List<Message>
6)filter
可以使用sql表达式来选择消费的消息。支持数字比较,字符比较,逻辑运算,判断null等。
1 consumer.subscribe("TopicTest", MessageSelector.bySql("..."));
7)openmessage
8)transaction
生产消息,用到的关键类:org.apache.rocketmq.client.producer.TransactionMQProducer
生产者注册事务监听器:org.apache.rocketmq.client.producer.TransactionListener
生产者发送消息:TransactionMQProducer#sendMessageInTransaction
7、与spring整合
applicationContext-producer.xml
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"> <bean id="rocketmqProduct" class="org.apache.rocketmq.client.producer.DefaultMQProducer" > <property name="producerGroup" value="producer1"/> <property name="namesrvAddr" value="192.168.0.102:9876"/> </bean> </beans>
applicationContext-consumer.xml
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"> <bean id="myMessageListener" class="com.rui.spring.MyMessageListener" /> <bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"> <property name="consumerGroup" value="consumer1"/> <property name="namesrvAddr" value="192.168.0.102:9876"/> <property name="messageListener" ref="myMessageListener"/> <property name="subscription"> <map> <entry key="TopicTest"> <value>*</value> </entry> </map> </property> </bean> </beans>
MyMessageListener.java
package com.rui.spring; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class MyMessageListener implements MessageListenerConcurrently { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
9、源码(java客户端)
10、集群