ACK消息(消息重试)
消费端发送
入口: org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#sendMessageBack
消息消费失败的情况下,会发送ack消息给broker, 发送的内容包括topic、偏移量、msgId等
消息消费失败,发送ack消息:org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#sendMessageBack
/**
* 发送ack消息
* @param msg
* @param context
* @return
*/
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();
// Wrap topic with namespace before sending back message.
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
// 发送ack
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}
return false;
}
发送消息请求:org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack
public void consumerSendMessageBack(
final String addr,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
final long timeoutMillis,
final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
// 构建请求
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
// 向broker发送请求
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
broker端接收
入口:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest;
接收到ack消息后,会重新设置其topic为:%RETRY%+消费组名称,作为重试主题,会随机选择该重试主题下的queue
随后会计算重试次数,如果超过16次,则发到死信队列,死信队列中的消息需要人工介入处理
未超过16次,则计算延时级别,作为延时消息进行保存,延时消息投递至延时topic:SCHEDULE_TOPIC_XXXX。
接收到消息:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest
// 消费者发送的ACK消息
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
处理ack消息:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncConsumerSendMsgBack
获取重试主题、queueId
// 重试主题: %RETRY%+消费组名称
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// 随机选一个queue
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
ack消息请求不包含该消息的详细内容,因此根据请求过来的偏移量,获取到该消息
// 获取消息
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
计算重试次数,设置延时级别
// 设置重试次数
// 重试次数大于最大重试次数(默认16),进入死信队列,需要人工处理
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
// 死信队列主题: %DLQ% + 消费者组名
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
// 权限为只写,说明rocketmq不负责处理了,需要人工介入
PermName.PERM_WRITE, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
} else {
// 计算消息延迟级别
// 3 + 消费失败的次数
// 延时级别1-18分别延时: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
存储消息:
// 存储消息
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
存储延时消息特殊处理:org.apache.rocketmq.store.CommitLog#asyncPutMessage
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
// 如果是延时级别大于0,则为延时消息
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 设置topic为SCHEDULE_TOPIC_XXXX,表示延时队列的消息
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
// 备份topic、queueID,如果是重试消息,则把%RETRY%的topic放入
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
消费进度管理
消费端
入口: org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persistAll
每个消费者的RemoteBrokerOffsetStore中都会维护其所有的queue的消费进度offsetTable, 消费成功后会将消费进度更新至该map中
在消费者启动过程中, 会启动多个定时任务, 其中就包含推送消费进度的, 循环每10s执行一次, 内部会遍历一个客户端下的每一个消费者, 各消费者会将offsetTable推送至broker
消费者启动:
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
org.apache.rocketmq.client.impl.factory.MQClientInstance#start
org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask
// 更新offset到broker的定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
循环推送每个consumer的:
org.apache.rocketmq.client.impl.factory.MQClientInstance#persistAllConsumerOffset
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#persistConsumerOffset
private void persistAllConsumerOffset() {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
// 循环每个consumer
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
impl.persistConsumerOffset();
}
}
@Override
public void persistConsumerOffset() {
try {
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
// 推送该消费者下的全部queue的offset
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
}
}
推送给broker:
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
org.apache.rocketmq.client.impl.MQClientAPIImpl#updateConsumerOffsetOneway
if (isOneway) {
// 单向推送, 即没有返回
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
// 远程调用推送给broker
this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
Broker端
入口 : org.apache.rocketmq.broker.offset.ConsumerOffsetManager#commitOffset
broker端接收到 消费端的提交请求后, 也是将其保存在一个map
broker启动时, 会将brokerController进行初始化, 过程中会启动一个定时任务, 每隔5s将消费者offset进行持久化,存入consumerOffset.json文件中
/**
* key = topic@consumerGroup
* value = {queueId : offset}
*/
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
org.apache.rocketmq.broker.BrokerController#initialize
//每隔5s將消费者offset进行持久化,存入consumerOffset.json文件中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);