轻松解读RocketMQ5.3.0核心源码

时间:2024-10-25 07:15:31

文章目录

  • 一、读源码的方法
    • 1、为什么要读源码
    • 2、怎么读源码
  • 二、源码热身阶段
    • 1、NameServer的启动过程
    • 2、Broker服务启动过程
  • 三、小试牛刀阶段
    • 3、Netty服务注册框架
    • 4、Broker心跳注册管理
    • 5、Producer发送消息过程
    • 6、Consumer拉取消息过程
    • 7、客户端负载均衡管理总结
      • 1 Producer负载均衡
      • 2 Consumer负载均衡
  • 四、融汇贯通阶段
    • 8、消息持久化设计
      • 1、RocketMQ的持久化文件结构
      • 2、commitLog写入
      • 3、文件同步刷盘与异步刷盘
      • 4、CommitLog主从复制
      • 5、分发ConsumeQueue和IndexFile
      • 6、过期文件删除机制
      • 7、文件索引结构
    • 9、延迟消息机制
      • 1、关注重点
      • 2、源码重点
    • 10、长轮询机制
      • 1、功能介绍
      • 2、源码重点
  • 五、基于RocketMQ源码理解零拷贝与顺序写
    • 1、顺序写加速文件写入磁盘
    • 2、刷盘机制保证消息不丢失
    • 3、零拷贝加速文件读写
  • 六、总结

轻松解读RocketMQ5.3.0核心源码

楼兰:你的神秘技术宝藏

​ 同步刷盘,异步刷盘?同步同步,异步同步?零拷贝,顺序写?你是不是曾经在各种各样的流量短视频和博客中听说过跟RocketMQ相关的这些亮瞎眼的高大上的技术名词?你有没有想过自己去RocketMQ中看看这些概念背后的Java代码是什么样的?如果你有过这样的想法,那么,楼兰到你到RocketMQ的源码当中逛逛,看看RocketMQ的大神们,是如何用简单常见的MVC思想开发出RocketMQ这样的神作的。

本次源码解读基于RocketMQ5.3.0版本。RocketMQ的官方Git仓库地址:https://github.com/apache/rocketmq 可以用git把项目clone下来或者直接下载代码包。也可以到RocketMQ的官方网站上下载指定版本的源码: http://rocketmq.apache.org/dowloading/releases/

一、读源码的方法

​ 先跟大家简单聊一聊为什么要去读RocketMQ的源码,以及怎么去读这些优秀的源码。

1、为什么要读源码

​ 开门见山的说,我见过很多程序员,但是有超过70%的程序员是不会读源码的。很多程序员天天在用SpringBoot,但是从来没有去看过SpringApplication是如何run起来的。而这一类不怎么读优秀源码的程序员有几个很明显的特点。

​ 一是解决复杂问题能力不强。大家一起开发业务代码好像每个人能力都差不多,但是当项目组真正遇到一些有挑战性的技术问题,能站出来的人往往一个项目组里不超过十分之一。且不说做一些底层的架构设计工作了,就是拿老个老项目,把pom依赖版本升下级,都能难倒很多人。这基本上就决定了程序员的发展空间有限。

​ 二是焦虑。我见过很多程序员,不管到什么公司,在什么职位,都会越来越焦虑,更不用说现在这个大环境了。其实更深层次的原因,是他们自己也清楚,自己被替换的成本太低了。没有自己的技术护城河,再好的工作履历也是镜中月水中花。

​ 这些跟读源码有什么关系呢?很明显,对于绝大多数程序员来说,多读多学习这些优秀开源中间件的源码,是必不可少的一个提升自己的方式。这就好像提升篮球运动员水平最好的方式,就是扔到最好的球队里去捶打。跟着这些开源中间件去学习,相比于自己花几年时间研究一个算法,优化一个框架来说,是这些程序员们最经济,最靠谱,最有价值的学习方式。

2、怎么读源码

​ 但是读源码是不容易的。这里面有很多原因。

​ 一是读源码需要积累。草草的看一遍源码对你几乎不会有什么帮助。远不如网上看看课程,写几个Demo,效果立竿见影。耐不住性子,是几乎任何职业想要往上提升的共同天敌。枯燥的源码更是容易把这个问题放大。

