rocketmq ----> 学习笔记

时间:2021-08-27 18:26:56

官网:http://rocketmq.apache.org

1、环境搭建

准备:

rocketmq-all-4.3.0-bin-release.zip

必须配置:JAVA_HOME=/home/rui/jdk1.8.0_172

 1 unzip rocketmq-all-4.3.0-bin-release.zip -d  /home/rui/
 2
 3 cd  /home/rui/
 4
 5 mv rocketmq-all-4.3.0-bin-release/ rocketmq-4.3.0
 6
 7 vim /etc/profile
 8
 9 export PATH=$PATH:/home/rui/rocketmq-4.3.0/bin
10
11 source /ect/profile

启动rocketmq,构建简单的producer-consumer model(PCM)

1)启动Name Server

1 cd /home/rui/rocketmq-4.3.0/bin
2
3 sh mqnamesrv &45 ps -ef|grep mqnamesrv

出现提示“There is insufficient memory for the Java Runtime Environment to continue.”,kafka也出现一样的问题,当时是在命令行临时指定了jvm的配置。

解决:修改bin/runserver.sh,runbroker.sh,tools.sh的JVM配置

vim runserver.sh

rocketmq ----> 学习笔记

vim runbroker.sh,第二个注释的jvm配置还是不对劲,配置成第三个jvm参数能正常启动broker

rocketmq ----> 学习笔记

vim tools.sh

rocketmq ----> 学习笔记

重新启动:

rocketmq ----> 学习笔记

rocketmq ----> 学习笔记

2)启动broker

1 cd /home/rui/rocketmq-4.3.0/bin
2
3 sh ./mqbroker -n localhost:9876 &
4
5 ps -ef|grep mqbroker 

rocketmq ----> 学习笔记

note:

注意broker的地址!!!下面有用到它来创建一个TopicTest

3)发送和接收消息

在启动生产者时出现异常:no route info of this topic,TopicTest

rocketmq ----> 学习笔记

解决:

使用命令mqadmin updateTopic手动创建一个TopicTest

1 ./mqadmin updateTopic -b 192.168.0.102:10911 -p 6 -t TopicTest

rocketmq ----> 学习笔记

在同一个xshell界面下,先启动producer,再启动consumer

1 export NAMESRV_ADDR=localhost:9876
2
3 sh./tools.sh org.apache.rocketmq.example.quickstart.Producer
4
5 sh ./tools.sh org.apache.rocketmq.example.quickstart.Consumer

producer

rocketmq ----> 学习笔记

consumer

rocketmq ----> 学习笔记

4)关闭broker,关闭name server

1 sh ./mqshutdown broker
2
3 sh ./mqshutdown namesrv

rocketmq ----> 学习笔记

rocketmq ----> 学习笔记

2、基本概念

集群 参考:http://rocketmq.apache.org/docs/rmq-arc/

1)NameServer cluster:提供服务发现和路由。每个名称服务器记录完整的路由信息​​,提供相应的读写服务,并支持快速存储扩展。

2)broker  cluster:提供TOPIC和QUEUE机制来处理消息存储。它们支持Push和Pull模型,包含容错机制(2个副本或3个副本),并提供强大的峰值填充和以原始时间顺序累积数千亿条消息的能力。Brokers还提供灾难恢复,丰富的指标统计和警报机制。

3)producer cluster:生产者支持分布式部署。 Distributed Producers通过多种负载均衡模式向Broker集群发送消息。发送过程支持快速故障并具有低延迟。

4)consumer cluster:消费者也支持Push和Pull模型中的分布式部署。它还支持群集消费和消息广播。它提供实时消息订阅机制。

两个Server 参考:http://rocketmq.apache.org/docs/rmq-arc/

1、NameServer:Broker管理,NameServer接收来自Broker集群的注册,并且提供心跳机制来检查某个broker是否存活。路由管理,每个NameServer拥有整个broker集群的路由信息和客户端查询的队列信息。

2、Broker Server:Broker Server负责消息的存储和分发,消息的查询,高可用保证等等。Broker Server有几个重要的组成部分:Remoting Module,Client Manage,Store Service,HA Service,Index Service。

1)Remoting Module:broker的入口,负责处理来自客户端的请求。

2)Client Manager:管理客户端(Producer/Consumer),维持消费者的topic订阅。

