snowphy 2020-02-19
/**
 * Walks a MappedFile through its lifecycle: map, append, read back through the
 * mapped buffer, shut down, release the outstanding selection, and destroy.
 */
@Test
public void testSelectMappedBuffer() throws IOException {
    // 1. Map the on-disk file with mmap.
    MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64);

    // 2. Append the message bytes (appendMessage writes through the file channel).
    byte[] payload = storeMessage.getBytes();
    boolean result = mappedFile.appendMessage(payload);
    assertThat(result).isTrue();

    // 3. Select the mapped buffer from position 0, i.e. everything written above.
    //    A successful selection takes one extra reference on the MappedFile.
    SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
    // selectMappedBuffer returns null on failure; fail with a clear assertion instead of an NPE.
    assertThat(selectMappedBufferResult).isNotNull();
    // Size the read buffer by the number of bytes written, not by the String's char count,
    // so the round trip also holds for non-ASCII messages.
    byte[] data = new byte[payload.length];
    selectMappedBufferResult.getByteBuffer().get(data);
    String readString = new String(data);
    assertThat(readString).isEqualTo(storeMessage);

    // 4. First shutdown: marks the file unavailable and drops one reference. The selection
    //    above still holds a reference, so the mapping is not expected to be cleaned up yet
    //    (presumably the count goes 2 -> 1 here; the initial count is not visible in this test).
    mappedFile.shutdown(1000);
    assertThat(mappedFile.isAvailable()).isFalse();

    // 5. Release the selection, dropping the remaining reference; cleanup must have run afterwards.
    selectMappedBufferResult.release();
    assertThat(mappedFile.isCleanupOver()).isTrue();

    // 6. Destroy: shuts down again, closes the file channel and deletes the file on disk.
    assertThat(mappedFile.destroy(1000)).isTrue();
}

1. mmap 映射文件
/**
 * Opens {@code fileName} read-write and maps its first {@code fileSize} bytes into memory.
 *
 * <p>The last path component of {@code fileName} is parsed as a decimal {@code long} and
 * stored as this file's starting offset, so a non-numeric name fails with
 * {@link NumberFormatException} before any file is opened.
 *
 * @param fileName path of the backing file; its simple name must be a decimal number
 * @param fileSize number of bytes to map, starting at offset 0
 * @throws IOException if the file cannot be opened or mapped (logged, then rethrown)
 */
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());

    // Presumably makes sure the parent directory exists before the file is opened.
    ensureDirOK(this.file.getParent());

    boolean mapped = false;
    try {
        final RandomAccessFile backingFile = new RandomAccessFile(this.file, "rw");
        this.fileChannel = backingFile.getChannel();
        // Establish the mmap mapping over [0, fileSize).
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        mapped = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        // Only a half-initialised instance closes its channel here; on success it stays open.
        if (this.fileChannel != null && !mapped) {
            this.fileChannel.close();
        }
    }
}

2. 写入数据,这里的例子没有使用 mmap,而是直接写到 file channel
/**
 * Appends {@code data} at the current write position by writing directly to the file
 * channel (not through the mapped buffer).
 *
 * <p>The write position is advanced only after every byte has been handed to the channel.
 * If the write fails, the error is logged, the write position is left untouched and
 * {@code false} is returned, so callers never see a position that covers unwritten bytes.
 *
 * @param data bytes to append
 * @return {@code true} if all bytes were written; {@code false} if they do not fit in the
 *         remaining space of the file or the write failed
 */
public boolean appendMessage(final byte[] data) {
    int currentPos = this.wrotePosition.get();
    // Compare against the remaining space rather than summing, so a very large
    // data.length cannot overflow int and slip past the bound check.
    if (data.length > this.fileSize - currentPos) {
        return false;
    }
    try {
        this.fileChannel.position(currentPos);
        ByteBuffer pending = ByteBuffer.wrap(data);
        // A single write() call is allowed to consume only part of the buffer.
        while (pending.hasRemaining()) {
            this.fileChannel.write(pending);
        }
    } catch (Throwable e) {
        log.error("Error occurred when append message to mappedFile.", e);
        return false;
    }
    this.wrotePosition.addAndGet(data.length);
    return true;
}

3. mmap 读取文件内容
/**
 * Selects the part of the mapped buffer that runs from {@code pos} up to the current
 * read position.
 *
 * <p>A successful selection takes one reference on this file through {@link #hold()};
 * the returned result carries this file so that reference can be released later.
 *
 * @param pos start position within the file; must satisfy {@code 0 <= pos < readPosition}
 * @return the selected range, or {@code null} if {@code pos} is out of range or the
 *         reference could not be taken
 */