​ 二是没有能力去读懂源码中的设计思想。每个中间件要解决的问题不一样,解决问题的思路都不一样。而这些最有价值的东西,往往又被各种各样细枝末节的细节给掩盖了。这就很容易造成读源码找不到方向。

​ 三是不清楚到底要从源码中学习什么东西。源码写得再好,是别人的代码,解决的是别人的问题,对自己工作有什么帮助呢?找不到直接的方向,收集不到最关键的信息,这也就很容易造成读源码没有成就感。

​ 再转回到这一期的内容来说,就是读源码其实不适合做课程来分享,或者说跟大家一起解读源码是一件很困难的事情。对程序员来说,读源码就好比看历史。茫茫历史之路,有人读出了天下风云,有人读出了风花雪月。有人读出了不为五斗米折腰,也有人读出了厚黑学。历史是古人的,源码也是别人的。每个人的技术基础不一样,从中理解出来的东西也不一样。

​ 所以在这一次跟大家分享RocketMQ源码之前,我也想要跟大家分享一下我读源码的一些经验方法。

1、带着问题读源码

​ 每次准备去读源码之前,找一个自己感兴趣的问题入手,每次顺着这个问题去读源码。这些问题可以是使用过程中自己遇到过的一些难题,也可以就是纯粹的想要验证一下之前学到的某一个相关知识点。总之,如果没有自己的思考,源码不如不读。

2、小步快走

​ 不要希望一两遍就能读懂源码!每次读懂一点点内容,慢慢积累,才能逐渐加深对源码的理解。

​ 所以,这一次,我会分为三个阶段逐步带你去理解RocketMQ的核心源码。

  • 源码热身阶段:主要是快速理解RocketMQ的整体源码结构,梳理关键的主线。为后续深入解读打下基础。
  • 小试牛刀阶段:主要是拿一些RocketMQ中使用门槛比较低的功能机制,进行过程梳理。开始找到一些按照自己思想解读源码的感觉。
  • 融会贯通阶段:主要是分析一些RocketMQ中技术门槛比较高的核心机制。在这个过程中,逐步开始梳理出RocketMQ中的一些逻辑主线,深挖细节,形成自己的理解。

3、分步总结

​ 按照之间的步骤,找到自己的问题,带上自己的理解,机制总结。对RocketMQ中各种扩展功能,逐步尝试验证。最后形成一些自己的输出内容。

二、源码热身阶段

​ 我们知道RocketMQ的整体架构是由以下几个服务构成:

在这里插入图片描述

​ 这一部分,我们就来梳理一下NameServer和Broker有哪些核心组件,重点是详细梳理下这两个核心服务有哪些配置参数可以调整。

1、NameServer的启动过程

1、关注的问题

​ 在RocketMQ集群的服务端,实际进行消息存储、推送等核心功能点的都是Broker。而NameServer的作用,其实和微服务中的注册中心非常类似,他只是提供了Broker端的服务注册与发现功能。

​ 第一次看源码,不要太过陷入具体的细节,先搞清楚NameServer的大体结构。

2、源码重点

​ NameServer的启动入口类是org.apache.rocketmq.namesrv.NamesrvStartup。其中的核心是构建并启动一个NamesrvController。这个Cotroller对象就跟MVC中的Controller是很类似的,都是响应客户端的请求。只不过,他响应的是基于Netty的客户端请求。

​ 在NameServer中,还启动了一个ControllerManager服务,这个服务主要是用来保证服务高可用的,这里暂不解读。

​ 另外,他的实际启动过程,其实可以配合NameServer的启动脚本进行更深入的理解。我们这最先关注的是他的整体结构:

在这里插入图片描述

解读出以下几个重点:

​ 1、这几个配置类就可以用来指导如何优化Nameserver的配置。比如,如何调整nameserver的端口?自己试试从源码中找找答案。

​ 2、在之前的4.x版本当中,Nameserver中是没有ControllerManager和NettyRemotingClient的,这意味着现在NameServer现在也需要往外发Netty请求了。

​ 3、稍微解读下Nameserver中核心组件例如RouteInfoManager的结构,可以发现RocketMQ的整体源码风格其实就是典型的MVC思想。 Controller响应网络请求,各种Manager和其中包含的Service处理业务,内存中的各种Table保存消息。

