consumer 1.启动
有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,RocketMq是基于拉模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消息。
Consumer消费拉取的消息的方式有两种
1. Push方式:rocketmq已经提供了很全面的实现,consumer通过长轮询拉取消息后回调MessageListener接口实现完成消费,应用系统只要MessageListener完成业务逻辑即可 2. Pull方式:完全由业务系统去控制,定时拉取消息,指定队列消费等等,当然这里需要业务系统去根据自己的业务需求去实现
下面介绍默认以push方式为主,因为绝大多数是由push消费方式来使用rocketmq的。
consumer启动流程
指定group 订阅topic 注册消息监听处理器,当消息到来时消费消息 消费端Start 复制订阅关系 初始化rebalance变量 构建offsetStore消费进度存储对象 启动消费消息服务 向mqClientFactory注册本消费者 启动client端远程通信 启动定时任务 定时获取nameserver地址 定时从nameserver获取topic路由信息 定时清理下线的borker 定时向所有broker发送心跳信息,(包括订阅关系) 定时持久化Consumer消费进度(广播存储到本地,集群存储到Broker) 统计信息打点 动态调整消费线程池 启动拉消息服务PullMessageService 启动消费端负载均衡服务RebalanceService 从namesrv更新topic路由信息 向所有broker发送心跳信息,(包括订阅关系) 唤醒Rebalance服务线程
consumer 2.消费端负载均衡
消费端负载均衡
消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载
消费端遍历自己的所有topic,依次调rebalanceByTopic 根据topic获取此topic下的所有queue 选择一台broker获取基于group的所有消费端(有心跳向所有broker注册客户端信息) 选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法,获取队列集合Set<MessageQueue>mqSet
1) 平均分配算法,其实是类似于分页的算法 将所有queue排好序类似于记录 将所有消费端consumer排好序,相当于页数 然后获取当前consumer所在页面应该分配到的queue 2) 按照配置来分配队列, 也就是说在consumer启动的时候指定了queue 3) 按照机房来配置队列 Consumer启动的时候会指定在哪些机房的消息 获取指定机房的queue 然后在执行如1)平均算法 根据分配队列的结果更新ProccessQueueTable<MessageQueue,ProcessQueue> 1) 比对mqSet 将多余的队列删除,当broker当机或者添加,会导致分配到mqSet变化, a) 将不在被本consumer消费的messagequeue的ProcessQueue删除,其实是设置ProcessQueue的droped属性为true b) 将超过两份中没有拉取动作ProcessQueue删除 //TODO 为什么要删除掉,两分钟后来了消息怎么办? // 2) 添加新增队列,比对mqSet,给新增的messagequeue 构建长轮询对象PullRequest对象,会从broker获取消费的进度 构建这个队列的ProcessQueue 将PullRequest对象派发到长轮询拉消息服务(单线程异步拉取) 注:ProcessQueue正在被消费的队列, (1) 长轮询拉取到消息都会先存储到ProcessQueue的TreeMap<Long, MessageExt>集合中,消费调后会删除掉,用来控制consumer消息堆积, TreeMap<Long, MessageExt> key是消息在此ConsumeQueue队列中索引 (2) 对于顺序消息消费处理 locked属性:当consumer端向broker申请锁队列成功后设置true,只有被锁定的processqueue才能被执行消费 rollback: 将消费在msgTreeMapTemp中的消息,放回msgTreeMap重新消费 commit: 将临时表msgTreeMapTemp数据清空,代表消费完成,放回最大偏移值 (3) 这里是个TreeMap,对key即消息的offset进行排序,这个样可以使得消息进行顺序消费
consumer 3.长轮询
Rocketmq的消息是由consumer端主动到broker拉取的, consumer向broker发送拉消息请求, PullMessageService服务通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest到broker拉取消息
DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法执行向broker拉消息动作 1. 获取ProcessQueue判读是否drop的, drop为true返回 2. 给ProcessQueue设置拉消息时间戳 3. 流量控制,正在消费队列中消息(未被消费的)超过阀值,稍后在执行拉消息 4. 流量控制,正在消费队列中消息的跨度超过阀值(默认2000),稍后在消费 5. 根据topic获取订阅关系 6. 构建拉消息回调对象PullBack, 从broker拉取消息(异步拉取)返回结果是回调 7. 从内存中获取commitOffsetValue //TODO 这个值跟pullRequest.getNextOffset区别 8. 构建sysFlag pull接口用到的flag 9. 调底层通信层向broker发送拉消息请求 如果master压力过大,会建议去slave拉取消息 如果是到broker拉取消息清楚实时提交标记位,因为slave不允许实时提交消费进度,可以定时提交 //TODO 关于master拉消息实时提交指的是什么? 10. 拉到消息后回调PullCallback 处理broker返回结果pullResult 更新从哪个broker(master 还是slave)拉取消息 反序列化消息 消息过滤 消息中放入队列最大最小offset,方便应用来感知消息堆积度 将消息加入正在处理队列ProcessQueue 将消息提交到消费消息服务ConsumeMessageService 流控处理, 如果pullInterval参数大于0 (拉消息间隔,如果为了降低拉取速度,可以设置大于0的值),延迟再执行拉消息, 如果pullInterval为0立刻在执行拉消息动作
序列图
1. 向broker发送长轮询请求
2. Broker接收长轮询请求
3. Consumer接收broker响应
长轮询活动图:
一张图画不下,再来一张
consumer 4.长轮询push消息-并发消息
通过长轮询拉取到消息后会提交到消息服务ConsumeMessageConcurrentlyService,
ConsumeMessageConcurrentlyServic的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。
长轮询向broker拉取消息是批量拉取的, 默认设置批量的值为pullBatchSize= 32,可配置
消费端consumer构建一个消费消息任务ConsumeRequest消费一批消息的个数是可配置的consumeMessageBatchMaxSize = 1, 默认批量个数为一个
ConsumeRequest 任务run方法执行
判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息 构建并行消费上下文 给消息设置消费失败时候的retrytopic,当消息发送失败的时候发送到topic为%RETRY%groupname的队列中
调MessageListenerConcurrently监听器的consumeMessage方法消费消息,返回消费结果 如果ProcessQueue的droped为true,不处理结果,不更新offset, 但其实这里消费端是消费了消息的,这种情况感觉有被重复消费的风险 处理消费结果 消费成功, 对于批次消费消息,返回消费成功并不代表所有消息都消费成功,但是消费消息的时候一旦遇到消费消息失败直接放回,根据ackIndex来标记成功消费到哪里了 消费失败, ackIndex设置为-1 广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集群性能会有较大影响,失败重试功能交由应用处理 集群模式, 将消费失败的消息一条条的发送到broker的重试队列中去,如果此时还有发送到重试队列发送失败的消息,那就在cosumer的本地线程定时5秒钟以后重试重新消费消息,在走一次上面的消费流程。 删除正在消费的队列processQueue中本次消费的消息,放回消费进度 更新消费进度,这里只是一个内存offsettable的更新,后面有定时任务更新到broker上去
consumer 5.长轮询push消息-顺序消费消息
顺序消费服务ConsumeMessageConcurrentlyService构建的时候 构建一个线程池来接收消费请求ConsumeRequest 构建一个单线程的本地线程,用来稍后定时重新消费ConsumeRequest, 用来执行定时周期性(一秒)钟锁队列任务 周期性锁队列lockMQPeriodically 获取正在消费队列列表ProcessQueueTable所有MesssageQueue, 构建根据broker归类成MessageQueue集合Map<brokername,Set<MessageQueue>> 遍历Map<brokername,Set<MessageQueue>>的brokername, 获取broker的master机器地址,将brokerName的Set<MessageQueue>发送到broker请求锁定这些队列。
在broker端锁定队列,其实就是在broker的queue中标记一下消费端,表示这个queue被某个client锁定。 Broker会返回成功锁定队列的集合,
根据成功锁定的MessageQueue,设置对应的正在处理队列ProccessQueue的locked属性为true没有锁定设置为false 通过长轮询拉取到消息后会提交到消息服务ConsumeMessageOrderlyService, ConsumeMessageOrderlyService的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。ConsumeRequest是由ProcessQueue和Messagequeue组成。 ConsumeRequest任务的run方法 判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息 每个messagequeue都会生成一个队列锁来保证在当前consumer内,同一个队列串行消费, 判断processQueue的lock属性是否为true,lock属性是否过期,如果为false或者过期,放到本地线程稍后锁定在消费。 如果lock为true且没有过期,开始消费消息 计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将processqueue, messagequeue重新构建ConsumeRequest加到线程池10ms后在消费,这样防止个别队列被饿死 获取客户端的消费批次个数,默认一批次为一条 从proccessqueue获取批次消息, processqueue.takeMessags(batchSize), 从msgTreeMap中移除消息放到临时map中msgTreeMapTemp,这个临时map用来回滚消息和commit消息来实现事物消费 调回调接口消费消息,返回状态对象ConsumeOrderlyStatus 根据消费状态,处理结果 1) 非事物方式,自动提交 消息消息状态为success:调用processQueue.commit方法 获取msgTreeMapTemp的最后一个key,表示提交的 offset 清空msgTreeMapTemp的消息,已经成功消费 2) 事物提交,由用户来控制提交回滚(精卫专用) 更新消费进度, 这里的更新只是一个内存offsetTable的更新,后面有定时任务定时更新到broker上去
consumer 6.消息消费
消费者主动拉取消息消费,客户端通过类DefaultMQPullConsumer 客户端可以指定特定MessageQueue 也可以通过DefaultMQPullConsumer.fetchMessageQueuesInBalance(topic) 获取消费的队列 业务自己获取消费队列,自己到broker拉取消息,以及自己更新消费进度 因为内部实现跟push方式类似就不在啰嗦,用法也请求看示例代码去
consumer 7.shutdown
DefaultMQPushConsumerImpl 关闭消费端 关闭消费线程 将分配到的Set<MessageQueue>的消费进度保存到broker 利用DefaultMQPushConsumerImpl获取ProcessQueueTable<MessageQueue,ProcessQueue>的keyset的messagequeue去获取 RemoteBrokerOffsetStore.offsetTable<MessageQueue,AutomicLong>Map中的消费进度, offsetTable中的messagequeue的值,在update的时候如果没有对应的Messagequeue会构建, 但是也会rebalance的时候将没有分配到的messagequeue删除 rebalance会将offsettable中没有分配到messagequeue删除, 但是在从offsettable删除之前会将offset保存到broker Unregiser客户端 pullMessageService关闭 scheduledExecutorService关闭,关闭一些客户端的起的定时任务 mqClientApi关闭 rebalanceService关闭