从 MappedFile 的单元测试看 mmap

snowphy 2020-02-19

@Test
public void testSelectMappedBuffer() throws IOException {
    // 1. 使用 mmap 映射磁盘上的文件
    MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64);
    // 2. 向 file channel 中写入字节
    boolean result = mappedFile.appendMessage(storeMessage.getBytes());
    assertThat(result).isTrue();

    // 3. 读取文件对应的 MappedByteBuffer 中起始位置为 0 的所有内容,即上面写入的内容
    // 对 MappedFile 的 refCount 加一
    SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
    byte[] data = new byte[storeMessage.length()];
    selectMappedBufferResult.getByteBuffer().get(data);
    String readString = new String(data);
    assertThat(readString).isEqualTo(storeMessage);

    // 4. 关闭 mmap,回收 mmap 产生的堆外内存
    mappedFile.shutdown(1000);
    assertThat(mappedFile.isAvailable()).isFalse();
    // 5. 对 MappedFile 的 refCount 减一
    selectMappedBufferResult.release();
    assertThat(mappedFile.isCleanupOver()).isTrue();
    // 6. 再次关闭 mmap,同时删除磁盘上的文件
    assertThat(mappedFile.destroy(1000)).isTrue();
}

1. mmap 映射文件

private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    // 创建文件
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        // mmap 建立映射
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

2. 写入数据,这里的例子没有使用 mmap,而是直接写到 file channel

public boolean appendMessage(final byte[] data) {
    int currentPos = this.wrotePosition.get();

    if ((currentPos + data.length) <= this.fileSize) {
        try {
            this.fileChannel.position(currentPos);
            this.fileChannel.write(ByteBuffer.wrap(data));
        } catch (Throwable e) {
            log.error("Error occurred when append message to mappedFile.", e);
        }
        this.wrotePosition.addAndGet(data.length);
        return true;
    }

    return false;
}

3. mmap 读取文件内容

// 从 MappedByteBuffer 中截取 pos 到最新位置的内容
public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();
    if (pos < readPosition && pos >= 0) {
        // 默认计数器为 1
        // 计数加一
        if (this.hold()) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(pos);
            int size = readPosition - pos;
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }

    return null;
}
// 计数加一
public synchronized boolean hold() {
    if (this.isAvailable()) {
        if (this.refCount.getAndIncrement() > 0) {
            return true;
        } else {
            this.refCount.getAndDecrement();
        }
    }

    return false;
}

4. 第一次关闭 mmap,refCount 由 2 变 1,不会触发 cleanup

// org.apache.rocketmq.store.ReferenceResource#shutdown
public void shutdown(final long intervalForcibly) {
    if (this.available) {
        this.available = false;
        this.firstShutdownTimestamp = System.currentTimeMillis();
        this.release();
    } else if (this.getRefCount() > 0) {
        if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
            this.refCount.set(-1000 - this.getRefCount());
            this.release();
        }
    }
}

// org.apache.rocketmq.store.ReferenceResource#release
public void release() {
    long value = this.refCount.decrementAndGet();
    if (value > 0)
        return;

    synchronized (this) {

        this.cleanupOver = this.cleanup(value);
    }
}

5. 第二次关闭,refCount 由 1 变 0,触发 cleanup

5. 第二次关闭,refCount 由 1 变 0,触发 cleanup
// selectMappedBufferResult.release();
// org.apache.rocketmq.store.SelectMappedBufferResult#release
public synchronized void release() {
    if (this.mappedFile != null) {
        this.mappedFile.release();
        this.mappedFile = null;
    }
}

6. 再次关闭 mmap 映射的内存,并删除磁盘的文件

public boolean destroy(final long intervalForcibly) {
    this.shutdown(intervalForcibly);

    if (this.isCleanupOver()) {
        try {
            this.fileChannel.close();
            log.info("close file channel " + this.fileName + " OK");

            long beginTime = System.currentTimeMillis();
            boolean result = this.file.delete();
            log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                + this.getFlushedPosition() + ", "
                + UtilAll.computeElapsedTimeMilliseconds(beginTime));
        } catch (Exception e) {
            log.warn("close file channel " + this.fileName + " Failed. ", e);
        }

        return true;
    } else {
        log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
    }

    return false;
}

mmap 把磁盘文件映射到堆外内存,rocketMQ 删除文件时,需要这部分堆外内存释放掉。这里如何释放堆外内存呢?后面补

相关推荐