RocketMQ源码分析(十三)之ConsumeQueue
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
//判断ConsumeQueue是否可写
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
//是否开启写ConsumeQueue扩展文件,默认false
//bloom过滤器先记录消息的bitMap,这样consumer来读取消息时先通过bloom过滤器判断是否有符合过滤条件的消息
if (isExtWriteEnable()) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
} else {
log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
topic, queueId, request.getCommitLogOffset());
}
}
//写入缓冲区
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
//如果更新成功,则更新checkpoint文件
if (result) {
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
//maxPhysicOffset记录了上一次ConsumeQueue更新的消息在CommitLog中的偏移量
// 如果消息已经被处理,则直接返回true
if (offset <= this.maxPhysicOffset) {
return true;
}
this.byteBufferIndex.flip();
//20byte
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);//写入消息偏移量
this.byteBufferIndex.putInt(size);//写入消息长度
this.byteBufferIndex.putLong(tagsCode);//写入tag hashcode
//cqOffset为ConsumerQueue中记录了偏移量总数
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
//获取ConsumeQueue当前对应的MappedFile,ConsumeQueue本身也是通过MappedFileQueue来管理的
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
//对于新建的文件,填充0来预热PageCache
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
//说明已经处理过
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
//更新物理偏移量,追加到MappedFile。如果appendMessage追加失败了,等下次继续追加,所以这里可以直接给maxPhysicOffset赋值,不用关心是否追加成功
this.maxPhysicOffset = offset;
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
byteBuffer.putLong(0L);
byteBuffer.putInt(Integer.MAX_VALUE);
byteBuffer.putLong(0L);
int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
mappedFile.appendMessage(byteBuffer.array());
}
}