@Test public void testSelectMappedBuffer() throws IOException { // 1. 使用 mmap 映射磁盘上的文件 MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64); // 2. 向 file channel 中写入字节 boolean result = mappedFile.appendMessage(storeMessage.getBytes()); assertThat(result).isTrue(); // 3. 读取文件对应的 MappedByteBuffer 中起始位置为 0 的所有内容,即上面写入的内容 // 对 MappedFile 的 refCount 加一 SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0); byte[] data = new byte[storeMessage.length()]; selectMappedBufferResult.getByteBuffer().get(data); String readString = new String(data); assertThat(readString).isEqualTo(storeMessage); // 4. 关闭 mmap,回收 mmap 产生的堆外内存 mappedFile.shutdown(1000); assertThat(mappedFile.isAvailable()).isFalse(); // 5. 对 MappedFile 的 refCount 减一 selectMappedBufferResult.release(); assertThat(mappedFile.isCleanupOver()).isTrue(); // 6. 再次关闭 mmap,同时删除磁盘上的文件 assertThat(mappedFile.destroy(1000)).isTrue(); }
1. mmap 映射文件
private void init(final String fileName, final int fileSize) throws IOException { this.fileName = fileName; this.fileSize = fileSize; // 创建文件 this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); boolean ok = false; ensureDirOK(this.file.getParent()); try { this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); // mmap 建立映射 this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { log.error("Failed to create file " this.fileName, e); throw e; } catch (IOException e) { log.error("Failed to map file " this.fileName, e); throw e; } finally { if (!ok && this.fileChannel != null) { this.fileChannel.close(); } } }
2. 写入数据,这里的例子没有使用 mmap,而是直接写到 file channel
public boolean appendMessage(final byte[] data) { int currentPos = this.wrotePosition.get(); if ((currentPos data.length) <= this.fileSize) { try { this.fileChannel.position(currentPos); this.fileChannel.write(ByteBuffer.wrap(data)); } catch (Throwable e) { log.error("Error occurred when append message to mappedFile.", e); } this.wrotePosition.addAndGet(data.length); return true; } return false; }
3. mmap 读取文件内容
// 从 MappedByteBuffer 中截取 pos 到最新位置的内容 public SelectMappedBufferResult selectMappedBuffer(int pos) { int readPosition = getReadPosition(); if (pos < readPosition && pos >= 0) { // 默认计数器为 1 // 计数加一 if (this.hold()) { ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(pos); int size = readPosition - pos; ByteBuffer byteBufferNew = byteBuffer.slice(); byteBufferNew.limit(size); return new SelectMappedBufferResult(this.fileFromOffset pos, byteBufferNew, size, this); } } return null; } // 计数加一 public synchronized boolean hold() { if (this.isAvailable()) { if (this.refCount.getAndIncrement() > 0) { return true; } else { this.refCount.getAndDecrement(); } } return false; }
4. 第一次关闭 mmap,refCount 由 2 变 1,不会触发 cleanup
// org.apache.rocketmq.store.ReferenceResource#shutdown public void shutdown(final long intervalForcibly) { if (this.available) { this.available = false; this.firstShutdownTimestamp = System.currentTimeMillis(); this.release(); } else if (this.getRefCount() > 0) { if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) { this.refCount.set(-1000 - this.getRefCount()); this.release(); } } } // org.apache.rocketmq.store.ReferenceResource#release public void release() { long value = this.refCount.decrementAndGet(); if (value > 0) return; synchronized (this) { this.cleanupOver = this.cleanup(value); } }
5. 第二次关闭,refCount 由 1 变 0,触发 cleanup
5. 第二次关闭,refCount 由 1 变 0,触发 cleanup // selectMappedBufferResult.release(); // org.apache.rocketmq.store.SelectMappedBufferResult#release public synchronized void release() { if (this.mappedFile != null) { this.mappedFile.release(); this.mappedFile = null; } }
6. 再次关闭 mmap 映射的内存,并删除磁盘的文件
public boolean destroy(final long intervalForcibly) { this.shutdown(intervalForcibly); if (this.isCleanupOver()) { try { this.fileChannel.close(); log.info("close file channel " this.fileName " OK"); long beginTime = System.currentTimeMillis(); boolean result = this.file.delete(); log.info("delete file[REF:" this.getRefCount() "] " this.fileName (result ? " OK, " : " Failed, ") "W:" this.getWrotePosition() " M:" this.getFlushedPosition() ", " UtilAll.computeElapsedTimeMilliseconds(beginTime)); } catch (Exception e) { log.warn("close file channel " this.fileName " Failed. ", e); } return true; } else { log.warn("destroy mapped file[REF:" this.getRefCount() "] " this.fileName " Failed. cleanupOver: " this.cleanupOver); } return false; }
mmap 把磁盘文件映射到堆外内存,rocketMQ 删除文件时,需要这部分堆外内存释放掉。这里如何释放堆外内存呢?后面补