2、Broker服务启动过程

1、关注重点

​ Broker是整个RocketMQ的业务核心。所有消息存储、转发这些重要的业务都是Broker进行处理。

这里重点梳理Broker有哪些内部服务。这些内部服务将是整理Broker核心业务流程的起点。

2、源码重点

​ Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试。

​ 启动过程关键点:重点也是围绕一个BrokerController对象,先创建,然后再启动。

首先: 在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置:

  • BrokerConfig : Broker服务配置
  • MessageStoreConfig : 消息存储配置。 这两个配置参数都可以在broker.conf文件中进行配置
  • NettyServerConfig :Netty服务端占用了10911端口。同样也可以在配置文件中覆盖。
  • NettyClientConfig : Broker既要作为Netty服务端,向客户端提供核心业务能力,又要作为Netty客户端,向NameServer注册心跳。
  • AuthConfig:权限相关的配置。

这些配置是我们了解如何优化 RocketMQ 使用的关键。

然后: 在BrokerController.start方法可以看到启动了一大堆Broker的核心服务,我们挑一些重要的

this.messageStore.start();//启动核心的消息存储组件
this.timerMessageStore.start(); //时间轮服务,主要是处理指定时间点的延迟消息。

this.remotingServer.start(); //Netty服务端
this.fastRemotingServer.start(); //启动另一个Netty服务端。

this.brokerOuterAPI.start();//启动客户端,往外发请求

this.topicRouteInfoManager.start(); //管理Topic路由信息

BrokerController.this.registerBrokerAll: //向所有依次NameServer注册心跳。

this.brokerStatsManager.start();//服务状态

​ 我们现在不需要了解这些核心组件的具体功能,只要有个大概,Broker中有一大堆的功能组件负责具体的业务。后面等到分析具体业务时再去深入每个服务的细节。

​ 我们需要抽象出Broker的一个整体结构:

在这里插入图片描述

​ 可以看到Broker启动了两个Netty服务,他们的功能基本差不多。实际上,在应用中,可以通过producer.setSendMessageWithVIPChannel(true),让少量比较重要的producer走VIP的通道。而在消费者端,也可以通过consumer.setVipChannelEnabled(true),让消费者支持VIP通道的数据。

三、小试牛刀阶段

​ 开始理解一些比较简单的业务逻辑

3、Netty服务注册框架

1、关注重点

RocketMQ实际上是一个复杂的分布式系统, NameServer,Broker,Client之间需要有大量跨进程的RPC调用。这些复杂的RPC请求是怎么管理,怎么调用的呢?这是我们去理解RocketMQ底层业务的基础。这一部分的重点就是去梳理RocketMQ的这一整套基于Netty的远程调用框架。

需要说明的是,RocketMQ整个服务调用框架绝大部分是使用Netty框架封装的。所以,要看懂这部分代码,需要你对Netty框架有足够的了解。

2、源码重点

Netty的所有远程通信功能都由remoting模块实现。remoting模块中有两个对象最为重要。 就是RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中,涉及到的远程服务非常多,同一个服务,可能既是RPC的服务端也可以是RPC的客户端。例如Broker服务,对于Client来说,他需要作为服务端响应他们发送消息以及拉取消息等请求,所以Broker是需要RemotingServer的。而另一方面,Broker需要主动向NameServer发送心跳请求,这时,Broker又需要RemotingClient。因此,Broker既是RPC的服务端又是RPC的客户端。

对于这部分的源码,就可以从remoting模块中RemotingServer和RemotingClient的初始化过程入手。有以下几个重点是需要梳理清楚的:

1、RemotingServer和RemotingClient之间是通过什么协议通讯的?

RocketMQ中,RemotingServer是一个接口,在这个接口下,提供了两个具体的实现类,NettyRemotingServer和MultiProtocolRemotingServer。他们都是基于Netty框架封装的,只不过处理数据的协议不一样。也就是说,RocketMQ可以基于不同协议实现RPC访问。其实这也就为RocketMQ提供多种不同语言的客户端打下了基础。

2、哪些组件需要Netty服务端?哪些组件需要Netty客户端?

