rocketmq3.26研究之四DefaultMQProducer

时间:2021-12-17 17:18:55
  1. DefaultMQProducer

    作为rocketmq生产者的默认实现,其实它并没有做任何实现,其内部引用一个DefaultMQProducerImpl实例进行具体消息发送。
    它有一些基础配置,比如多长时间内消息发送多少次还是没成功则放弃(默认为4秒内发送3次,每次发消息默认超时间为3秒),可以参考DefaultMQProducerImpl.sendDefaultImpl

    • 重要字段
      1 String producerGroup 生产者的组名。
      一个jvm内,具有相同producerGroup名字的生产者实例只有一个。
      DefaultMQProducerImpl.start
      2 retryAnotherBrokerWhenNotStoreOK
      消息没有存储成功是否发送到另外一个broker.
      3 sendMsgTimeout
      发送消息超时时间,默认为3秒
    • 重要方法
      send(Message msg)
      发送消息,调用DefaultMQProducerImpl.send发送
  2. DefaultMQProducerImpl

    这个类主要实现了发送消息之前的逻辑,包括判定消息发到哪,消息发送失败处理等待。

    • 重要字段:
      1 MQClientInstance
      与mq交互的实例,一个clientId只有一个MQClientInstance实例。
    • 重要方法
      1 start()
      该方法实现了producer的启动,包括以下几个方面:
      1.1 实例化MQClientInstance,并启动
      1.2 实例数量限制,一个jvm内,一个producerGroup只能有一个实例,参照如下代码:
      DefaultMQProducerImpl.start方法代码参考:
      boolean registerOK = mQClientFactory.registerProducer(
      this.defaultMQProducer.getProducerGroup(), this);
      if (!registerOK) {
      this.serviceState = ServiceState.CREATE_JUST;
      throw new MQClientException
      1.3 clientId生成规则
      clientId由本机ip@instanceName组成,instanceName如果不设置默认为DEFAULT,但是start时会通过ManagementFactory.getRuntimeMXBean转换为进程id,所以,默认的clientid为 ip@pid,但是这样会导致一个produer往不同的集群发消息时出现问题,参见rocketmq问题汇总-instanceName参数何时该设置?
      所以需要往不同的集群发消息时须设置instanceName。
      2 send(Message msg)
      发送消息,调用sendDefaultImpl
      2 sendDefaultImpl(
      Message msg,//消息对象
      CommunicationMode communicationMode,//默认同步调用
      SendCallback sendCallback, //回调,默认为null
      final long timeout//超时时间,默认3秒
      )
      该方法包含了发送前状态检查,消息检查等,重要步骤如下:
      2.1 根据topic查找topic配置信息(TopicPublishInfo),
      并缓存,参考updateTopicRouteInfoFromNameServer
      2.2 轮询选择一个MessageQueue,
      其包括了topic+brokerName+queueId,知道brokerName就可以查询broker的地址,进而发送消息。
      2.3 调用sendKernelImpl发送消息
      2.4 返回数据或处理异常,进行再次发送
      3 sendKernelImpl(
      MessageQueue mq,//主要
      包含了topic+brokerName+queueId
      Message msg,//消息对象
      CommunicationMode communicationMode,//默认同步调用
      SendCallback sendCallback, //回调,默认为null
      final long timeout//超时时间,默认3秒
      )
      该方法包含以下几个功能:
      3.1 根据brokerName获取broker地址
      3.2 压缩消息(消息体>4k进行压缩)
      3.3 调用MQClientAPIImpl发送消息
  3. MQClientInstance

    该类作为与MQ交互的实例,一般来说一个jvm只有一个,参见clientId,它包含了topic路由信息,broker地址信息等,同时负责启动通信服务和定时任务等等。

    • 重要字段
      1 Map<groupName, MQProducerInner> producerTable
      存储了producer group name和DefaultMQProducerImpl的映射。
      2 Map<BrokerName,Map<brokerId,address>>brokerAddrTable
      存储了brokerName与brokerid和地址的对应关系,brokerId:0为master,1为slave,用于根据brokerName查找master地址。
      3 Map<TopicName, TopicRouteData> topicRouteTable
      存储了topic name和TopicRouteData对象映射关系,具体参考TopicRouteData
      4 MQClientAPIImpl mQClientAPIImpl
      与broker交互的api封装。
    • 重要方法
      1 start()
      该方法包括了几个方面:
      1.1 启动MQClientAPIImpl
      1.2 启动各种定时任务:
      a 每两分钟执行一次寻址服务(NameServer地址)
      b 每30秒更新一次所有的topic的路由信息(topicRouteTable),参见updateTopicRouteInfoFromNameServer
      c 每30秒移除离线的broker,主要是参照topicRouteTable更新brokerAddrTable
      d 每30秒发送一次心跳给所有的master broker
      e 更新offset每5秒提交一次消费的offset,broker端为ConsumerOffsetManager负责记录,此offset是逻辑偏移量,比如说,consumerA@consumerAGroup 在broker_a的queue 0的消费队列共有10000条消息,目前消费到888,那么offset就是888.
      说到这里,为啥冒出个消费相关的东东,因为producer和consumer内部都持有MQClientInstance实例,故MQClientInstance既有生产者逻辑,又有消费者逻辑。
      h 每1分钟调整一次线程池,这也是针对消费者来说的,具体为如果消息堆积超过10W条,则调大线程池,最多64个线程;如果消息堆积少于8W条,则调小线程池,最少20的线程。
      1.3 启动PullMessageService,针对consumer,到讲解consumer时再做详细说明
      1.4 启动RebalanceService服务,针对consumer,到讲解consumer时再做详细说明
      1.5 启动一个groupName为CLIENT_INNER_PRODUCER的DefaultMQProducer,用于将消费失败的消息发回broker,消息的topic格式为%RETRY%ConsumerGroupName。
      2 updateTopicRouteInfoFromNameServer
      该方法可以保证producer的failover功能,参见rocketmq3.26研究之Failover下producer的表现,其主要实现步骤如下:
      2.1 从NamerServer上拉取topic的配置信息,
      参见rocketmq3.26研究之NameServer,配置信息(TopicRouteData)主要包含了以下内容:
      rocketmq3.26研究之四DefaultMQProducer
      2.2 拉取之后更新缓存topicRouteTable
      2.3 更新brokerAddrTable
      2.4 获取具有写权限的queue,
      获取角色为master的broker,
      对brokerName进行逻辑queue的划分等,即如果该broker具有写权限,则循环writeQueueNums分配queueId.
      参见topicRouteData2TopicPublishInfo。这里的逻辑queue是指创建topic时指定的writeQueueNums。这里有个疑惑的地方,如果writeQueueNums和readQueueNums不相等,会造成什么问题?经过测试(只针对集群模式),假设writeQueueNums=6,readQueueNums=3,那么每个broker上有3个queue的消息将无法消费!不知道为啥这里这么设计!
      3 sendHeartbeatToAllBroker
      发送心跳给broker master
      4 List<String> findConsumerIdList(String topic, String group)
      查找订阅topic的同一consumer group的所有消费者的cid列表。
      4.1 首先根据topic从topicRouteTable中查找broker的地址
      4.2 如果地址查不到则调用updateTopicRouteInfoFromNameServer更新相应的关系
      4.3 最后调用MQClientAPIImpl.getConsumerIdListByGroup来查找cid列表
  4. MQClientAPIImpl

    该类实现了客户端与远程交互的封装,其实内部封装了RemotingClient来实现与远程的交互。

    • 重要字段
      1 RemotingClient remotingClient
      与远程通信对象实现
      2 ClientRemotingProcessor clientRemotingProcessor
      客户端接收broker通信的实现,与consumer相关。
    • 重要方法
      1 start
      启动,主要调用NettyRemotingClient.start
      2 sendMessageSync
      发送同步消息,其实调用remotingClient
      3 sendMessageAsync
      发送异步消息,其实调用remotingClient
      4 processSendResponse
      处理返回结果
      5 List<String> getConsumerIdListByGroup(//
      final String addr, //
      final String consumerGroup, //
      final long timeoutMillis)

      根据addr和consumerGroup查找cid列表
  5. NettyRemotingClient

    基于netty实现的与远程交互的封装。这个类主要是netty的封装,俺就不在这叨叨了。

6 附一张消息发送的图,但愿能解释清楚我上面凌乱的说明吧:
rocketmq3.26研究之四DefaultMQProducer