public SelectMappedBufferResult selectMappedBuffer(int pos) {
    final int readPosition = getReadPosition();
    if (pos < 0 || pos >= readPosition) {
        return null;
    }
    // Take a reference before handing out a view of the mapping.
    if (!this.hold()) {
        return null;
    }

    final int size = readPosition - pos;
    // First slice: an independent view whose position can be moved to pos.
    ByteBuffer window = this.mappedByteBuffer.slice();
    window.position(pos);
    // Second slice: starts at pos; limit it to the readable length.
    ByteBuffer selected = window.slice();
    selected.limit(size);
    return new SelectMappedBufferResult(this.fileFromOffset + pos, selected, size, this);
}
/**
 * Tries to take one reference on this resource.
 *
 * @return {@code true} if the resource is available and its reference count was positive
 *         before the increment; {@code false} otherwise, in which case the count is
 *         left as it was
 */
public synchronized boolean hold() {
    if (!this.isAvailable()) {
        return false;
    }
    if (this.refCount.getAndIncrement() > 0) {
        return true;
    }
    // The count was already non-positive: undo the increment made above.
    this.refCount.getAndDecrement();
    return false;
}

4. 第一次关闭 mmap,refCount 由 2 变 1,不会触发 cleanup
/**
 * Shuts this resource down (org.apache.rocketmq.store.ReferenceResource#shutdown).
 *
 * <p>The first call marks the resource unavailable, records the time and releases one
 * reference. Later calls do nothing unless references are still outstanding and at
 * least {@code intervalForcibly} milliseconds have passed since the first call; in that
 * case the reference count is forced negative and released once more.
 *
 * @param intervalForcibly grace period in milliseconds before a repeated shutdown
 *                         forces the release
 */
public void shutdown(final long intervalForcibly) {
    if (this.available) {
        this.available = false;
        this.firstShutdownTimestamp = System.currentTimeMillis();
        this.release();
        return;
    }

    if (this.getRefCount() <= 0) {
        return;
    }

    final long sinceFirstShutdown = System.currentTimeMillis() - this.firstShutdownTimestamp;
    if (sinceFirstShutdown >= intervalForcibly) {
        // Overwrite the count with a negative value so the release below sees a
        // non-positive result regardless of how many holders remain.
        this.refCount.set(-1000 - this.getRefCount());
        this.release();
    }
}
// org.apache.rocketmq.store.ReferenceResource#release
public void release() {
long value = this.refCount.decrementAndGet();
if (value > 0)
return;
synchronized (this) {
this.cleanupOver = this.cleanup(value);
}
}5. 第二次关闭,refCount 由 1 变 0,触发 cleanup
5. 第二次关闭,refCount 由 1 变 0,触发 cleanup
/**
 * Releases the reference this result holds on its mapped file
 * (org.apache.rocketmq.store.SelectMappedBufferResult#release; this is what the test's
 * {@code selectMappedBufferResult.release()} call runs).
 *
 * <p>The file reference is cleared afterwards, so calling this again does nothing.
 */
public synchronized void release() {
    if (this.mappedFile == null) {
        return;
    }
    this.mappedFile.release();
    this.mappedFile = null;
}

6. 再次关闭 mmap 映射的内存,并删除磁盘的文件
/**
 * Shuts this file down and, once cleanup has completed, closes the file channel and
 * deletes the file from disk.
 *
 * @param intervalForcibly grace period in milliseconds passed on to {@code shutdown}
 * @return {@code true} if cleanup had completed and the close/delete step was attempted
 *         (a failure in that step is only logged); {@code false} if cleanup has not
 *         completed yet
 */
public boolean destroy(final long intervalForcibly) {
    this.shutdown(intervalForcibly);

    if (!this.isCleanupOver()) {
        log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
        return false;
    }

    try {
        this.fileChannel.close();
        log.info("close file channel " + this.fileName + " OK");

        final long beginTime = System.currentTimeMillis();
        final boolean deleted = this.file.delete();
        final String outcome = deleted ? " OK, " : " Failed, ";
        log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
            + outcome + "W:" + this.getWrotePosition() + " M:"
            + this.getFlushedPosition() + ", "
            + UtilAll.computeElapsedTimeMilliseconds(beginTime));
    } catch (Exception e) {
        log.warn("close file channel " + this.fileName + " Failed. ", e);
    }
    return true;
}

mmap 把磁盘文件映射到堆外内存,rocketMQ 删除文件时,需要这部分堆外内存释放掉。这里如何释放堆外内存呢?后面补