之间简单梳理过,NameServer和Broker的服务内部都是既有RemotingServer和RemotingClient的。 那么作为客户端的Producer和Consumer,是不是就只需要RemotingClient呢?其实也不是,事务消息的Producer也需要响应Broker的事务状态回查,他也是需要NettyServer的。

这里需要注意的是,Netty框架是基于Channel长连接发起的RPC通信。只要长连接建立了,那么数据发送是双向的。也就是说,Channel长连接建立完成后,NettyServer服务端也可以向NettyClient客户端发送请求,所以服务端和客户端都需要对业务进行处理。

3、Netty框架最核心的部分是如何构架处理链,RocketMQ是如何构建的呢?

服务端构建处理链的核心代码:

// org.apache.rocketmq.remoting.netty.NettyRemotingServer
protected ChannelPipeline configChannel(SocketChannel ch) {
   
        return ch.pipeline()
            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
            .addLast(defaultEventExecutorGroup,
                encoder, //请求编码器
                new NettyDecoder(), //请求解码器
                distributionHandler, //请求计数器
                new IdleStateHandler(0, 0,
                    nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //心跳管理器
                connectionManageHandler, //连接管理器
                serverHandler //核心的业务处理器
            );
    }

我们这里主要分析业务请求如何管理。分两个部分来看:

1、请求参数:

从请求的编解码器可以看出,RocketMQ的所有RPC请求数据都封装成RemotingCommand对象。RemotingCommand对象中有几个重要的属性:

private int code; //响应码,表示请求处理成功还是失败
private int opaque = requestId.getAndIncrement(); //服务端内部会构建唯一的请求ID。
private transient CommandCustomHeader customHeader; //自定义的请求头。用来区分不同的业务请求
private transient byte[] body; //请求参数体
private int flag = 0; //参数类型, 默认0表示请求,1表示响应

2、处理逻辑

所有核心的业务请求都是通过一个NettyServerHandler进行统一处理。他处理时的核心代码如下:

@ChannelHandler.Sharable
    public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
   
		//统一处理所有业务请求
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) {
   
            int localPort = RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress());
            NettyRemotingAbstract remotingAbstract = NettyRemotingServer.this.remotingServerTable.get(localPort);
            if (localPort != -1 && remotingAbstract != null) {
   
                remotingAbstract.processMessageReceived(ctx, msg); //核心处理请求的方法
                return;
            }
            // The related remoting server has been shutdown, so close the connected channel
            RemotingHelper.closeChannel(ctx.channel());
        }
		
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
   
           //调整channel的读写属性
        }
    }

​ **2.1、在最核心的处理请求的processMessageReceived方法中,会将请求类型分为 REQUEST__COMMAND 和 RESPONSE_COMMAND来处理。**为什么会有两种不同类型的请求呢?

这是因为客户端的业务请求会有两种类型:

​ 一种是同步请求方式:即客户端发过来业务请求,服务端同步给出请求的响应结果。这种请求方式实现简单,但是在服务端处理业务的过程中,客户端会产生阻塞,因此比较适合那些相对比较简单,服务端处理速度快的业务。

​ 另一种是异步请求方式:即客户端发过来业务请求后,服务端并不是立即给出相应结果,而是先给客户端一个请求接收成功的响应,后台异步进行处理。而客户端可以过一段时间,再发一个response类型的请求,获取上一次请求的响应。

2.2、如何处理request类型的请求?

服务端和客户端都会维护一个processorTable。这是个HashMap,key是服务码,也就对应RemotingCommand的code。value是对应的运行单元 Pair<NettyRequestProcessor, ExecutorService>。包含了执行线程的线程池和具体处理业务的Processor。 而这些Processor,是由业务系统自行注册的。

也就是说,想要看每个服务具体有哪些业务能力,就只要看他们注册了哪些Processor就知道了

Broker服务注册,详见 BrokerController.registerProcssor()方法。

NameServer的服务注册方法,重点如下:

private void registerProcessor() {
   
        if (namesrvConfig.isClusterTest()) {
    //是否测试集群模式,默认是false。也就是说现在阶段不推荐。

            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);
        } else {
   
            // Support get route info only temporarily
            ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);

            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
        }
    }

