In the previous article, we looked at the Consumer's load-balancing mechanism from the usage side. In this one, we go down into the source code and walk through the load-balancing process in detail.
If we compare RocketMQ's load balancing with Kafka's, we find some important differences.
Collecting the information – store it on the Broker or the NameServer?
To balance load, the first problem to solve is collecting information: for every consumer group, we need to know which consumers it has and which topics they subscribe to.
Should this global picture live on the Broker, or on the NameServer?
Clients send heartbeat messages
RocketMQ chose the Broker. Concretely, the client keeps reporting itself to the brokers through heartbeat messages, which register the consumer (RegisterConsumer).
The code looks like this:
// In MQClientInstance.start(), startScheduledTask() kicks off a number of background tasks
private void startScheduledTask() {
    ...
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); // the key line: send a heartbeat to every broker
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    ...
}
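The initial delay here is one second; heartbeatBrokerInterval defaults to 30 seconds in the versions I have checked, so every client re-registers itself with every broker about twice a minute.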
private void sendHeartbeatToAllBroker() {
    final HeartbeatData heartbeatData = this.prepareHeartbeatData(); // the key line
    ...
    this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
}

private HeartbeatData prepareHeartbeatData() {
    HeartbeatData heartbeatData = new HeartbeatData();

    // clientID
    heartbeatData.setClientID(this.clientId);

    // Consumer
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            ConsumerData consumerData = new ConsumerData();
            consumerData.setGroupName(impl.groupName());
            consumerData.setConsumeType(impl.consumeType());
            consumerData.setMessageModel(impl.messageModel());
            consumerData.setConsumeFromWhere(impl.consumeFromWhere());
            consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
            consumerData.setUnitMode(impl.isUnitMode());

            heartbeatData.getConsumerDataSet().add(consumerData);
        }
    }
    ...
}
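The elided part of prepareHeartbeatData walks producerTable in the same way, so a single heartbeat carries both the producer and the consumer registrations of this client instance.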
The Broker receives heartbeats and maintains the global consumer information
When the Broker receives such a heartbeat, it records the sender in an object called ConsumerManager.
public class BrokerController {
    ...
    private final ConsumerManager consumerManager; // the key field
    ...
}
public class ClientManageProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT: // the key case
                return this.heartBeat(ctx, request);
            ...
        }
    }

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        ...
        boolean changed = this.brokerController.getConsumerManager().registerConsumer( // register the Consumer
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable);
        ...
    }
}
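Inside ConsumerManager, the registrations end up in a per-group table. A minimal sketch of the structure, trimmed down from the actual class:

public class ConsumerManager {
    // group name -> that group's consumers: their channels (connections) and subscriptions
    private final ConcurrentMap<String /* group */, ConsumerGroupInfo> consumerTable =
        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
    ...
}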
Is load balancing done by the client or the server?
Now every Broker knows, for each topic, all the consumers of every consumer group. Who actually performs the load balancing?
There are two options: option 1, let the broker compute the assignment and push each consumer its share; option 2, let each consumer fetch the global information from the broker and compute the assignment itself.
Like Kafka, RocketMQ chose option 2: the client does it itself.
Why the client rather than the server was already discussed in the Kafka article, so I will not repeat it here.
So how exactly does the client do it? Look at the code:
//RebalanceImpl
public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder); // the key call
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
}
// Note that BROADCASTING mode and CLUSTERING mode are handled separately.
// Here we only analyze CLUSTERING mode.
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}", //
                        consumerGroup, //
                        topic, //
                        mqSet, //
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            // Key step 1: fetch the global consumer list kept in the Broker's ConsumerManager above
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    // Key step 2: hand the allocation over to the strategy
                    allocateResult = strategy.allocate(
                        this.consumerGroup, //
                        this.mQClientFactory.getClientId(), // this consumer's own clientId
                        mqAll, // all MessageQueues of this topic
                        cidAll); // all consumers of this group
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
                        strategy.getName(), e);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(
                        "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                        strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(),
                        mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet);
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}
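findConsumerIdList is the call marked as key step 1. Simplified slightly from MQClientInstance, it picks one broker that serves the topic and asks it for the group's consumer id list (a GET_CONSUMER_LIST_BY_GROUP request); since every consumer heartbeats every broker, any broker can answer:

public List<String> findConsumerIdList(final String topic, final String group) {
    // pick any broker that serves this topic
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) {
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    }
    if (null != brokerAddr) {
        try {
            // answered from the broker's ConsumerManager described above
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
        } catch (Exception e) {
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        }
    }
    return null;
}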
As you can see, the allocation boils down to two key steps:
Step 1: every consumer fetches the global information from a broker, i.e. the list of all consumers in this topic's consumer group.
Step 2: it hands three key parameters to the allocator: its own clientId, the clientId list of all consumers, and the MessageQueue list of the topic.
The allocator then computes the MessageQueue list this consumer should get. The client ships several allocation strategies (implementations of AllocateMessageQueueStrategy), such as AllocateMessageQueueAveragely (the default), AllocateMessageQueueAveragelyByCircle, AllocateMessageQueueByConfig and AllocateMessageQueueByMachineRoom.
The algorithms are all fairly simple, so I will not go through them one by one; a sketch of the default one follows.
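For concreteness, here is a simplified sketch of the default strategy, AllocateMessageQueueAveragely's allocate method, with the parameter checks stripped out:

// Simplified sketch of the default averaging strategy. Every consumer runs this
// same deterministic computation on the same sorted inputs, so the per-consumer
// results are disjoint and together cover all queues, with no consumer-to-consumer
// communication.
public List<MessageQueue> allocate(String consumerGroup, String currentCID,
                                   List<MessageQueue> mqAll, List<String> cidAll) {
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    int index = cidAll.indexOf(currentCID);   // my position in the sorted consumer list
    int mod = mqAll.size() % cidAll.size();   // queues left over after even division
    int averageSize = mqAll.size() <= cidAll.size() ? 1
        : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1
                                  : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize
                                              : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}

With 8 queues and 3 consumers, for example, the sorted consumers get 3, 3 and 2 queues respectively, each as a contiguous range.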
How is the clientId generated?
A side note: the allocation algorithm relies on clientId, so what rule generates it?
As shown below, it is essentially local IP @ instanceName. instanceName defaults to "DEFAULT", but at runtime it is replaced by the pid. unitName is empty by default.
//ClientConfig
public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());
    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }
    return sb.toString();
}
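So on a host with IP 192.168.0.10 running a consumer process with pid 12345 (hypothetical values, for illustration), the clientId comes out as 192.168.0.10@12345.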
// At runtime, instanceName is set to the pid
public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}

public static int getPid() {
    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
    String name = runtime.getName(); // format: "pid@hostname"
    try {
        return Integer.parseInt(name.substring(0, name.indexOf('@')));
    } catch (Exception e) {
        return -1;
    }
}
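Replacing "DEFAULT" with the pid matters for the allocation above: if two consumer processes on the same host kept the same instanceName, they would build identical clientIds, cidAll would contain duplicates, both processes would compute the same slice of queues, and messages would be consumed twice while other queues went unassigned.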
Differences from Kafka's Consumer load balancing
Having walked through RocketMQ's load balancing, we find it simpler than Kafka's: Kafka built a fairly elaborate communication protocol for this, as discussed in the article mentioned above.
Concretely, two things are much simpler:
(1) Kafka first elects a Coordinator for each Consumer Group; every Consumer has to locate this Coordinator and talk to it before load balancing can start.
RocketMQ skips the Coordinator election entirely and simply has every consumer broadcast to all brokers.
(2) Kafka also lets the client do the balancing, but differently from RocketMQ: the Coordinator picks one Consumer as the leader and has it compute the whole assignment, which is then distributed to the other Consumers.
RocketMQ does nothing that elaborate: every Consumer fetches the same global consumer list and computes only its own share, which removes that extra round of communication.