RocketMQ Consumer Rebalance Algorithms: Illustrated, Source-Level Analysis

Date: 2022-10-16 08:01:59

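Last updated: October 15, 2022

About the author: bachelor's and master's degrees in communication engineering, Java programmer. Has written research papers and filed patents; a good programmer should do more than CRUD.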



Average Allocation Algorithm

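This is the default allocation algorithm for message consumption. "Average" means that all queues under one Topic are split evenly among all Consumers in the same consumer group.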

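For example, with 5 queues and 2 Consumers, allocation proceeds as follows:

  1. 5 is not divisible by 2, so the queues cannot be split evenly
  2. Each Consumer first gets 2 queues
  3. The 1 leftover queue goes, in order, to the first Consumer

Because each Consumer takes a contiguous block, the first Consumer ends up with queues 0 to 2 and the second with queues 3 and 4.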

[Figure: average allocation of 5 queues to 2 Consumers]
The source code is as follows:

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }
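        // Position of the current Consumer in cidAll
        int index = cidAll.indexOf(currentCID);
        // Remainder: queues left over after an even split
        int mod = mqAll.size() % cidAll.size();

        // If there are no more queues than Consumers, each Consumer gets at most one queue.
        // Otherwise let x = mqAll.size() / cidAll.size(): when the split is uneven and this
        // Consumer's index is below the remainder, it gets x + 1 queues; otherwise it gets x.
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());

        // Start of this Consumer's contiguous block of queues
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;

        // min() covers the case of more Consumers than queues: the extra Consumers
        // get a non-positive range and therefore no queues
        int range = Math.min(averageSize, mqAll.size() - startIndex);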
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG";
    }
}
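To check the 5-queue, 2-Consumer walkthrough, here is a minimal self-contained sketch that repeats the index arithmetic of AllocateMessageQueueAveragely on plain strings. The class name AvgAllocSketch and the queue names q0, q1, ... are hypothetical stand-ins for RocketMQ's MessageQueue objects; this is an illustration, not RocketMQ code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: same arithmetic as AllocateMessageQueueAveragely,
// with queues modeled as plain strings instead of MessageQueue objects.
final class AvgAllocSketch {

    // Queues assigned to the Consumer at position `index` out of `consumerCount`
    static List<String> allocate(List<String> mqAll, int consumerCount, int index) {
        int mod = mqAll.size() % consumerCount;
        int averageSize = mqAll.size() <= consumerCount ? 1
                : (mod > 0 && index < mod ? mqAll.size() / consumerCount + 1
                                          : mqAll.size() / consumerCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    // Builds the hypothetical queue names q0 .. q(n-1)
    static List<String> queues(int n) {
        List<String> qs = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            qs.add("q" + i);
        }
        return qs;
    }
}
```

With queues(5) and 2 Consumers, allocate(..., 2, 0) returns [q0, q1, q2] and allocate(..., 2, 1) returns [q3, q4]. With queues(2) and 3 Consumers, the third Consumer gets an empty list, which is exactly the case the Math.min guard handles.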

Circular Average Allocation Algorithm

Usage:

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());

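You can also plug in a custom allocation strategy:

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() {
            @Override
            public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
                // Custom allocation logic goes here; return an empty list
                // rather than null when this Consumer should get no queues
                return new ArrayList<MessageQueue>();
            }

            @Override
            public String getName() {
                // Hypothetical name identifying the custom strategy
                return "CUSTOM";
            }
        });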

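The circular algorithm arranges the message queues in a ring and lets the Consumers of the same group take them one at a time, in order, as shown in the figure below: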
[Figure: message queues arranged in a ring and matched to Consumers in order]
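In the figure, the Topic has 10 message queues. Assuming the consumer group has 4 Consumers, allocation proceeds as follows:

  1. Sort all message queues and all Consumers (in RocketMQ the caller, RebalanceImpl, sorts both lists before calling allocate())
  2. Match Consumers to queues in order, one queue per Consumer per round

Round 1 assigns Queue1 to Queue4, round 2 assigns Queue5 to Queue8, and round 3 assigns Queue9 and Queue10, so allocation finishes after 3 rounds. The first Consumer thus gets Queue1, Queue5 and Queue9, the second gets Queue2, Queue6 and Queue10, and the last two get two queues each.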

The source code:

public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        // Round-robin: queue i belongs to the Consumer at position i % cidAll.size()
        int index = cidAll.indexOf(currentCID);
        for (int i = index; i < mqAll.size(); i++) {
            if (i % cidAll.size() == index) {
                result.add(mqAll.get(i));
            }
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG_BY_CIRCLE";
    }
}
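The round-robin rule above (queue i goes to the Consumer at position i % cidAll.size()) can be checked against the 10-queue, 4-Consumer example with the hypothetical sketch below. Stepping i by the number of Consumers visits exactly the indices the original loop keeps via i % cidAll.size() == index, so the result is the same; the names q0..q9 stand in for Queue1..Queue10.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of AllocateMessageQueueAveragelyByCircle over plain strings.
final class CircleAllocSketch {

    // Starting at this Consumer's index, take every consumerCount-th queue
    static List<String> allocate(List<String> mqAll, int consumerCount, int index) {
        List<String> result = new ArrayList<>();
        for (int i = index; i < mqAll.size(); i += consumerCount) {
            result.add(mqAll.get(i));
        }
        return result;
    }
}
```

For 10 queues and 4 Consumers, Consumer 0 gets q0, q4, q8, Consumer 1 gets q1, q5, q9, and Consumers 2 and 3 get two queues each. The average strategy would instead give each Consumer a contiguous block (q0 to q2, q3 to q5, q6 and q7, q8 and q9); the circular strategy interleaves queues across Consumers.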



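Consistent Hashing Algorithm

First, a quick introduction to consistent hashing.

The problem with plain hashing:
Suppose there are several backend servers and incoming requests are load-balanced "evenly" across them. If the target server is computed as the user ID modulo the number of servers N, then when one server goes down N changes, the modulo result changes for most user IDs, and most requests are remapped to a different server, which is expensive.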
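To put a number on that cost, the hypothetical helper below counts how many user IDs change server when N drops from 4 to 3 under plain modulo hashing. For IDs 0 to 99, an ID keeps its server only when id % 4 == id % 3, which holds only for IDs congruent to 0, 1 or 2 modulo 12, so 27 IDs stay and 73 of the 100 move.

```java
// Hypothetical helper: how many user IDs in [0, userCount) map to a different
// server when the server count changes from oldN to newN under id % N hashing.
final class ModuloRemapSketch {

    static int countMoved(int userCount, int oldN, int newN) {
        int moved = 0;
        for (int id = 0; id < userCount; id++) {
            if (id % oldN != id % newN) {
                moved++;
            }
        }
        return moved;
    }
}
```

With consistent hashing, removing one of 4 servers only moves the users that were on the failed server, roughly a quarter of them if the ring is balanced.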


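The idea of consistent hashing: place the servers (nodes) and the users on the same hash ring; walking clockwise, each user accesses the nearest node it reaches.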
[Figure: hash ring; walking clockwise, each user accesses the nearest node]

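If a server goes down, its users simply continue clockwise to the next server node; users assigned to other nodes are not affected.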
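[Figure: a server on the hash ring goes down]

If a node is added, only the users between the new node and the previous node counterclockwise from it move to the new node:

[Figure: adding a node to the hash ring]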


RocketMQ's source-level implementation of consistent hashing:

public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    private final int virtualNodeCnt;
    private final HashFunction customHashFunction;

    public AllocateMessageQueueConsistentHash() {
        this(10);
    }
	
    // Constructor that sets the number of virtual nodes per Consumer
    public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {
        this(virtualNodeCnt, null);
    }

    public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
        if (virtualNodeCnt < 0) {
            throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
        }
        this.virtualNodeCnt = virtualNodeCnt;
        this.customHashFunction = customHashFunction;
    }
	
    // Main allocation logic
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {

        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }
		
        // Wrap every consumer ID in a ClientNode
        Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
        for (String cid : cidAll) {
            cidNodes.add(new ClientNode(cid));
        }
		
        // Build the hash ring, with virtualNodeCnt virtual nodes per Consumer
        final ConsistentHashRouter<ClientNode> router; //for building hash ring
        if (customHashFunction != null) {
            router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
        } else {
            router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
        }
		
        // Route each MQ on the hash ring and keep only the MQs owned by the current Consumer
        List<MessageQueue> results = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqAll) {
            ClientNode clientNode = router.routeNode(mq.toString());
            if (clientNode != null && currentCID.equals(clientNode.getKey())) {
                results.add(mq);
            }
        }

        return results;

    }

    @Override
    public String getName() {
        return "CONSISTENT_HASH";
    }

    private static class ClientNode implements Node {
        private final String clientID;

        public ClientNode(String clientID) {
            this.clientID = clientID;
        }

        @Override
        public String getKey() {
            return clientID;
        }
    }

}

The code above builds the hash ring in ConsistentHashRouter, where the core of the algorithm lives; it is mainly based on a TreeMap. Interested readers can dig into its source.
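The ring can be sketched with a TreeMap in the same spirit. This is a hypothetical, simplified stand-in, not RocketMQ's ConsistentHashRouter: the class name HashRingSketch, the virtual-node key format consumerId + "-" + i, and the use of the first 4 bytes of an MD5 digest as the hash are assumptions for illustration. Each Consumer is placed on the ring virtualNodeCnt times, and a queue key is routed to the first virtual node clockwise from its hash, wrapping around to the smallest key.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical, simplified consistent-hash ring (not RocketMQ's ConsistentHashRouter).
final class HashRingSketch {
    // Hash position on the ring -> Consumer ID owning that virtual node
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingSketch(List<String> consumerIds, int virtualNodeCnt) {
        if (consumerIds.isEmpty() || virtualNodeCnt < 1) {
            throw new IllegalArgumentException("need at least one consumer and one virtual node");
        }
        for (String cid : consumerIds) {
            for (int i = 0; i < virtualNodeCnt; i++) {
                ring.put(hash(cid + "-" + i), cid);
            }
        }
    }

    // Walk clockwise: first virtual node at or after the key's hash, wrapping to the start
    String route(String key) {
        SortedMap<Long, String> tail = ring.tailMap(hash(key));
        Long nodeHash = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
        return ring.get(nodeHash);
    }

    // First 4 bytes of the MD5 digest as an unsigned 32-bit value
    static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[0] & 0xFF) << 24) | ((d[1] & 0xFF) << 16) | ((d[2] & 0xFF) << 8) | (d[3] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

Because removing a Consumer only deletes its own virtual nodes, every queue that was routed to another Consumer keeps its owner; only the removed Consumer's queues move, each to the next virtual node clockwise. More virtual nodes per Consumer generally spread the queues more evenly.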



Designated Machine Room Algorithm

Suppose there are two machine rooms. The consumption relationship is shown below:
[Figure: consumption relationship between Consumers and MQs in two machine rooms]
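The designated-machine-room algorithm first uses the Broker each MQ belongs to in order to pick out all MQs located in the designated machine rooms (the Broker name must have the form machineRoom@brokerName, and the machine room must be in consumeridcs), then splits those MQs evenly among all Consumers: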

public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
    private Set<String> consumeridcs;

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // Position of the current Consumer ID in the Consumer list
        int currentIndex = cidAll.indexOf(currentCID);
        if (currentIndex < 0) {
            return result;
        }
        // Keep only MQs whose Broker name is "room@broker" with room in consumeridcs
        List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqAll) {
            String[] temp = mq.getBrokerName().split("@");
            if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                premqAll.add(mq);
            }
        }
        
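        // Queues per Consumer (integer division; despite its name, mod is the quotient)
        int mod = premqAll.size() / cidAll.size();
        // Leftover queues that cannot be split evenly
        int rem = premqAll.size() % cidAll.size();

        // Each Consumer takes a contiguous block of mod queues, plus one leftover queue if its index < rem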
        int startIndex = mod * currentIndex;
        int endIndex = startIndex + mod;
        for (int i = startIndex; i < endIndex; i++) {
            result.add(premqAll.get(i));
        }
        if (rem > currentIndex) {
            result.add(premqAll.get(currentIndex + mod * cidAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "MACHINE_ROOM";
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}
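Here is a hypothetical sketch of the same filtering and splitting logic over plain strings, to show what each Consumer ends up with. Queues are written as brokerName#queueId (for example room1@broker-a#0); that string format and the class name are assumptions for illustration only. Leftover queues are taken from the tail of the filtered list, so the layout differs from AllocateMessageQueueAveragely: with 5 filtered queues and 2 Consumers, Consumer 0 gets filtered queues 0, 1 and 4 and Consumer 1 gets queues 2 and 3.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of AllocateMessageQueueByMachineRoom's logic.
// A queue is modeled as "<brokerName>#<queueId>", e.g. "room1@broker-a#0".
final class MachineRoomAllocSketch {

    static List<String> allocate(List<String> mqAll, Set<String> consumeIdcs,
                                 List<String> cidAll, String currentCID) {
        List<String> result = new ArrayList<>();
        int currentIndex = cidAll.indexOf(currentCID);
        if (currentIndex < 0) {
            return result;
        }
        // Keep queues whose broker name is "<room>@<broker>" with <room> in consumeIdcs
        List<String> premqAll = new ArrayList<>();
        for (String mq : mqAll) {
            String brokerName = mq.substring(0, mq.indexOf('#'));
            String[] temp = brokerName.split("@");
            if (temp.length == 2 && consumeIdcs.contains(temp[0])) {
                premqAll.add(mq);
            }
        }
        int perConsumer = premqAll.size() / cidAll.size();
        int rem = premqAll.size() % cidAll.size();
        // Contiguous block of perConsumer queues ...
        int startIndex = perConsumer * currentIndex;
        for (int i = startIndex; i < startIndex + perConsumer; i++) {
            result.add(premqAll.get(i));
        }
        // ... plus one leftover queue from the tail for the first rem Consumers
        if (rem > currentIndex) {
            result.add(premqAll.get(currentIndex + perConsumer * cidAll.size()));
        }
        return result;
    }
}
```

Queues in machine rooms outside consumeIdcs, and queues whose Broker name has no "@", are never assigned to anyone.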



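Nearby Machine Room Algorithm

As the name suggests, the nearby-machine-room strategy allocates based on how close Consumers are to machine rooms: MQs deployed in a machine room are allocated first to the Consumers in that same machine room.

Concretely, it first determines which machine room each Consumer and each Broker is deployed in, then allocates each Broker's MQs to the Consumers in the same machine room. If a machine room has no Consumer, its MQs are shared among all Consumers, which are then necessarily in other machine rooms. The allocation inside each group is delegated to another strategy passed to the constructor (for example AllocateMessageQueueAveragely), which is why getName() returns "MACHINE_ROOM_NEARBY-" followed by that strategy's name.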

[Figure: nearby machine room allocation]

public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
    private final MachineRoomResolver machineRoomResolver;

    public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
        MachineRoomResolver machineRoomResolver) throws NullPointerException {
        if (allocateMessageQueueStrategy == null) {
            throw new NullPointerException("allocateMessageQueueStrategy is null");
        }

        if (machineRoomResolver == null) {
            throw new NullPointerException("machineRoomResolver is null");
        }

        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.machineRoomResolver = machineRoomResolver;
    }

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        // Group MQs by machine room
        Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
        for (MessageQueue mq : mqAll) {
            String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
            if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
                if (mr2Mq.get(brokerMachineRoom) == null) {
                    mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
                }
                mr2Mq.get(brokerMachineRoom).add(mq);
            } else {
                throw new IllegalArgumentException("Machine room is null for mq " + mq);
            }
        }

        // Group Consumers by machine room
        Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
        for (String cid : cidAll) {
            String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
            if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
                if (mr2c.get(consumerMachineRoom) == null) {
                    mr2c.put(consumerMachineRoom, new ArrayList<String>());
                }
                mr2c.get(consumerMachineRoom).add(cid);
            } else {
                throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
            }
        }

        List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();

        // 1. Allocate the MQs in the current Consumer's machine room among the Consumers of that room
        String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
        List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
        List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
        if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
        }

        // 2. MQs in machine rooms with no alive Consumer are shared among all Consumers
        for (String machineRoom : mr2Mq.keySet()) {
            if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
                allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
            }
        }

        return allocateResults;
    }

    @Override
    public String getName() {
        return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName();
    }

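    /**
     * A resolver that determines which machine room a message queue or a client is deployed in.
     *
     * AllocateMachineRoomNearby uses the results to group message queues and clients by machine room.
     *
     * The returned value must not be null.
     */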
    public interface MachineRoomResolver {
        String brokerDeployIn(MessageQueue messageQueue);

        String consumerDeployIn(String clientID);
    }
}
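The two steps can be traced with a hypothetical, simplified re-creation over plain strings. Assumptions for illustration: a queue's machine room is the prefix before "@" in names like room1@broker-a#0, a Consumer's room is the prefix before "-" in IDs like room1-c0, the two resolvers are plain functions instead of a MachineRoomResolver, and the inner strategy repeats AllocateMessageQueueAveragely's arithmetic. None of these names are RocketMQ API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical, simplified re-creation of AllocateMachineRoomNearby over plain strings.
final class NearbyAllocSketch {

    // Inner strategy: same arithmetic as AllocateMessageQueueAveragely
    static List<String> averagely(List<String> mqAll, List<String> cidAll, String currentCID) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCID);
        if (index < 0 || mqAll.isEmpty()) {
            return result;
        }
        int n = cidAll.size();
        int mod = mqAll.size() % n;
        int averageSize = mqAll.size() <= n ? 1
                : (mod > 0 && index < mod ? mqAll.size() / n + 1 : mqAll.size() / n);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    static List<String> allocate(List<String> mqAll, List<String> cidAll, String currentCID,
                                 Function<String, String> brokerRoom,
                                 Function<String, String> consumerRoom) {
        List<String> results = new ArrayList<>();
        if (!cidAll.contains(currentCID)) {
            return results;
        }
        // Group queues and Consumers by machine room
        Map<String, List<String>> mr2Mq = new TreeMap<>();
        for (String mq : mqAll) {
            mr2Mq.computeIfAbsent(brokerRoom.apply(mq), room -> new ArrayList<>()).add(mq);
        }
        Map<String, List<String>> mr2c = new TreeMap<>();
        for (String cid : cidAll) {
            mr2c.computeIfAbsent(consumerRoom.apply(cid), room -> new ArrayList<>()).add(cid);
        }
        // Step 1: queues in this Consumer's room, shared only among that room's Consumers
        String currentRoom = consumerRoom.apply(currentCID);
        List<String> mqInThisRoom = mr2Mq.remove(currentRoom);
        if (mqInThisRoom != null) {
            results.addAll(averagely(mqInThisRoom, mr2c.get(currentRoom), currentCID));
        }
        // Step 2: queues in rooms without any Consumer, shared among all Consumers
        for (Map.Entry<String, List<String>> entry : mr2Mq.entrySet()) {
            if (!mr2c.containsKey(entry.getKey())) {
                results.addAll(averagely(entry.getValue(), cidAll, currentCID));
            }
        }
        return results;
    }
}
```

With Consumers room1-c0, room1-c1 and room2-c2, and two queues in each of room1, room2 and room3, each room1 Consumer gets one room1 queue and room2-c2 gets both room2 queues. Room3 has no Consumer, so its two queues are shared among all three Consumers: the two room1 Consumers get one each and room2-c2 gets none.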



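Manually Configuring Load Balancing

Besides the built-in load-balancing algorithms, you can configure the allocation by hand, for example to specify the queue, Topic and Broker to consume from, by setting the queues directly on the consumer side: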

AllocateMessageQueueByConfig allocateByConfig = new AllocateMessageQueueByConfig();
// Consume only queue 0 of "Topic name" on "Broker name"
allocateByConfig.setMessageQueueList(
        Collections.singletonList(new MessageQueue("Topic name", "Broker name", 0)));
consumer.setAllocateMessageQueueStrategy(allocateByConfig);

The code above specifies the queue ID, the Topic and the Broker name by hand; the Consumer then consumes only from that queue on that Broker. The source is:

public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
    private List<MessageQueue> messageQueueList;

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        return this.messageQueueList;
    }

    @Override
    public String getName() {
        return "CONFIG";
    }

    public List<MessageQueue> getMessageQueueList() {
        return messageQueueList;
    }

    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
        this.messageQueueList = messageQueueList;
    }
}

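As the source shows, allocate() simply returns the configured MQ list, the one passed in above, and ignores mqAll, cidAll and currentCID. Once configured, every Consumer using this strategy gets exactly the listed queues and consumes from them, regardless of how many Consumers are in the group.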