RocketMQ存储之ConsumeQueue

时间:2025-03-07 16:25:27
public void putMessagePositionInfoWrapper(DispatchRequest request) { final int maxRetries = 30; boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable(); for (int i = 0; i < maxRetries && canWrite; i++) { long tagsCode = request.getTagsCode(); if (isExtWriteEnable()) { // 扩展信息的存储 ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); cqExtUnit.setFilterBitMap(request.getBitMap()); cqExtUnit.setMsgStoreTime(request.getStoreTimestamp()); cqExtUnit.setTagsCode(request.getTagsCode()); long extAddr = this.consumeQueueExt.put(cqExtUnit); if (isExtAddr(extAddr)) { tagsCode = extAddr; } else { log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit, topic, queueId, request.getCommitLogOffset()); } } // 存储数据 boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(), tagsCode, request.getConsumeQueueOffset()); if (result) { if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE || this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) { this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp()); } this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); return; } else { // XXX: warn and notify me log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset() + " failed, retry " + i + " times"); try { Thread.sleep(1000); } catch (InterruptedException e) { log.warn("", e); } } } log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId); this.defaultMessageStore.getRunningFlags().makeLogicsQueueError(); } /** * * @param offset 在commitLog中的offset * @param size message的大小 * @param tagsCode message的tag的哈希吗 * @param cqOffset 期望写入ConsumeQueue的位置 * @return */ private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { if (offset + size <= this.maxPhysicOffset) { log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); return true; } this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); if (mappedFile != null) { // 创建的第一个mappedFile,需要预写空白数据 if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { this.minLogicOffset = expectLogicOffset; this.mappedFileQueue.setFlushedWhere(expectLogicOffset); this.mappedFileQueue.setCommittedWhere(expectLogicOffset); this.fillPreBlank(mappedFile, expectLogicOffset); log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " + mappedFile.getWrotePosition()); } if (cqOffset != 0) { long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); // 期望待写位置比当前写入指针小,认为重复写入了,直接返回成功 if (expectLogicOffset < currentLogicOffset) { log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); return true; } if (expectLogicOffset != currentLogicOffset) { // 期望写入位置不等于当前写入指针位置, LOG_ERROR.warn( "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset ); } } this.maxPhysicOffset = offset + size; return mappedFile.appendMessage(this.byteBufferIndex.array()); } return false; }