3)Store Service:提供简单的API存储消息到物理磁盘或者从物理磁盘查询消息。

4)HA Service:提供主从broker数据同步的特性。

5)Index Service:通过指定特定的键为消息建立索引,以便快速查询消息。

核心概念,参考:http://rocketmq.apache.org/docs/core-concept/

Topic:生产者(Producer)传递消息和消费者(Consumer)提取消息的类别。它跟生产者和消费者之间的关系是松耦合的。

Tag:Topic的子类别。它能标识来自同一业务模块的具有不同目的的消息。

pullconsumer:pull类型的consumer主动从broker获取消息。一旦获取到批量的消息,用户应用将会启动消费进程。

pushconsumer:push类型的consumer内部封装了消息获取,消费进度并且维护其他内部工作。留有一个回调接口,它在消息到达时被执行。

producer group:具有相同角色的producer的集合。在事务方面,当某个producer实例崩溃了,producer组中的任一个producer实例去连接broker实现事务的提交或者回滚。

consumer group:具有相同角色的consumer的集合。可以用于实现消息消费方面的负载平衡和容错。每个consumer实例必须有相同的topic订阅。

mssage model:两种:一种是clusting,一种是broadcasting。

message order:两种:一种是orderly,一种是concurrently。当使用了DefaultMQPushConsumer这个类时,可以指定消费消息的顺序。

3、架构,图片来源:http://rocketmq.apache.org/docs/rmq-arc/

rocketmq ----> 学习笔记

4、支持的协议和规范

Pull model, support TCP, JMS, OpenMessaging

5、admin 命令行操作

1 ./mqadmin topicList -n 192.168.0.102:9876 #列出对应name server所有topic
2
3 ./mqadmin topicClusterList -n 192.168.0.102:9876 -t TopicTest #查询某topic对应的cluster,默认是DefaultCluster
4
5 ./mqadmin deleteTopic -c DefaultCluster -n 192.168.0.102:9876 -t TopicTest #通过指定cluster删除某个topic67 ./mqadmin printMsg -n 192.168.0.102:9876 -t TopicTest #打印某个topic的消息细节8 9 ./mqadmin consumeMessage -n 192.168.0.102:9876 -t TopicTest #消费指定topic的消息

6、java操作

maven依赖

 <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.11</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
             <version>2.6.0</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <version>2.6.3</version>
             <scope>test</scope>
         </dependency>

             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-client</artifactId>
                 <version>4.3.0</version>
                 <!--<version>4.3.0</version>-->
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-broker</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-common</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-store</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-namesrv</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-tools</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-remoting</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-logging</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-test</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-srvutil</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-filter</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-example</artifactId>
                 <version>4.3.0</version>
             </dependency>
             <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
                 <version>1.7.7</version>
             </dependency>
             <dependency>
                 <groupId>ch.qos.logback</groupId>
                 <artifactId>logback-classic</artifactId>
                 <version>1.0.13</version>
             </dependency>
             <dependency>
                 <groupId>ch.qos.logback</groupId>
                 <artifactId>logback-core</artifactId>
                 <version>1.0.13</version>
             </dependency>
             <dependency>
                 <groupId>commons-cli</groupId>
                 <artifactId>commons-cli</artifactId>
                 <version>1.2</version>
             </dependency>
             <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty-all</artifactId>
                 <version>4.0.42.Final</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>fastjson</artifactId>
                 <version>1.2.29</version>
             </dependency>
             <dependency>
                 <groupId>org.javassist</groupId>
                 <artifactId>javassist</artifactId>
                 <version>3.20.0-GA</version>
             </dependency>
             <dependency>
                 <groupId>net.java.dev.jna</groupId>
                 <artifactId>jna</artifactId>
                 <version>4.2.2</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-lang3</artifactId>
                 <version>3.4</version>
             </dependency>
             <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>19.0</version>
             </dependency>
             <dependency>
                 <groupId>io.openmessaging</groupId>
                 <artifactId>openmessaging-api</artifactId>
                 <version>0.3.1-alpha</version>
             </dependency>
             <dependency>
                 <groupId>log4j</groupId>
                 <artifactId>log4j</artifactId>
                 <version>1.2.17</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-core</artifactId>
                 <version>2.7</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-slf4j-impl</artifactId>
                 <version>2.7</version>
             </dependency>
         </dependencies>

rocketmq ----> 学习笔记