另外,NettyClient也会注册一个大的ClientRemotingProcessor,统一处理所有请求。注册方法见 org.apache.rocketmq.client.impl.MQClientAPIImpl类的构造方法。也就是说,只要长连接建立完成了,NettyClient比如Producer,也可以处理NettyServer发过来的请求。

2.3、如何处理response类型的请求?

NettyServer处理完request请求后,会先缓存到responseTable中,等NettyClient下次发送response类型的请求,再来获取。这样就不用阻塞Channel,提升请求的吞吐量。优雅的支持了异步请求。

2.4、关于RocketMQ的同步结果推送与异步结果推送

RocketMQ的RemotingServer服务端,会维护一个responseTable,这是一个线程同步的Map结构。 key为请求的ID,value是异步的消息结果。ConcurrentMap<Integer /* opaque */, ResponseFuture> 。

​ 处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时,处理的结果会存入responseTable,通过ResponseFuture提供一定的服务端异步处理支持,提升服务端的吞吐量。 请求返回后,立即从responseTable中移除请求记录。

实际上,同步也是通过异步实现的。

//org.apache.rocketmq.remoting.netty.ResponseFuture
	//发送消息后,通过countDownLatch阻塞当前线程,造成同步等待的效果。
    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
   
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }
	//等待异步获取到消息后,再通过countDownLatch释放当前线程。
    public void putResponse(final RemotingCommand responseCommand) {
   
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }

​ 处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时,处理的结果依然会存入responsTable,等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture,也就是在客户端请求结果时再去获取真正的结果。

​ 另外,在RemotingServer启动时,会启动一个定时的线程任务,不断扫描responseTable,将其中过期的response清除掉。

//org.apache.rocketmq.remoting.netty.NettyRemotingServer
TimerTask timerScanResponseTable = new TimerTask() {
   
            @Override
            public void run(Timeout timeout) {
   
                try {
   
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
   
                    log.error("scanResponseTable exception", e);
                } finally {
   
                    timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS);
                }
            }
        };
        this.timer.newTimeout(timerScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);

整体RPC框架流程如下图:

在这里插入图片描述

可以看到,RocketMQ基于Netty框架实现的这一套基于服务码的服务注册机制,即可以让各种不同的组件都按照自己的需求注册自己的服务方法,又可以以一种统一的方式同时支持同步请求和异步请求。所以这一套框架,其实是非常简洁易用的。在使用Netty框架进行相关应用开发时,都可以借鉴他的这一套服务注册机制。例如开发一个大型的IM项目,要添加好友、发送文本、发送图片、发送附件、甚至还有表情、红包等等各种各样的请求。这些请求如何封装,就可以参考这一套服务注册框架。

4、Broker心跳注册管理

1、关注重点

把RocketMQ的服务调用框架整理清楚之后,接下来就可以从一些具体的业务线来进行详细梳理了。

​ 之前介绍过,Broker会在启动时向所有NameServer注册自己的服务信息,并且会定时往NameServer发送心跳信息。而NameServer会维护Broker的路由列表,并对路由表进行实时更新。这一轮就重点梳理这个过程。

2、源码重点

