文章目录
- 一、读源码的方法
-
- 1、为什么要读源码
- 2、怎么读源码
- 二、源码热身阶段
-
- 1、NameServer的启动过程
- 2、Broker服务启动过程
- 三、小试牛刀阶段
-
- 3、Netty服务注册框架
- 4、Broker心跳注册管理
- 5、Producer发送消息过程
- 6、Consumer拉取消息过程
- 7、客户端负载均衡管理总结
-
- 1 Producer负载均衡
- 2 Consumer负载均衡
- 四、融汇贯通阶段
-
- 8、消息持久化设计
-
- 1、RocketMQ的持久化文件结构
- 2、commitLog写入
- 3、文件同步刷盘与异步刷盘
- 4、CommitLog主从复制
- 5、分发ConsumeQueue和IndexFile
- 6、过期文件删除机制
- 7、文件索引结构
- 9、延迟消息机制
-
- 1、关注重点
- 2、源码重点
- 10、长轮询机制
-
- 1、功能介绍
- 2、源码重点
- 五、基于RocketMQ源码理解零拷贝与顺序写
-
- 1、顺序写加速文件写入磁盘
- 2、刷盘机制保证消息不丢失
- 3、零拷贝加速文件读写
- 六、总结
轻松解读RocketMQ5.3.0核心源码
楼兰:你的神秘技术宝藏 同步刷盘,异步刷盘?同步同步,异步同步?零拷贝,顺序写?你是不是曾经在各种各样的流量短视频和博客中听说过跟RocketMQ相关的这些亮瞎眼的高大上的技术名词?你有没有想过自己去RocketMQ中看看这些概念背后的Java代码是什么样的?如果你有过这样的想法,那么,楼兰到你到RocketMQ的源码当中逛逛,看看RocketMQ的大神们,是如何用简单常见的MVC思想开发出RocketMQ这样的神作的。
本次源码解读基于RocketMQ5.3.0版本。RocketMQ的官方Git仓库地址:https://github.com/apache/rocketmq 可以用git把项目clone下来或者直接下载代码包。也可以到RocketMQ的官方网站上下载指定版本的源码: http://rocketmq.apache.org/dowloading/releases/
一、读源码的方法
先跟大家简单聊一聊为什么要去读RocketMQ的源码,以及怎么去读这些优秀的源码。
1、为什么要读源码
开门见山的说,我见过很多程序员,但是有超过70%的程序员是不会读源码的。很多程序员天天在用SpringBoot,但是从来没有去看过SpringApplication是如何run起来的。而这一类不怎么读优秀源码的程序员有几个很明显的特点。
一是解决复杂问题能力不强。大家一起开发业务代码好像每个人能力都差不多,但是当项目组真正遇到一些有挑战性的技术问题,能站出来的人往往一个项目组里不超过十分之一。且不说做一些底层的架构设计工作了,就是拿老个老项目,把pom依赖版本升下级,都能难倒很多人。这基本上就决定了程序员的发展空间有限。
二是焦虑。我见过很多程序员,不管到什么公司,在什么职位,都会越来越焦虑,更不用说现在这个大环境了。其实更深层次的原因,是他们自己也清楚,自己被替换的成本太低了。没有自己的技术护城河,再好的工作履历也是镜中月水中花。
这些跟读源码有什么关系呢?很明显,对于绝大多数程序员来说,多读多学习这些优秀开源中间件的源码,是必不可少的一个提升自己的方式。这就好像提升篮球运动员水平最好的方式,就是扔到最好的球队里去捶打。跟着这些开源中间件去学习,相比于自己花几年时间研究一个算法,优化一个框架来说,是这些程序员们最经济,最靠谱,最有价值的学习方式。
2、怎么读源码
但是读源码是不容易的。这里面有很多原因。
一是读源码需要积累。草草的看一遍源码对你几乎不会有什么帮助。远不如网上看看课程,写几个Demo,效果立竿见影。耐不住性子,是几乎任何职业想要往上提升的共同天敌。枯燥的源码更是容易把这个问题放大。
二是没有能力去读懂源码中的设计思想。每个中间件要解决的问题不一样,解决问题的思路都不一样。而这些最有价值的东西,往往又被各种各样细枝末节的细节给掩盖了。这就很容易造成读源码找不到方向。
三是不清楚到底要从源码中学习什么东西。源码写得再好,是别人的代码,解决的是别人的问题,对自己工作有什么帮助呢?找不到直接的方向,收集不到最关键的信息,这也就很容易造成读源码没有成就感。
再转回到这一期的内容来说,就是读源码其实不适合做课程来分享,或者说跟大家一起解读源码是一件很困难的事情。对程序员来说,读源码就好比看历史。茫茫历史之路,有人读出了天下风云,有人读出了风花雪月。有人读出了不为五斗米折腰,也有人读出了厚黑学。历史是古人的,源码也是别人的。每个人的技术基础不一样,从中理解出来的东西也不一样。
所以在这一次跟大家分享RocketMQ源码之前,我也想要跟大家分享一下我读源码的一些经验方法。
1、带着问题读源码
每次准备去读源码之前,找一个自己感兴趣的问题入手,每次顺着这个问题去读源码。这些问题可以是使用过程中自己遇到过的一些难题,也可以就是纯粹的想要验证一下之前学到的某一个相关知识点。总之,如果没有自己的思考,源码不如不读。
2、小步快走
不要希望一两遍就能读懂源码!每次读懂一点点内容,慢慢积累,才能逐渐加深对源码的理解。
所以,这一次,我会分为三个阶段逐步带你去理解RocketMQ的核心源码。
- 源码热身阶段:主要是快速理解RocketMQ的整体源码结构,梳理关键的主线。为后续深入解读打下基础。
- 小试牛刀阶段:主要是拿一些RocketMQ中使用门槛比较低的功能机制,进行过程梳理。开始找到一些按照自己思想解读源码的感觉。
- 融会贯通阶段:主要是分析一些RocketMQ中技术门槛比较高的核心机制。在这个过程中,逐步开始梳理出RocketMQ中的一些逻辑主线,深挖细节,形成自己的理解。
3、分步总结
按照之间的步骤,找到自己的问题,带上自己的理解,机制总结。对RocketMQ中各种扩展功能,逐步尝试验证。最后形成一些自己的输出内容。
二、源码热身阶段
我们知道RocketMQ的整体架构是由以下几个服务构成:
这一部分,我们就来梳理一下NameServer和Broker有哪些核心组件,重点是详细梳理下这两个核心服务有哪些配置参数可以调整。
1、NameServer的启动过程
1、关注的问题
在RocketMQ集群的服务端,实际进行消息存储、推送等核心功能点的都是Broker。而NameServer的作用,其实和微服务中的注册中心非常类似,他只是提供了Broker端的服务注册与发现功能。
第一次看源码,不要太过陷入具体的细节,先搞清楚NameServer的大体结构。
2、源码重点
NameServer的启动入口类是org.apache.rocketmq.namesrv.NamesrvStartup。其中的核心是构建并启动一个NamesrvController。这个Cotroller对象就跟MVC中的Controller是很类似的,都是响应客户端的请求。只不过,他响应的是基于Netty的客户端请求。
在NameServer中,还启动了一个ControllerManager服务,这个服务主要是用来保证服务高可用的,这里暂不解读。
另外,他的实际启动过程,其实可以配合NameServer的启动脚本进行更深入的理解。我们这最先关注的是他的整体结构:
解读出以下几个重点:
1、这几个配置类就可以用来指导如何优化Nameserver的配置。比如,如何调整nameserver的端口?自己试试从源码中找找答案。
2、在之前的4.x版本当中,Nameserver中是没有ControllerManager和NettyRemotingClient的,这意味着现在NameServer现在也需要往外发Netty请求了。
3、稍微解读下Nameserver中核心组件例如RouteInfoManager的结构,可以发现RocketMQ的整体源码风格其实就是典型的MVC思想。 Controller响应网络请求,各种Manager和其中包含的Service处理业务,内存中的各种Table保存消息。
2、Broker服务启动过程
1、关注重点
Broker是整个RocketMQ的业务核心。所有消息存储、转发这些重要的业务都是Broker进行处理。
这里重点梳理Broker有哪些内部服务。这些内部服务将是整理Broker核心业务流程的起点。
2、源码重点
Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试。
启动过程关键点:重点也是围绕一个BrokerController对象,先创建,然后再启动。
首先: 在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置:
- BrokerConfig : Broker服务配置
- MessageStoreConfig : 消息存储配置。 这两个配置参数都可以在broker.conf文件中进行配置
- NettyServerConfig :Netty服务端占用了10911端口。同样也可以在配置文件中覆盖。
- NettyClientConfig : Broker既要作为Netty服务端,向客户端提供核心业务能力,又要作为Netty客户端,向NameServer注册心跳。
- AuthConfig:权限相关的配置。
这些配置是我们了解如何优化 RocketMQ 使用的关键。
然后: 在BrokerController.start方法可以看到启动了一大堆Broker的核心服务,我们挑一些重要的
this.messageStore.start();//启动核心的消息存储组件
this.timerMessageStore.start(); //时间轮服务,主要是处理指定时间点的延迟消息。
this.remotingServer.start(); //Netty服务端
this.fastRemotingServer.start(); //启动另一个Netty服务端。
this.brokerOuterAPI.start();//启动客户端,往外发请求
this.topicRouteInfoManager.start(); //管理Topic路由信息
BrokerController.this.registerBrokerAll: //向所有依次NameServer注册心跳。
this.brokerStatsManager.start();//服务状态
我们现在不需要了解这些核心组件的具体功能,只要有个大概,Broker中有一大堆的功能组件负责具体的业务。后面等到分析具体业务时再去深入每个服务的细节。
我们需要抽象出Broker的一个整体结构:
可以看到Broker启动了两个Netty服务,他们的功能基本差不多。实际上,在应用中,可以通过producer.setSendMessageWithVIPChannel(true),让少量比较重要的producer走VIP的通道。而在消费者端,也可以通过consumer.setVipChannelEnabled(true),让消费者支持VIP通道的数据。
三、小试牛刀阶段
开始理解一些比较简单的业务逻辑
3、Netty服务注册框架
1、关注重点
RocketMQ实际上是一个复杂的分布式系统, NameServer,Broker,Client之间需要有大量跨进程的RPC调用。这些复杂的RPC请求是怎么管理,怎么调用的呢?这是我们去理解RocketMQ底层业务的基础。这一部分的重点就是去梳理RocketMQ的这一整套基于Netty的远程调用框架。
需要说明的是,RocketMQ整个服务调用框架绝大部分是使用Netty框架封装的。所以,要看懂这部分代码,需要你对Netty框架有足够的了解。
2、源码重点
Netty的所有远程通信功能都由remoting模块实现。remoting模块中有两个对象最为重要。 就是RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中,涉及到的远程服务非常多,同一个服务,可能既是RPC的服务端也可以是RPC的客户端。例如Broker服务,对于Client来说,他需要作为服务端响应他们发送消息以及拉取消息等请求,所以Broker是需要RemotingServer的。而另一方面,Broker需要主动向NameServer发送心跳请求,这时,Broker又需要RemotingClient。因此,Broker既是RPC的服务端又是RPC的客户端。
对于这部分的源码,就可以从remoting模块中RemotingServer和RemotingClient的初始化过程入手。有以下几个重点是需要梳理清楚的:
1、RemotingServer和RemotingClient之间是通过什么协议通讯的?
RocketMQ中,RemotingServer是一个接口,在这个接口下,提供了两个具体的实现类,NettyRemotingServer和MultiProtocolRemotingServer。他们都是基于Netty框架封装的,只不过处理数据的协议不一样。也就是说,RocketMQ可以基于不同协议实现RPC访问。其实这也就为RocketMQ提供多种不同语言的客户端打下了基础。
2、哪些组件需要Netty服务端?哪些组件需要Netty客户端?
之间简单梳理过,NameServer和Broker的服务内部都是既有RemotingServer和RemotingClient的。 那么作为客户端的Producer和Consumer,是不是就只需要RemotingClient呢?其实也不是,事务消息的Producer也需要响应Broker的事务状态回查,他也是需要NettyServer的。
这里需要注意的是,Netty框架是基于Channel长连接发起的RPC通信。只要长连接建立了,那么数据发送是双向的。也就是说,Channel长连接建立完成后,NettyServer服务端也可以向NettyClient客户端发送请求,所以服务端和客户端都需要对业务进行处理。
3、Netty框架最核心的部分是如何构架处理链,RocketMQ是如何构建的呢?
服务端构建处理链的核心代码:
// org.apache.rocketmq.remoting.netty.NettyRemotingServer
protected ChannelPipeline configChannel(SocketChannel ch) {
return ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
.addLast(defaultEventExecutorGroup,
encoder, //请求编码器
new NettyDecoder(), //请求解码器
distributionHandler, //请求计数器
new IdleStateHandler(0, 0,
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //心跳管理器
connectionManageHandler, //连接管理器
serverHandler //核心的业务处理器
);
}
我们这里主要分析业务请求如何管理。分两个部分来看:
1、请求参数:
从请求的编解码器可以看出,RocketMQ的所有RPC请求数据都封装成RemotingCommand对象。RemotingCommand对象中有几个重要的属性:
private int code; //响应码,表示请求处理成功还是失败
private int opaque = requestId.getAndIncrement(); //服务端内部会构建唯一的请求ID。
private transient CommandCustomHeader customHeader; //自定义的请求头。用来区分不同的业务请求
private transient byte[] body; //请求参数体
private int flag = 0; //参数类型, 默认0表示请求,1表示响应
2、处理逻辑
所有核心的业务请求都是通过一个NettyServerHandler进行统一处理。他处理时的核心代码如下:
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
//统一处理所有业务请求
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) {
int localPort = RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress());
NettyRemotingAbstract remotingAbstract = NettyRemotingServer.this.remotingServerTable.get(localPort);
if (localPort != -1 && remotingAbstract != null) {
remotingAbstract.processMessageReceived(ctx, msg); //核心处理请求的方法
return;
}
// The related remoting server has been shutdown, so close the connected channel
RemotingHelper.closeChannel(ctx.channel());
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
//调整channel的读写属性
}
}
**2.1、在最核心的处理请求的processMessageReceived方法中,会将请求类型分为 REQUEST__COMMAND 和 RESPONSE_COMMAND来处理。**为什么会有两种不同类型的请求呢?
这是因为客户端的业务请求会有两种类型:
一种是同步请求方式:即客户端发过来业务请求,服务端同步给出请求的响应结果。这种请求方式实现简单,但是在服务端处理业务的过程中,客户端会产生阻塞,因此比较适合那些相对比较简单,服务端处理速度快的业务。
另一种是异步请求方式:即客户端发过来业务请求后,服务端并不是立即给出相应结果,而是先给客户端一个请求接收成功的响应,后台异步进行处理。而客户端可以过一段时间,再发一个response类型的请求,获取上一次请求的响应。
2.2、如何处理request类型的请求?
服务端和客户端都会维护一个processorTable。这是个HashMap,key是服务码,也就对应RemotingCommand的code。value是对应的运行单元 Pair<NettyRequestProcessor, ExecutorService>。包含了执行线程的线程池和具体处理业务的Processor。 而这些Processor,是由业务系统自行注册的。
也就是说,想要看每个服务具体有哪些业务能力,就只要看他们注册了哪些Processor就知道了。
Broker服务注册,详见 BrokerController.registerProcssor()方法。
NameServer的服务注册方法,重点如下:
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
//是否测试集群模式,默认是false。也就是说现在阶段不推荐。
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);
} else {
// Support get route info only temporarily
ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);
this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
}
}
另外,NettyClient也会注册一个大的ClientRemotingProcessor,统一处理所有请求。注册方法见 org.apache.rocketmq.client.impl.MQClientAPIImpl类的构造方法。也就是说,只要长连接建立完成了,NettyClient比如Producer,也可以处理NettyServer发过来的请求。
2.3、如何处理response类型的请求?
NettyServer处理完request请求后,会先缓存到responseTable中,等NettyClient下次发送response类型的请求,再来获取。这样就不用阻塞Channel,提升请求的吞吐量。优雅的支持了异步请求。
2.4、关于RocketMQ的同步结果推送与异步结果推送
RocketMQ的RemotingServer服务端,会维护一个responseTable,这是一个线程同步的Map结构。 key为请求的ID,value是异步的消息结果。ConcurrentMap<Integer /* opaque */, ResponseFuture> 。
处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时,处理的结果会存入responseTable,通过ResponseFuture提供一定的服务端异步处理支持,提升服务端的吞吐量。 请求返回后,立即从responseTable中移除请求记录。
实际上,同步也是通过异步实现的。
//org.apache.rocketmq.remoting.netty.ResponseFuture
//发送消息后,通过countDownLatch阻塞当前线程,造成同步等待的效果。
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
//等待异步获取到消息后,再通过countDownLatch释放当前线程。
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}
处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时,处理的结果依然会存入responsTable,等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture,也就是在客户端请求结果时再去获取真正的结果。
另外,在RemotingServer启动时,会启动一个定时的线程任务,不断扫描responseTable,将其中过期的response清除掉。
//org.apache.rocketmq.remoting.netty.NettyRemotingServer
TimerTask timerScanResponseTable = new TimerTask() {
@Override
public void run(Timeout timeout) {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
} finally {
timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS);
}
}
};
this.timer.newTimeout(timerScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);
整体RPC框架流程如下图:
可以看到,RocketMQ基于Netty框架实现的这一套基于服务码的服务注册机制,即可以让各种不同的组件都按照自己的需求注册自己的服务方法,又可以以一种统一的方式同时支持同步请求和异步请求。所以这一套框架,其实是非常简洁易用的。在使用Netty框架进行相关应用开发时,都可以借鉴他的这一套服务注册机制。例如开发一个大型的IM项目,要添加好友、发送文本、发送图片、发送附件、甚至还有表情、红包等等各种各样的请求。这些请求如何封装,就可以参考这一套服务注册框架。
4、Broker心跳注册管理
1、关注重点
把RocketMQ的服务调用框架整理清楚之后,接下来就可以从一些具体的业务线来进行详细梳理了。
之前介绍过,Broker会在启动时向所有NameServer注册自己的服务信息,并且会定时往NameServer发送心跳信息。而NameServer会维护Broker的路由列表,并对路由表进行实时更新。这一轮就重点梳理这个过程。
2、源码重点
Broker启动后会立即发起向NameServer注册心跳。方法入口:BrokerController.this.registerBrokerAll。 然后启动一个定时任务,以10秒延迟,默认30秒的间隔持续向NameServer发送心跳。
//K4 Broker向NameServer进行心跳注册
if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
this.registerBrokerAll(true, false, true);
}
//启动后定时注册
scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
@Override
public void run0() {
try {
if (System.currentTimeMillis() < shouldStartTime) {
BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
return;
}
if (isIsolated) {
BrokerController.LOG.info("Skip register for broker is isolated");
return;
}
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
BrokerController.LOG.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
NameServer内部会通过RouteInfoManager组件及时维护Broker信息。具体参见org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager的registerBroker方法
同时在NameServer启动时,会启动定时任务,扫描不活动的Broker。方法入口:NamesrvController.initialize方法,往下跟踪到startScheduleService方法。。
3、极简化的服务注册发现流程
为什么RocketMQ要自己实现一个NameServer,而不用Zookeeper、Nacos这样现成的注册中心?
首先,依赖外部组件会对产品的独立性形成侵入,不利于自己的版本演进。Kafka要抛弃Zookeeper就是一个先例。
其次,更为重要的还是RocketMQ对业务的合理设计。NameServer之间不进行信息同步,而是依赖Broker端向所有NameServer同时发起注册。这让NameServer的服务可以非常轻量。如果可能,你可以与Nacos或Zookeeper的核心流程做下对比。NameServer集群只要有一个节点存活。整个集群就能正常提供服务,而Zookeeper,Nacos等都是基于多数派同意的机制,需要集群中超过半数的节点存活才能正常提供服务。
但是,要知道,这种极简的设计,其实是以牺牲数据一致性为代价的。Broker往多个NameServer同时发起注册,有可能部分NameServer注册成功,而部分NameServer注册失败了。这样,多个NameServer之间的数据是不一致的。作为通用的注册中心,这是不可接受的。但是对于RocketMQ,这又变得可以接受了。因为客户端从NameServer上获得Broker列表后,只要有一个正常运行的Broker就可以了,并不需要完整的Broker列表。
5、Producer发送消息过程
1、关注重点
首先:回顾下我们之前的Producer使用案例。
Producer有两种:
- 一种是普通发送者:DefaultMQProducer。只负责发送消息,发送完消息,就可以停止了。
- 另一种是事务消息发送者: TransactionMQProducer。支持事务消息机制。需要在事务消息过程中提供事务状态确认的服务,这就要求事务消息发送者虽然是一个客户端,但是也要完成整个事务消息的确认机制后才能退出。
事务消息机制后面将结合Broker进行整理分析。这一步暂不关注。我们只关注DefaultMQProducer的消息发送过程。
然后:整个Producer的使用流程,大致分为两个步骤:一是调用start方法,进行一大堆的准备工作。 二是各种send方法,进行消息发送。
那我们重点关注以下几个问题:
1、Producer启动过程中启动了哪些服务。也就是了解RocketMQ的Client客户端的基础结构。
2、Producer如何管理broker路由信息。 可以设想一下,如果Producer启动了之后,NameServer挂了,那么Producer还能不能发送消息?希望你先从源码中进行猜想,然后自己设计实验进行验证。
3、关于Producer的负载均衡。也就是Producer到底将消息发到哪个MessageQueue中。这里可以结合顺序消息机制来理解一下。消息中那个莫名奇妙的MessageSelector到底是如何工作的。
2、源码重点
1、Producer的核心启动流程
所有Producer的启动过程,最终都会调用到DefaultMQProducerImpl#start方法。在start方法中的通过一个mQClientFactory对象,启动生产者的一大堆重要服务。
这个mQClientFactory是最为重要的一个对象,负责生产所有的Client,包括Producer和Consumer。
这里其实就是一种设计模式,虽然有很多种不同的客户端,但是这些客户端的启动流程最终都是统一的,全是交由mQClientFactory对象来启动。而不同之处在于这些客户端在启动过程中,按照服务端的要求注册不同的信息。例如生产者注册到producerTable,消费者注册到consumerTable,管理控制端注册到adminExtTable
2、发送消息的核心流程
核心流程如下:
1、发送消息时,会维护一个本地的topicPublishInfoTable缓存,DefaultMQProducer会尽量保证这个缓存数据是最新的。但是,如果NameServer挂了,那么DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker,还是可以正常发送消息到Broker的。 --可以在生产者示例中,start后打一个断点,然后把NameServer停掉,这时,Producer还是可以发送消息的。
2、生产者如何找MessageQueue: 默认情况下,生产者是按照轮训的方式,依次轮训各个MessageQueue。但是如果某一次往一个Broker发送请求失败后,下一次就会跳过这个Broker。
//org.apache.rocketmq.client.impl.producer.TopicPublishInfo
//QueueFilter是用来过滤掉上一次失败的Broker的,表