RocketMQ入门案例

时间:2022-09-26 00:08:59

  学习RocketMQ,先写一个Demo演示一下看看效果。

一、服务端部署

  因为只是简单的为了演示效果,服务端仅部署单Master模式 —— 一个Name Server节点,一个Broker节点。主要有以下步骤。

    1. 下载RocketMQ源码、编译(也可以网上下载编译好的文件),这里使用最新的4.4.0版本,下载好之后放在Linux上通过一下命令解压缩、编译。
      unzip rocketmq-all-4.4.-source-release.zip
      cd rocketmq-all-4.4./
      mvn -Prelease-all -DskipTests clean install –U
    2. 编译之后到distribution/target/apache-rocketmq目录,后续所有操作都是在该路径下。
      cd distribution/target/apache-rocketmq
    3. 启动Name Server,查看日志确认启动成功。
      nohup sh bin/mqnamesrv &
      tail -f ~/logs/rocketmqlogs/namesrv.log
    4. 启动Broker,查看日志确认启动成功。
      nohup sh bin/mqbroker -n localhost: &
      tail -f ~/logs/rocketmqlogs/broker.log

  Name Server和Broker都成功启动,服务器就部署完成了。更详细的参考官方文档手册,里面还包含在服务器上运行Producer、Customer示例,这里主要是在项目中使用。

  官网手册戳这里:Quick Start