​ Broker启动后会立即发起向NameServer注册心跳。方法入口:BrokerController.this.registerBrokerAll。 然后启动一个定时任务,以10秒延迟,默认30秒的间隔持续向NameServer发送心跳。

		//K4 Broker向NameServer进行心跳注册
        if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
   
            changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
            this.registerBrokerAll(true, false, true);
        }
        //启动后定时注册
        scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
   
            @Override
            public void run0() {
   
                try {
   
                    if (System.currentTimeMillis() < shouldStartTime) {
   
                        BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
                        return;
                    }
                    if (isIsolated) {
   
                        BrokerController.LOG.info("Skip register for broker is isolated");
                        return;
                    }
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
   
                    BrokerController.LOG.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

​ NameServer内部会通过RouteInfoManager组件及时维护Broker信息。具体参见org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager的registerBroker方法

同时在NameServer启动时,会启动定时任务,扫描不活动的Broker。方法入口:NamesrvController.initialize方法,往下跟踪到startScheduleService方法。。

在这里插入图片描述

3、极简化的服务注册发现流程

​ 为什么RocketMQ要自己实现一个NameServer,而不用Zookeeper、Nacos这样现成的注册中心?

​ 首先,依赖外部组件会对产品的独立性形成侵入,不利于自己的版本演进。Kafka要抛弃Zookeeper就是一个先例。

​ 其次,更为重要的还是RocketMQ对业务的合理设计。NameServer之间不进行信息同步,而是依赖Broker端向所有NameServer同时发起注册。这让NameServer的服务可以非常轻量。如果可能,你可以与Nacos或Zookeeper的核心流程做下对比。NameServer集群只要有一个节点存活。整个集群就能正常提供服务,而Zookeeper,Nacos等都是基于多数派同意的机制,需要集群中超过半数的节点存活才能正常提供服务。

​ 但是,要知道,这种极简的设计,其实是以牺牲数据一致性为代价的。Broker往多个NameServer同时发起注册,有可能部分NameServer注册成功,而部分NameServer注册失败了。这样,多个NameServer之间的数据是不一致的。作为通用的注册中心,这是不可接受的。但是对于RocketMQ,这又变得可以接受了。因为客户端从NameServer上获得Broker列表后,只要有一个正常运行的Broker就可以了,并不需要完整的Broker列表。

5、Producer发送消息过程

1、关注重点

首先:回顾下我们之前的Producer使用案例。

Producer有两种:

  • 一种是普通发送者:DefaultMQProducer。只负责发送消息,发送完消息,就可以停止了。
  • 另一种是事务消息发送者: TransactionMQProducer。支持事务消息机制。需要在事务消息过程中提供事务状态确认的服务,这就要求事务消息发送者虽然是一个客户端,但是也要完成整个事务消息的确认机制后才能退出。

事务消息机制后面将结合Broker进行整理分析。这一步暂不关注。我们只关注DefaultMQProducer的消息发送过程。

然后:整个Producer的使用流程,大致分为两个步骤:一是调用start方法,进行一大堆的准备工作。 二是各种send方法,进行消息发送。

那我们重点关注以下几个问题:

1、Producer启动过程中启动了哪些服务。也就是了解RocketMQ的Client客户端的基础结构。

2、Producer如何管理broker路由信息。 可以设想一下,如果Producer启动了之后,NameServer挂了,那么Producer还能不能发送消息?希望你先从源码中进行猜想,然后自己设计实验进行验证。

3、关于Producer的负载均衡。也就是Producer到底将消息发到哪个MessageQueue中。这里可以结合顺序消息机制来理解一下。消息中那个莫名奇妙的MessageSelector到底是如何工作的。

2、源码重点

1、Producer的核心启动流程

​ 所有Producer的启动过程,最终都会调用到DefaultMQProducerImpl#start方法。在start方法中的通过一个mQClientFactory对象,启动生产者的一大堆重要服务。

这个mQClientFactory是最为重要的一个对象,负责生产所有的Client,包括Producer和Consumer。

​ 这里其实就是一种设计模式,虽然有很多种不同的客户端,但是这些客户端的启动流程最终都是统一的,全是交由mQClientFactory对象来启动。而不同之处在于这些客户端在启动过程中,按照服务端的要求注册不同的信息。例如生产者注册到producerTable,消费者注册到consumerTable,管理控制端注册到adminExtTable

在这里插入图片描述

2、发送消息的核心流程

核心流程如下:

在这里插入图片描述

​ 1、发送消息时,会维护一个本地的topicPublishInfoTable缓存,DefaultMQProducer会尽量保证这个缓存数据是最新的。但是,如果NameServer挂了,那么DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker,还是可以正常发送消息到Broker的。 --可以在生产者示例中,start后打一个断点,然后把NameServer停掉,这时,Producer还是可以发送消息的。

​ 2、生产者如何找MessageQueue: 默认情况下,生产者是按照轮训的方式,依次轮训各个MessageQueue。但是如果某一次往一个Broker发送请求失败后,下一次就会跳过这个Broker。

	//org.apache.rocketmq.client.impl.producer.TopicPublishInfo
	//QueueFilter是用来过滤掉上一次失败的Broker的,表