RocketMQ源码分析(十三)之ConsumeQueue

时间:2025-03-07 16:24:01
/**
 * Writes the consume-queue entry for a dispatched message, retrying on failure.
 *
 * <p>If the consume queue is writable, up to {@code maxRetries} attempts are made, sleeping
 * one second after each failed attempt. On success the store checkpoint's logics-message
 * timestamp is set to the message's store timestamp and the method returns. If the queue is
 * not writable, or every attempt fails, an error is logged and the running flags are marked
 * with a logics-queue error.
 *
 * @param request the dispatch request carrying the commit-log offset, message size, tags code,
 *                consume-queue offset, filter bitmap and store timestamp of the message
 */
public void putMessagePositionInfoWrapper(DispatchRequest request) {
    final int maxRetries = 30;
    // The writable flag is sampled once; when it is false the loop body never runs.
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        // When the consume-queue extension file is enabled, store the filter bitmap, store time
        // and tags code in an extension unit and record the returned extension address in place
        // of the tags code. (Per the original author's note the bitmap is what consumers use
        // for filtering, and the extension is off by default; neither is visible here.)
        // NOTE(review): this runs on every retry, so a retried request may put more than one
        // extension unit; confirm that is acceptable.
        if (isExtWriteEnable()) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());

            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) {
                tagsCode = extAddr;
            } else {
                // Fall back to the plain tags code when the extension write did not yield an address.
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}",
                    cqExtUnit, topic, queueId, request.getCommitLogOffset());
            }
        }

        // Write the index entry itself.
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            // Entry written (or already present): advance the checkpoint and stop retrying.
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        }

        // XXX: warn and notify me
        log.warn("[BUG]put commit log position info to {}:{} {} failed, retry {} times",
            topic, queueId, request.getCommitLogOffset(), i);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt status is intentionally left cleared, as in the
            // original code, so the remaining retries still run with their delay. Confirm
            // whether an interrupt should instead abort the retry loop.
            log.warn("", e);
        }
    }

    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}

/**
 * Appends one consume-queue unit (commit-log offset, message size, tags code) to the mapped
 * file that holds the expected logical offset.
 *
 * @param offset   commit-log offset of the message
 * @param size     message size
 * @param tagsCode tags hash code, or an extension address when the extension file is in use
 * @param cqOffset index of this entry in the consume queue (entry count, not bytes)
 * @return {@code true} if the entry was appended or is considered already written;
 *         {@code false} if no mapped file could be obtained or the append failed
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {
    // maxPhysicOffset is the commit-log offset of the last entry written to this queue;
    // anything at or below it has already been handled.
    if (offset <= this.maxPhysicOffset) {
        return true;
    }

    // Reset the reusable buffer and fill one unit: 8-byte offset + 4-byte size + 8-byte tags code.
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    // Byte position this entry is expected to occupy in the logical queue.
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    // The consume queue is itself managed through a MappedFileQueue.
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile == null) {
        return false;
    }

    // First file of the queue, nothing written yet, and the queue does not start at entry 0:
    // record the starting offsets and pad the file with placeholder units up to the expected
    // position so that the real entry lands at the right place.
    if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
        this.minLogicOffset = expectLogicOffset;
        this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
        this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
        this.fillPreBlank(mappedFile, expectLogicOffset);
        log.info("fill pre blank space {} {} {}",
            mappedFile.getFileName(), expectLogicOffset, mappedFile.getWrotePosition());
    }

    if (cqOffset != 0) {
        long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

        // The write position is already past the expected slot: this entry was built before.
        if (expectLogicOffset < currentLogicOffset) {
            log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
            return true;
        }

        // A gap between expected and actual position is reported but does not stop the write.
        if (expectLogicOffset != currentLogicOffset) {
            LOG_ERROR.warn(
                "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                expectLogicOffset,
                currentLogicOffset,
                this.topic,
                this.queueId,
                expectLogicOffset - currentLogicOffset
            );
        }
    }

    // Advance maxPhysicOffset only after the append succeeded. If it were advanced up front,
    // a failed append would make the caller's retry hit the "already handled" guard at the top
    // of this method and be reported as success without the entry ever being written.
    final boolean appended = mappedFile.appendMessage(this.byteBufferIndex.array());
    if (appended) {
        this.maxPhysicOffset = offset;
    }
    return appended;
}

/**
 * Pads a mapped file with placeholder units up to the in-file position of {@code untilWhere}.
 *
 * <p>Each placeholder unit is (offset {@code 0}, size {@code Integer.MAX_VALUE}, tags code
 * {@code 0}). The result of each append is not checked.
 *
 * @param mappedFile the file to pad
 * @param untilWhere logical offset up to which padding is written; only its remainder modulo
 *                   the mapped-file size is used
 */
private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
    ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
    byteBuffer.putLong(0L);
    byteBuffer.putInt(Integer.MAX_VALUE);
    byteBuffer.putLong(0L);

    // The same placeholder bytes are appended repeatedly, so fetch the backing array once.
    final byte[] blankUnit = byteBuffer.array();

    int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
    for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
        mappedFile.appendMessage(blankUnit);
    }
}