二、客户端搭建:Spring Boot项目中使用

  客户端分为消息生产者和消息消费者,这里通过日志打印输出查看效果,为了看起来更清晰,我新建了两个模块分别作为消息生产者和消息消费者。

  1. 添加依赖,在两个模块的pom文件中添加以下配置。
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
    </dependency>
  2. 配置生产者模块。
    • application.yml文件中增加用来初始化producer的相关配置,这里只配了一部分,更详细的配置参数可以查看官方文档。
      # RocketMQ生产者
      rocketmq:
      producer:
      # Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。默认DEFAULT_PRODUCER
      producerGroup: ${spring.application.name}
      # namesrv地址
      namesrvAddr: 192.168.101.213:9876
      # 客户端限制的消息大小,超过报错,同时服务端也会限制,需要跟服务端配合使用。默认4MB
      maxMessageSize: 4096
      # 发送消息超时时间,单位毫秒。默认10000
      sendMsgTimeout: 5000
      # 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用。默认2
      retryTimesWhenSendFailed: 2
      # 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节。默认4096
      compressMsgBodyOverHowmuch: 4096
      # 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。
      createTopicKey: XIAO_LIU
    • 新增producer配置类,系统启动时读取yml文件的配置信息初始化producer。集群模式下,如果在同一个jvm中,要往多个的MQ集群发送消息,则需要创建多个的producer并设置不同的instanceName,默认不需要设置该参数。
      @Configuration
      public class ProducerConfiguration {
      private static final Logger LOGGER = LoggerFactory.getLogger(ProducerConfiguration.class); /**
      * Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。默认DEFAULT_PRODUCER
      */
      @Value("${rocketmq.producer.producerGroup}")
      private String producerGroup;
      /**
      * namesrv地址
      */
      @Value("${rocketmq.producer.namesrvAddr}")
      private String namesrvAddr;
      /**
      * 客户端限制的消息大小,超过报错,同时服务端也会限制,需要跟服务端配合使用。默认4MB
      */
      @Value("${rocketmq.producer.maxMessageSize}")
      private Integer maxMessageSize;
      /**
      * 发送消息超时时间,单位毫秒。默认10000
      */
      @Value("${rocketmq.producer.sendMsgTimeout}")
      private Integer sendMsgTimeout;
      /**
      * 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用。默认2
      */
      @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
      private Integer retryTimesWhenSendFailed;
      /**
      * 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节。默认4096
      */
      @Value("${rocketmq.producer.compressMsgBodyOverHowmuch}")
      private Integer compressMsgBodyOverHowmuch;
      /**
      * 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。
      */
      @Value("${rocketmq.producer.createTopicKey}")
      private String createTopicKey; @Bean
      public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(this.producerGroup);
      producer.setNamesrvAddr(this.namesrvAddr);
      producer.setCreateTopicKey(this.createTopicKey); if (this.maxMessageSize != null) {
      producer.setMaxMessageSize(this.maxMessageSize);
      }
      if (this.sendMsgTimeout != null) {
      producer.setSendMsgTimeout(this.sendMsgTimeout);
      }
      if (this.retryTimesWhenSendFailed != null) {
      producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
      }
      if (this.compressMsgBodyOverHowmuch != null) {
      producer.setCompressMsgBodyOverHowmuch(this.compressMsgBodyOverHowmuch);
      }
      if (Strings.isNotBlank(this.createTopicKey)) {
      producer.setCreateTopicKey(this.createTopicKey);
      } try {
      producer.start(); LOGGER.info("Producer Started : producerGroup:[{}], namesrvAddr:[{}]"
      , this.producerGroup, this.namesrvAddr);
      } catch (MQClientException e) {
      LOGGER.error("Producer Start Failed : {}", e.getMessage(), e);
      }
      return producer;
      } }
    • 使用producer实例向MQ发送消息。
      @RunWith(SpringRunner.class)
      @SpringBootTest
      public class ProducerServiceApplicationTests {
      private static final Logger LOGGER = LoggerFactory.getLogger(ProducerServiceApplicationTests.class);
      @Autowired
      private DefaultMQProducer defaultMQProducer; @Test
      public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
      for (int i = 0; i < 100; i++) {
      User user = new User();
      user.setUsername("用户" + i);
      user.setPassword("密码" + i);
      user.setSex(i % 2);
      user.setBirthday(new Date());
      Message message = new Message("user-topic", "user-tag", JSON.toJSONString(user).getBytes(RemotingHelper.DEFAULT_CHARSET));
      SendResult sendResult = defaultMQProducer.send(message);
      LOGGER.info(sendResult.toString());
      }
      }
      }
  3. 配置消费者模块。
    • application.yml文件中增加用来初始化consumer的相关配置,同样参数这里只配了一部分,更详细的配置参数可以查看官方文档。
      # RocketMQ消费者
      rocketmq:
      consumer:
      # Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组。默认DEFAULT_CONSUMER
      consumerGroup: ${spring.application.name}
      # namesrv地址
      namesrvAddr: 192.168.101.213:9876
      # 消费线程池最大线程数。默认10
      consumeThreadMin: 10
      # 消费线程池最大线程数。默认20
      consumeThreadMax: 20
      # 批量消费,一次消费多少条消息。默认1
      consumeMessageBatchMaxSize: 1
      # 批量拉消息,一次最多拉多少条。默认32
      pullBatchSize: 32
      # 订阅的主题
      topics: user-topic
    • 新增consumer配置。
      @Configuration
      public class ConsumerConfiguration {
      private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerConfiguration.class); @Value("${rocketmq.consumer.consumerGroup}")
      private String consumerGroup;
      @Value("${rocketmq.consumer.namesrvAddr}")
      private String namesrvAddr;
      @Value("${rocketmq.consumer.consumeThreadMin}")
      private int consumeThreadMin;
      @Value("${rocketmq.consumer.consumeThreadMax}")
      private int consumeThreadMax;
      @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
      private int consumeMessageBatchMaxSize;
      @Value("${rocketmq.consumer.pullBatchSize}")
      private int pullBatchSize;
      @Value("${rocketmq.consumer.topics}")
      private String topics; private final ConsumeMsgListener consumeMsgListener; @Autowired
      public ConsumerConfiguration(final ConsumeMsgListener consumeMsgListener) {
      this.consumeMsgListener = consumeMsgListener;
      } @Bean
      public DefaultMQPushConsumer getRocketMQConsumer() {
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
      consumer.setNamesrvAddr(namesrvAddr);
      consumer.setConsumeThreadMin(consumeThreadMin);
      consumer.setConsumeThreadMax(consumeThreadMax);
      consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
      consumer.setPullBatchSize(pullBatchSize);
      consumer.registerMessageListener(consumeMsgListener); try {
      /**
      * 设置消费者订阅的主题和tag。subExpression参数为*表示订阅该主题下所有tag,
      * 如果需要订阅该主题下的指定tag,subExpression设置为对应tag名称,多个tag以||分割,例如"tag1 || tag2 || tag3"
      */
      consumer.subscribe(topics, "*");
      consumer.start(); LOGGER.info("Consumer Started : consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics, namesrvAddr);
      } catch (Exception e) {
      LOGGER.error("Consumer Start Failed : consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics, namesrvAddr, e);
      e.printStackTrace();
      }
      return consumer;
      }
      }
    • 新增消息监听器,监听到新消息后,执行对应的业务逻辑。

      @Component
      public class ConsumeMsgListener implements MessageListenerConcurrently {
      private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeMsgListener.class); @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      if (CollectionUtils.isEmpty(msgs)) {
      LOGGER.info("Msgs is Empty.");
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
      for (MessageExt msg : msgs) {
      try {
      if ("user-topic".equals(msg.getTopic())) {
      LOGGER.info("{} Receive New Messages: {}", Thread.currentThread().getName(), new String(msg.getBody()));
      // do something
      }
      } catch (Exception e) {
      if (msg.getReconsumeTimes() == 3) {
      // 超过3次不再重试
      LOGGER.error("Msg Consume Failed.");
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      } else {
      // 重试
      return ConsumeConcurrentlyStatus.RECONSUME_LATER;
      }
      }
      } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
      }
三、效果
  1. 运行生产者测试代码。系统启动时初始化Producer,然后执行测试代码,往MQ中发送消息。效果如下:
    RocketMQ入门案例
  2. 启动消费者服务。系统启动时先初始化Customer。此时1.已经往MQ中发送了一些消息,监听器监听到MQ中有消息,随即马上消费消息。
    RocketMQ入门案例
四、总结

  Demo很简单,但是里面还有很多东西需要慢慢研究。

  代码可以戳这里:spring-cloud-learn

RocketMQ入门案例的更多相关文章

  1. RocketMQ入门到入土(二)事务消息&amp&semi;顺序消息

    接上一篇:RocketMQ入门到入土(一)新手也能看懂的原理和实战! 一.事务消息的由来 1.案例 引用官方的购物案例: 小明购买一个100元的东西,账户扣款100元的同时需要保证在下游的积分系统给小 ...

  2. SpringMVC入门案例及请求流程图(关于处理器或视图解析器或处理器映射器等的初步配置)

    SpringMVC简介:SpringMVC也叫Spring Web mvc,属于表现层的框架.Spring MVC是Spring框架的一部分,是在Spring3.0后发布的 Spring结构图 Spr ...

  3. SpringMvc核心流程以及入门案例的搭建

    1.什么是SpringMvc Spring MVC属于SpringFrameWork的后续产品,已经融合在Spring Web Flow里面.Spring 框架提供了构建 Web 应用程序的全功能 M ...

  4. Struts2第一个入门案例

      一.如何获取Struts2,以及Struts2资源包的目录结构的了解    Struts的官方地址为http://struts.apache.org 在他的主页当中,我们可以通过左侧的Apache ...

  5. MyBatis入门案例、增删改查

    一.MyBatis入门案例: ①:引入jar包 ②:创建实体类 Dept,并进行封装 ③ 在Src下创建大配置mybatis-config.xml <?xml version="1.0 ...

  6. Hibernate入门案例及增删改查

    一.Hibernate入门案例剖析: ①创建实体类Student 并重写toString方法 public class Student { private Integer sid; private I ...

  7. Quartz应用实践入门案例二&lpar;基于java工程&rpar;

    在web应用程序中添加定时任务,Quartz的简单介绍可以参看博文<Quartz应用实践入门案例一(基于Web应用)> .其实一旦学会了如何应用开源框架就应该很容易将这中框架应用与自己的任 ...

  8. Quartz应用实践入门案例一(基于Web环境)

    Quartz是一个完全由java编写的开源作业调度框架,正是因为这个框架整合了许多额外的功能,所以在使用上就显得相当容易.只是需要简单的配置一下就能轻松的使用任务调度了.在Quartz中,真正执行的j ...

  9. MyBatis入门案例 增删改查

    一.MyBatis入门案例: ①:引入jar包 ②:创建实体类 Dept,并进行封装 ③ 在Src下创建大配置mybatis-config.xml <?xml version="1.0 ...

随机推荐

  1. C语言pow函数编写

    C语言pow函数编写 #include<stdio.h> double chaoba(double f,double q); //声明自定义函数 void main(void) { dou ...

  2. Spark&lpar;一&rpar;&colon; 基本架构及原理

    Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一,与Hadoop和St ...

  3. Android 动画学习笔记

    Android动画的两种:Frame帧动画.Tween动画(位移动画)[实现:存放目录res/anim] Tween动画:(位移.缩放.旋转):通过对场景里的对象不断做图像变换. 四种效果Alpha. ...

  4. jquery中动态新增的元素节点无法触发事件解决办法

    在使用jquery中动态新增的元素节点时会发现添加的事件是无法触发的,我们下面就为各位来详细的介绍此问题的解决办法. 比如做一个ajax读取留言列表的时候,每条留言后面有个回复按钮,class为“re ...

  5. Vue之Vuex

    一.什么是vuex Vuex 是一个专为 Vue.js 应用程序开发的状态管理模式.它采用集中式存储管理应用的所有组件的状态,并以相应的规则保证状态以一种可预测的方式发生变化.简单来说就是一个数据统一 ...

  6. DDD实战进阶第一波&lpar;九&rpar;:开发一般业务的大健康行业直销系统(实现经销商上下文仓储与领域逻辑)

    上篇文章主要讲述了经销商上下文的需求与POCO对象,这篇文章主要讲述该界限上下文的仓储与领域逻辑的实现. 关于界限上下文与EF Core数据访问上下文参考产品上下文相应的实现,这里不再累述. 因为在经 ...

  7. Responsive响应式设计

    在IE6-8中完全是不支持CSS3 Media Queries的.那么为了让IE6-8支持,我们就很有必要的在IE9以下的浏览器中加上media-queries.js或者respond.js脚本: & ...

  8. 【做题】codechefCOUNTARI——分块FFT

    记本题数组长度为\(n\),权值大小为\(m\). 首先,暴力显然是\(O(n^2)\)的. 先瞄一眼tag,然后发现这是FFT. 显然,问题的关键在于要满足i,j,k之间的位置关系.于是考虑分治FF ...

  9. Object-C-block

    块是对c语言的一种扩展语法 块看起来像函数,不同的是,快可以直接写在函数内部 块能够作为参数传递给函数或者方法 void sayHello(){NSLog(@"hello!");} ...

  10. bzoj千题计划135:bzoj1066&colon; &lbrack;SCOI2007&rsqb;蜥蜴

    http://www.lydsy.com/JudgeOnline/problem.php?id=1066 每个柱子拆成两个点 i<<1,i<<1|1,之间连流量为高度的边 如果 ...