-
DefaultMQProducer
作为rocketmq生产者的默认实现,其实它并没有做任何实现,其内部引用一个DefaultMQProducerImpl实例进行具体消息发送。
它有一些基础配置,比如多长时间内消息发送多少次还是没成功则放弃(默认为4秒内发送3次,每次发消息默认超时间为3秒),可以参考DefaultMQProducerImpl.sendDefaultImpl- 重要字段
1 String producerGroup 生产者的组名。
一个jvm内,具有相同producerGroup名字的生产者实例只有一个。
见DefaultMQProducerImpl.start
2 retryAnotherBrokerWhenNotStoreOK
消息没有存储成功是否发送到另外一个broker.
3 sendMsgTimeout
发送消息超时时间,默认为3秒 - 重要方法
send(Message msg)
发送消息,调用DefaultMQProducerImpl.send发送
- 重要字段
-
DefaultMQProducerImpl
这个类主要实现了发送消息之前的逻辑,包括判定消息发到哪,消息发送失败处理等待。
- 重要字段:
1 MQClientInstance
与mq交互的实例,一个clientId只有一个MQClientInstance实例。 - 重要方法
1 start()
该方法实现了producer的启动,包括以下几个方面:
1.1 实例化MQClientInstance,并启动。
1.2 实例数量限制,一个jvm内,一个producerGroup只能有一个实例,参照如下代码:DefaultMQProducerImpl.start方法代码参考:
1.3 clientId生成规则
boolean registerOK = mQClientFactory.registerProducer(
this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException
clientId由本机ip@instanceName组成,instanceName如果不设置默认为DEFAULT,但是start时会通过ManagementFactory.getRuntimeMXBean转换为进程id,所以,默认的clientid为 ip@pid,但是这样会导致一个produer往不同的集群发消息时出现问题,参见rocketmq问题汇总-instanceName参数何时该设置?
所以需要往不同的集群发消息时须设置instanceName。
2 send(Message msg)
发送消息,调用sendDefaultImpl
2 sendDefaultImpl(
Message msg,//消息对象
CommunicationMode communicationMode,//默认同步调用
SendCallback sendCallback, //回调,默认为null
final long timeout//超时时间,默认3秒
)
该方法包含了发送前状态检查,消息检查等,重要步骤如下:
2.1 根据topic查找topic配置信息(TopicPublishInfo),
并缓存,参考updateTopicRouteInfoFromNameServer
2.2 轮询选择一个MessageQueue,
其包括了topic+brokerName+queueId,知道brokerName就可以查询broker的地址,进而发送消息。
2.3 调用sendKernelImpl发送消息
2.4 返回数据或处理异常,进行再次发送
3 sendKernelImpl(
MessageQueue mq,//主要
包含了topic+brokerName+queueId
Message msg,//消息对象
CommunicationMode communicationMode,//默认同步调用
SendCallback sendCallback, //回调,默认为null
final long timeout//超时时间,默认3秒
)
该方法包含以下几个功能:
3.1 根据brokerName获取broker地址
3.2 压缩消息(消息体>4k进行压缩)
3.3 调用MQClientAPIImpl发送消息
- 重要字段:
-
MQClientInstance
该类作为与MQ交互的实例,一般来说一个jvm只有一个,参见clientId,它包含了topic路由信息,broker地址信息等,同时负责启动通信服务和定时任务等等。
- 重要字段
1Map<groupName, MQProducerInner> producerTable
存储了producer group name和DefaultMQProducerImpl的映射。
2Map<BrokerName,Map<brokerId,address>>brokerAddrTable
存储了brokerName与brokerid和地址的对应关系,brokerId:0为master,1为slave,用于根据brokerName查找master地址。
3Map<TopicName, TopicRouteData> topicRouteTable
存储了topic name和TopicRouteData对象映射关系,具体参考TopicRouteData
4 MQClientAPIImpl mQClientAPIImpl
与broker交互的api封装。 - 重要方法
1 start()
该方法包括了几个方面:
1.1 启动MQClientAPIImpl
1.2 启动各种定时任务:
a 每两分钟执行一次寻址服务(NameServer地址)
b 每30秒更新一次所有的topic的路由信息(topicRouteTable),参见updateTopicRouteInfoFromNameServer
c 每30秒移除离线的broker,主要是参照topicRouteTable更新brokerAddrTable
d 每30秒发送一次心跳给所有的master broker
e 更新offset每5秒提交一次消费的offset,broker端为ConsumerOffsetManager负责记录,此offset是逻辑偏移量,比如说,consumerA@consumerAGroup 在broker_a的queue 0的消费队列共有10000条消息,目前消费到888,那么offset就是888.
说到这里,为啥冒出个消费相关的东东,因为producer和consumer内部都持有MQClientInstance实例,故MQClientInstance既有生产者逻辑,又有消费者逻辑。
h 每1分钟调整一次线程池,这也是针对消费者来说的,具体为如果消息堆积超过10W条,则调大线程池,最多64个线程;如果消息堆积少于8W条,则调小线程池,最少20的线程。
1.3 启动PullMessageService,针对consumer,到讲解consumer时再做详细说明
1.4 启动RebalanceService服务,针对consumer,到讲解consumer时再做详细说明
1.5 启动一个groupName为CLIENT_INNER_PRODUCER的DefaultMQProducer,用于将消费失败的消息发回broker,消息的topic格式为%RETRY%ConsumerGroupName。
2 updateTopicRouteInfoFromNameServer
该方法可以保证producer的failover功能,参见rocketmq3.26研究之Failover下producer的表现,其主要实现步骤如下:
2.1 从NamerServer上拉取topic的配置信息,
参见rocketmq3.26研究之NameServer,配置信息(TopicRouteData)主要包含了以下内容:
2.2 拉取之后更新缓存topicRouteTable
2.3 更新brokerAddrTable
2.4 获取具有写权限的queue,
获取角色为master的broker,
对brokerName进行逻辑queue的划分等,即如果该broker具有写权限,则循环writeQueueNums分配queueId.
参见topicRouteData2TopicPublishInfo。这里的逻辑queue是指创建topic时指定的writeQueueNums。这里有个疑惑的地方,如果writeQueueNums和readQueueNums不相等,会造成什么问题?经过测试(只针对集群模式),假设writeQueueNums=6,readQueueNums=3,那么每个broker上有3个queue的消息将无法消费!不知道为啥这里这么设计!
3 sendHeartbeatToAllBroker
发送心跳给broker master
4List<String> findConsumerIdList(String topic, String group)
查找订阅topic的同一consumer group的所有消费者的cid列表。
4.1 首先根据topic从topicRouteTable中查找broker的地址
4.2 如果地址查不到则调用updateTopicRouteInfoFromNameServer更新相应的关系
4.3 最后调用MQClientAPIImpl.getConsumerIdListByGroup来查找cid列表
- 重要字段
-
MQClientAPIImpl
该类实现了客户端与远程交互的封装,其实内部封装了RemotingClient来实现与远程的交互。
- 重要字段
1 RemotingClient remotingClient
与远程通信对象实现
2 ClientRemotingProcessor clientRemotingProcessor
客户端接收broker通信的实现,与consumer相关。 - 重要方法
1 start
启动,主要调用NettyRemotingClient.start
2 sendMessageSync
发送同步消息,其实调用remotingClient
3 sendMessageAsync
发送异步消息,其实调用remotingClient
4 processSendResponse
处理返回结果
5List<String> getConsumerIdListByGroup(//
final String addr, //
final String consumerGroup, //
final long timeoutMillis)
根据addr和consumerGroup查找cid列表
- 重要字段
-
NettyRemotingClient
基于netty实现的与远程交互的封装。这个类主要是netty的封装,俺就不在这叨叨了。
6 附一张消息发送的图,但愿能解释清楚我上面凌乱的说明吧: