gzx0 2020-01-31
java作为一门拥有GC机制的语言,长久以来它的使用者都不必手动管理内存,这比起c/c++是一个巨大的进步。但现在netty却反其道而行之,实现了一套不依赖GC而自行管理内存的机制。
那么netty为什么要这么做?众所周知netty是一个网络通信层框架,涉及到大量IO操作,对于此类操作,DirectByteBuffer无疑比起堆内存有更多性能上的优势。而与此同时,DirectByteBuffer也有自身的不足,那就是它的申请和释放成本更高。如果直接交给JVM管理,频繁的GC将使DirectByteBuffer的优势荡然无存。所以,对DirectByteBuffer进行池化管理,多次重用以减少申请、释放的想法就比较自然了。但是,不同于一般于我们常见的对象池、连接池等池化的案例,ByteBuffer有大小一说,且申请多大的内存进行管理也难以确定,如果大了会浪费,小了会导致频繁扩容和内存复制以及碎片化。因此netty需要在解决高效分配内存的同时又解决内存碎片化的问题,显然,它使用的算法就非常具有学习价值了。
由于内存管理有许多概念,有必要在前期做一些准备工作以帮助理解。
ByteBufAllocator的默认实现类是PooledByteBufAllocator,而PooledByteBufAllocate及其实现类正是这套内存管理机制的承载者。
首先看一下它的构造函数,来了解哪些类参与了池化内存分配。
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize, boolean useCacheForAllThreads, int directMemoryCacheAlignment) { super(preferDirect); threadCache = new PoolThreadLocalCache(useCacheForAllThreads); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder); int pageShifts = validateAndCalculatePageShifts(pageSize); if (nHeapArena > 0) { heapArenas = newArenaArray(nHeapArena); for (int i = 0; i < heapArenas.length; i ++) { PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); heapArenas[i] = arena; } } else { heapArenas = null; } if (nDirectArena > 0) { directArenas = newArenaArray(nDirectArena); for (int i = 0; i < directArenas.length; i ++) { PoolArena.DirectArena arena = new PoolArena.DirectArena( this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); directArenas[i] = arena; } } else { directArenas = null; } }
从构造函数中可以看到,上述管理思想中的ThreadLocal、PoolArena、PoolChunk、Page、PoolSubpage都一一出现。
而且构造函数虽然长,但大致可以分为4个步骤:
PoolChunk在内存分配中处于一个承上启下的位置,所以我们首先来了解一下它。
在PooledByteBufAllocate构造函数值,初始化了chunkSize和pageShifts这2个变量,且用到了pageSize和maxOrder,它们分别由"io.netty.allocator.pageSize"和"io.netty.allocator.maxOrder"设置,pageSize不得小于2^12(4096)且必须为2的幂次方,默认为2^13(8192);maxOrder不得大于14,默认为11。validateAndCalculateChunkSize/PageShifts这2个方法用于检测用户设定值的正确性并计算chunkSize和pageShifts。其中chunkSize是将pageSize左移maxOrder位,且不得大于2^30,默认会是2^24。pageShifts是pageSize相对于‘1‘左移的位数,由于pageSize为2^13,这里pageShifts为13。
如果不了解Chunk与Page的关系,在这里会困惑很久。前文说到Chunk是Page的集合,那么Chunk是如何组织Page的呢?我们进入PoolChunk的构造函数一窥全貌。
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) { unpooled = false; this.arena = arena; this.memory = memory; this.pageSize = pageSize; this.pageShifts = pageShifts; this.maxOrder = maxOrder; this.chunkSize = chunkSize; this.offset = offset; unusable = (byte) (maxOrder + 1); log2ChunkSize = log2(chunkSize); subpageOverflowMask = ~(pageSize - 1); freeBytes = chunkSize; maxSubpageAllocs = 1 << maxOrder; memoryMap = new byte[maxSubpageAllocs << 1]; depthMap = new byte[memoryMap.length]; int memoryMapIndex = 1; for (int d = 0; d <= maxOrder; ++ d) { int depth = 1 << d; for (int p = 0; p < depth; ++ p) { memoryMap[memoryMapIndex] = (byte) d; depthMap[memoryMapIndex] = (byte) d; memoryMapIndex ++; } } subpages = newSubpageArray(maxSubpageAllocs); cachedNioBuffers = new ArrayDeque<ByteBuffer>(8); }
这也是一个相当长的构造函数,其中unpooled、arena、memory、subpages、cachedNioBuffers这几个成员变量与这一节关系不大,先忽略。pageSize、maxOrder、pageShifts、chunkSize已在上文阐述。
重点放在memoryMap和depthMap这2个名为map实为数组的成员变量里。它们的长度为maxSubpageAllocs <<1,为2^12,4096。之后,通过一个循环,将memoryMap和depthMap各自初始化。2个map在初始化后,都变成了一个从下标1开始,每个元素的下标index和值val满足 2^(val) <= index < 2^(val + 1)的数组。这类型的数组是标准的满二叉树数据结构。
由于从下标1开始,这两颗满二叉树一共4095个节点,其中有2048个叶子节点。如果将视线回到ChunkSize的计算,会发现ChunkSize也是通过将pageSize*2048得到,这说明满二叉树的每一个叶子结点都代表了一个Page,且父节点可分配的容量是2个子节点的容量和。
在初始化完成后,depthMap就不再变化,而memoryMap会随着内存分配而变化。我们来看一下PoolChunk如何分配内存:
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { final long handle; if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize handle = allocateRun(normCapacity); } else { handle = allocateSubpage(normCapacity); } if (handle < 0) { return false; } ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null; initBuf(buf, nioBuffer, handle, reqCapacity); return true; }
这段逻辑可以分为2段
private long allocateRun(int normCapacity) { int d = maxOrder - (log2(normCapacity) - pageShifts); int id = allocateNode(d); if (id < 0) { return id; } freeBytes -= runLength(id); return id; }
由于预先对normCapacity进行了规整化,log2(normCapacity)可以得到的对数是一个2的幂次,在这里也相当于二叉树的深度。计算d是为了知晓在二叉树哪一层的节点进行分配内存。为了帮助理解,不妨假设当前需要分配一个16KB大小的内存。则d = 11 - (14 - 13) = 10,为第10层节点。以这个例子再进入allocateNode(d)中。
private int allocateNode(int d) { int id = 1; int initial = - (1 << d); byte val = value(id); if (val > d) { return -1; } while (val < d || (id & initial) == 0) { id <<= 1; val = value(id); if (val > d) { id ^= 1; val = value(id); } } setValue(id, unusable); updateParentsAlloc(id); return id; }
当d=10时,initial不难算出为-1024。val = value(id)=memoryMap[id],此时memory由于还未分配,map里的值还是对应的深度。到了while循环,2个条件里val < d还比较好理解,它表示当前节点的深度不超过d,也就意味着当前节点有足够的容量分配内存;(id & initial)倒是有点晦涩难懂。继续以例子做参照,由于initial=-1024,将它转化为2进制,也可以当做一个掩码,而id从1开始左移,若id左移过多,id & initial就不再为0了,而id左移的意味着深度增加,所以这个条件表达式也是为了确保当前节点的深度不超过d。乍一看这2个条件是重复的,的确,初次分配时,第二个条件没有发挥作用,它得等到第二次分配才体现出效果。
当d等于10,val=10时,会跳出循环,然后将id下标对应的元素值设置为unusable,unusable在PoolChunk的构造函数中定义,为maxOrder+1,意味着不可达的深度,这里为12。
除了更新分配好的节点外,还需要更新其父节点,具体逻辑在updateParentAlloc(id),通过不断将id右移,同时更新父节点为2个子节点中较小的子节点的值,直到根节点。这里value(id ^ 1)就是求得id节点的兄弟节点的值。显然,这是因为其中一个子节点分配出去后,父节点只有剩下的一个子节点可分配了,自然需要更新它的值。
private void updateParentsAlloc(int id) { while (id > 1) { int parentId = id >>> 1; byte val1 = value(id); byte val2 = value(id ^ 1); byte val = val1 < val2 ? val1 : val2; setValue(parentId, val); id = parentId; } }
回到allocateNode方法,之前说过(id & initial)需要在第二次分配才体现出效果。这是因为在第一次分配后,从根节点到分配节点的父节点深度都减少了1,当第二次分配16KB内存时,在512节点处时,val就已经等于10了,
id & initial = 512 & -1024 = 0,从而进入循环。在循环后,id继续左移,找到1024节点,此时由于1024节点已被分配,memory[1024] = 12 > 10 = d,进入if条件内,通过异或1,找到1024节点的兄弟节点1025进行分配。
allocateNode调用完毕后,回到allocateRun方法,在可用ChunkSize里减去该次分配的内存大小。
由于第二三点涉及到其他一些知识,留在以后分析。