1)simple:参考:http://rocketmq.apache.org/docs/simple-example/

可以使用三种方式生产消息,分别是sync,async和one-way。使用到的关键类:org.apache.rocketmq.client.producer.DefaultMQProducer;

1  DefaultMQProducer producer = new DefaultMQProducer("test"); // test是组名,必须唯一
2  3  producer.setVipChannelEnabled(false);  //vip_port=109094
5  producer.setNamesrvAddr("localhost:9876"); //producer -> nameserver
6
7  producer.start();8 

三种方法关键的实现代码分别如下:

 1 //sync
 2 SendResult sendResult = producer.send(msg);
 3
 4 //async
 5  producer.send(msg, new SendCallback() {
 6                     @Override
 7                     public void onSuccess(SendResult sendResult) { }
 8                     @Override
 9                     public void onException(Throwable e) { }
10                 });
11
12 //one-way
13 producer.sendOneway(msg);14

注意:最后关闭producer。

消费消息:使用到的关键类:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer

 1  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
 2   3  consumer.setVipChannelEnabled(false); //vip_port=10909 4
 3  consumer.setNamesrvAddr("localhost:9876"); // consumer -> nameserver
 4
 5  consumer.subscribe("TopicTest", "*"); // 订阅 TopicTest
 6
 7  consumer.registerMessageListener(new MessageListenerConcurrently() { // 指定顺序concurrently
 8
 9             @Override
10             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
11
12                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
13             }
14         });

注意:最后关闭consumer。

2)order

3)broadcast

广播模式的关键代码:在BroadcastConsumer.java加入设置消息模式为BROADCASTING

1 //set to broadcast mode
2 consumer.setMessageModel(MessageModel.BROADCASTING);

4)schedule

调度模式的关键代码:在ScheduledMessageProducer.java设置延迟时间级别为3

1 // This message will be delivered to consumer 10 seconds later.
2 message.setDelayTimeLevel(3);

5)batch

如果发送的消息不超过1MiB,使用List装载Message。Mesage的topic相同。

如果复杂性高,不确定超过1MiB,通过实现接口 Iterator<List<Message>>以分割List<Message>

6)filter

可以使用sql表达式来选择消费的消息。支持数字比较,字符比较,逻辑运算,判断null等。

1 consumer.subscribe("TopicTest", MessageSelector.bySql("..."));

7)openmessage

8)transaction

生产消息,用到的关键类:org.apache.rocketmq.client.producer.TransactionMQProducer

生产者注册事务监听器:org.apache.rocketmq.client.producer.TransactionListener

生产者发送消息:TransactionMQProducer#sendMessageInTransaction

7、与spring整合

applicationContext-producer.xml

 <?xml version="1.0" encoding="utf-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:aop="http://www.springframework.org/schema/aop"
 xmlns:tx="http://www.springframework.org/schema/tx"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/aop
         http://www.springframework.org/schema/aop/spring-aop.xsd
         http://www.springframework.org/schema/tx
         http://www.springframework.org/schema/tx/spring-tx.xsd">

     <bean id="rocketmqProduct" class="org.apache.rocketmq.client.producer.DefaultMQProducer" >
          <property name="producerGroup" value="producer1"/>
          <property name="namesrvAddr" value="192.168.0.102:9876"/>
      </bean>
 </beans>

applicationContext-consumer.xml

 <?xml version="1.0" encoding="utf-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:aop="http://www.springframework.org/schema/aop"
 xmlns:tx="http://www.springframework.org/schema/tx"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/aop
         http://www.springframework.org/schema/aop/spring-aop.xsd
         http://www.springframework.org/schema/tx
         http://www.springframework.org/schema/tx/spring-tx.xsd">

      <bean id="myMessageListener" class="com.rui.spring.MyMessageListener" />

      <bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer">
             <property name="consumerGroup" value="consumer1"/>
             <property name="namesrvAddr" value="192.168.0.102:9876"/>
             <property name="messageListener" ref="myMessageListener"/>
             <property name="subscription">
                 <map>
                     <entry key="TopicTest">
                         <value>*</value>
                     </entry>
                 </map>
             </property>
      </bean>
 </beans>

MyMessageListener.java

 package com.rui.spring;

 import java.util.List;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;

 public class MyMessageListener implements MessageListenerConcurrently {
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
         System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 }

9、源码(java客户端)

10、集群