[RocketMQ] Source Code Walkthrough: ACK Messages, Message Retry, and Consume Progress Management

Time: 2022-09-20 01:19:19

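ACK messages (message retry)

Consumer side: sending

Entry point: org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#sendMessageBack

When a message fails to be consumed, the consumer sends an ACK message back to the broker. The request carries the topic, the CommitLog offset, the msgId, and related fields.


On consume failure, the ACK message is sent from: org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#sendMessageBack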

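/**
 * Sends the ACK (send-back) message for a message that failed to be consumed.
 * @param msg     the message that failed
 * @param context the consume context (delay level and message queue)
 * @return true if the broker accepted the send-back request, false otherwise
 */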
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();

    // Wrap topic with namespace before sending back message.
    msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
    try {
        // Send the ACK
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }

    return false;
}

Sending the request: org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack

public void consumerSendMessageBack(
    final String addr,
    final MessageExt msg,
    final String consumerGroup,
    final int delayLevel,
    final long timeoutMillis,
    final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
    // Build the request
    ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

    requestHeader.setGroup(consumerGroup);
    requestHeader.setOriginTopic(msg.getTopic());
    requestHeader.setOffset(msg.getCommitLogOffset());
    requestHeader.setDelayLevel(delayLevel);
    requestHeader.setOriginMsgId(msg.getMsgId());
    requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

    // Send the request to the broker synchronously
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

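Broker side: receiving

Entry point: org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest

After receiving the ACK message, the broker resets its topic to the retry topic, %RETRY% + consumer group name, and picks a random queue under that retry topic.

It then checks the retry count. If the message has already been reconsumed the maximum number of times (16 by default), or the requested delay level is negative, it is sent to the dead-letter queue; messages in the dead-letter queue need manual handling.

Otherwise the broker computes a delay level and stores the message as a delayed message, which is first written to the delay topic SCHEDULE_TOPIC_XXXX.


Receiving the message: org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest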
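
The routing decision described above can be sketched in a few lines. This is a minimal illustration, not RocketMQ code: the class and method names (RetryRoutingSketch, targetTopic, pickQueue) are hypothetical, and only the %RETRY% / %DLQ% prefixes and the random queue formula are taken from the broker snippets in this article.

```java
import java.util.Random;

// Illustrative sketch only; names are hypothetical, not part of RocketMQ.
final class RetryRoutingSketch {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    /**
     * Chooses the topic a sent-back message is routed to:
     * the dead-letter topic once retries are exhausted (or delayLevel < 0),
     * otherwise the consumer group's retry topic.
     */
    static String targetTopic(String group, int reconsumeTimes, int maxReconsumeTimes, int delayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            return DLQ_PREFIX + group;
        }
        return RETRY_PREFIX + group;
    }

    /** Picks a random queue id in [0, queueNums), using the same formula as the broker snippet. */
    static int pickQueue(Random random, int queueNums) {
        return Math.abs(random.nextInt() % 99999999) % queueNums;
    }
}
```

For example, targetTopic("groupA", 16, 16, 0) yields "%DLQ%groupA", while targetTopic("groupA", 3, 16, 0) yields "%RETRY%groupA".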

// ACK message sent back by a consumer
case RequestCode.CONSUMER_SEND_MSG_BACK:
    return this.asyncConsumerSendMsgBack(ctx, request);

Handling the ACK message: org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncConsumerSendMsgBack

Resolve the retry topic and queueId:

// Retry topic: %RETRY% + consumer group name
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// Pick a random queue
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

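The ACK request does not carry the message content itself, so the broker looks up the original message by the CommitLog offset in the request:

// Look up the original message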
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());

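Check the retry count and set the delay level:

// Check the retry count
// If it has reached the maximum (16 by default) or delayLevel < 0, route to the dead-letter queue, which needs manual handling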
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes 
    || delayLevel < 0) {
    // Dead-letter topic: %DLQ% + consumer group name
    newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
            DLQ_NUMS_PER_GROUP,
            // Write-only permission: RocketMQ no longer delivers these messages, so manual intervention is needed
            PermName.PERM_WRITE, 0);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return CompletableFuture.completedFuture(response);
    }
} else {
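    // Compute the delay level of the message:
    // 3 + number of failed consume attempts (when the consumer did not request a level)
    // Delay levels 1-18 map to: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h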
    if (0 == delayLevel) {
        delayLevel = 3 + msgExt.getReconsumeTimes();
    }
    msgExt.setDelayTimeLevel(delayLevel);
}
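
To make the "3 + reconsumeTimes" rule concrete, the sketch below maps a retry count to the resulting delay, using the default level table quoted in the comment above. It is only an illustration: RetryDelaySketch and its methods are hypothetical names, and the cap at the highest level mirrors the check that CommitLog performs when it stores a delayed message.

```java
import java.time.Duration;

// Illustrative sketch only; names are hypothetical, not part of RocketMQ.
final class RetryDelaySketch {
    // Default delay levels 1..18, as listed in the broker code comment.
    static final String[] LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    /** Level 0 means "let the broker decide": 3 + reconsumeTimes, capped at the highest level. */
    static int effectiveLevel(int requestedLevel, int reconsumeTimes) {
        int level = (requestedLevel == 0) ? 3 + reconsumeTimes : requestedLevel;
        return Math.min(level, LEVELS.length);
    }

    /** Converts a level in 1..18 to its delay, e.g. level 3 -> 10 seconds. */
    static Duration delayOf(int level) {
        String spec = LEVELS[level - 1];
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            default:  return Duration.ofHours(amount);
        }
    }
}
```

Under these assumptions the first retry (reconsumeTimes = 0) lands on level 3, a 10-second delay, and a message with reconsumeTimes = 15 lands on level 18, a 2-hour delay.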

Store the message:

// Store the message
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);

Special handling when storing a delayed message: org.apache.rocketmq.store.CommitLog#asyncPutMessage

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery
    // A delay level greater than 0 marks a delayed message
    if (msg.getDelayTimeLevel() > 0) {
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }
        // Switch the topic to SCHEDULE_TOPIC_XXXX, the delay-queue topic
        topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Backup real topic, queueId
        // For a retry message, the backed-up real topic is the %RETRY% topic
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}
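
The topic swap above can be modelled with a plain object and a property map. The sketch below is a simplified illustration under stated assumptions: DelayRewriteSketch and Msg are hypothetical types, the property keys "REAL_TOPIC" and "REAL_QID" are what MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID are assumed to resolve to, delayLevel2QueueId is assumed to be level - 1, and the restore step (putting the real topic back once the delay expires) is not shown in this article and is included only to show why the backup exists.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; types and names are hypothetical, not part of RocketMQ.
final class DelayRewriteSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    static final class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    /** Backs up the real topic/queue and redirects the message to the schedule topic. */
    static void toSchedule(Msg msg, int delayLevel, int maxDelayLevel) {
        if (delayLevel <= 0) {
            return; // not a delayed message, store as-is
        }
        int level = Math.min(delayLevel, maxDelayLevel);
        msg.properties.put("REAL_TOPIC", msg.topic);               // assumed key
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId)); // assumed key
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = level - 1; // assumed mapping: one schedule queue per delay level
    }

    /** Assumed counterpart: restores the real topic/queue when the delay has elapsed. */
    static void restore(Msg msg) {
        msg.topic = msg.properties.remove("REAL_TOPIC");
        msg.queueId = Integer.parseInt(msg.properties.remove("REAL_QID"));
    }
}
```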

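Consume progress management

Consumer side

Entry point: org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persistAll

Each consumer's RemoteBrokerOffsetStore keeps the consume progress of all its queues in a map named offsetTable; after a message is consumed successfully, the progress is updated in that map.

While a consumer starts up, the client starts several scheduled tasks, one of which pushes the consume progress to the broker. That task first runs 10 s after startup and then repeats every persistConsumerOffsetInterval milliseconds (5 s by default). On each run it iterates over every consumer in the client instance, and each consumer pushes its offsetTable to the broker.


Consumer startup: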
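
The in-memory offsetTable and its periodic flush can be sketched as follows. This is a simplified model, not the RemoteBrokerOffsetStore implementation: ClientOffsetStoreSketch is a hypothetical class, queues are keyed by a plain String instead of MessageQueue, the increaseOnly flag is an assumption about how stale updates are ignored, and the sender callback stands in for the remote call to the broker.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Illustrative sketch only; names are hypothetical, not part of RocketMQ.
final class ClientOffsetStoreSketch {
    // queue key -> latest consumed offset held in memory
    private final ConcurrentMap<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    /** Records progress after a successful consume; with increaseOnly, older offsets are ignored. */
    void updateOffset(String queue, long offset, boolean increaseOnly) {
        AtomicLong current = offsetTable.putIfAbsent(queue, new AtomicLong(offset));
        if (current == null) {
            return; // first offset recorded for this queue
        }
        if (increaseOnly) {
            current.accumulateAndGet(offset, Math::max);
        } else {
            current.set(offset);
        }
    }

    /** What the scheduled task would do on each tick: push every queue's offset to the broker. */
    void persistAll(BiConsumer<String, Long> sender) {
        offsetTable.forEach((queue, offset) -> sender.accept(queue, offset.get()));
    }
}
```

In a real client the persistAll call would be driven by a scheduled executor; keeping the sender as a callback here makes the sketch easy to exercise without a broker.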

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

org.apache.rocketmq.client.impl.factory.MQClientInstance#start

org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask

// Scheduled task that pushes consumer offsets to the broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

Iterating over and pushing each consumer's offsets:

org.apache.rocketmq.client.impl.factory.MQClientInstance#persistAllConsumerOffset

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#persistConsumerOffset

// MQClientInstance#persistAllConsumerOffset
private void persistAllConsumerOffset() {
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    // Iterate over every consumer registered in this client instance
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> entry = it.next();
        MQConsumerInner impl = entry.getValue();
        impl.persistConsumerOffset();
    }
}

// DefaultMQPushConsumerImpl#persistConsumerOffset
@Override
public void persistConsumerOffset() {
    try {
        this.makeSureStateOK();
        Set<MessageQueue> mqs = new HashSet<MessageQueue>();
        Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
        mqs.addAll(allocateMq);
        // Push the offsets of all queues assigned to this consumer
        this.offsetStore.persistAll(mqs);
    } catch (Exception e) {
        log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
    }
}

Pushing to the broker:

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateConsumeOffsetToBroker

org.apache.rocketmq.client.impl.MQClientAPIImpl#updateConsumerOffsetOneway

// RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
if (isOneway) {
    // One-way push: no response is awaited
    this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
        findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}

// MQClientAPIImpl#updateConsumerOffsetOneway
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
// Remote call that pushes the offset to the broker
this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);

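Broker side

Entry point: org.apache.rocketmq.broker.offset.ConsumerOffsetManager#commitOffset

When the broker receives a commit request from a consumer, it likewise stores the offset in an in-memory map.

During broker startup, BrokerController is initialized and registers a scheduled task that persists the consumer offsets to the consumerOffset.json file. The task first runs after 10 s and then repeats every flushConsumerOffsetInterval milliseconds (5 s by default).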

/**
 * key   = topic@consumerGroup
 * value = {queueId : offset}
 */
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
    new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
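
A minimal model of how an offset commit lands in this two-level map is sketched below. It follows the key layout documented in the comment above (topic@group, then queueId to offset); the class BrokerOffsetTableSketch, the method bodies, and the -1 "not found" return value are assumptions made for illustration, not a copy of ConsumerOffsetManager.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch only; names and return conventions are assumptions.
final class BrokerOffsetTableSketch {
    // key = topic@consumerGroup, value = {queueId : offset}
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>(512);

    /** Stores the committed offset for one queue of a topic/group pair. */
    void commitOffset(String group, String topic, int queueId, long offset) {
        String key = topic + "@" + group;
        offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>(32)).put(queueId, offset);
    }

    /** Returns the stored offset, or -1 if nothing has been committed yet (assumed convention). */
    long queryOffset(String group, String topic, int queueId) {
        ConcurrentMap<Integer, Long> queues = offsetTable.get(topic + "@" + group);
        if (queues == null) {
            return -1;
        }
        Long offset = queues.get(queueId);
        return offset == null ? -1 : offset;
    }
}
```

The periodic persist task then only has to serialize this map to disk, which is why the commit path itself stays a cheap in-memory write.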

org.apache.rocketmq.broker.BrokerController#initialize

// Persist consumer offsets to consumerOffset.json every flushConsumerOffsetInterval ms (5 s by default)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);