caitian 2015-03-17
hbase get/scan的时候
StoreFileScanner next方法
使用HfileReaderV2的内部静态类ScannerV2(HFileScanner) next方法
使用HFileReaderV2 readBlock方法:
1.封装为blockcachekey
2.从blockcache中获取block
3.从hfile中获取block
4.将block放入blockcache中
HfileReaderV2 readBlock
/** * Read in a file block. * @param dataBlockOffset offset to read. * @param onDiskBlockSize size of the block * @param cacheBlock * @param pread Use positional read instead of seek+read (positional is * better doing random reads whereas seek+read is better scanning). * @param isCompaction is this block being read as part of a compaction * @param expectedBlockType the block type we are expecting to read with this * read operation, or null to read whatever block type is available * and avoid checking (that might reduce caching efficiency of * encoded data blocks) * @return Block wrapped in a ByteBuffer. * @throws IOException */ @Override public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock, boolean pread, final boolean isCompaction, BlockType expectedBlockType) throws IOException { if (dataBlockIndexReader == null) { throw new IOException("Block index not loaded"); } if (dataBlockOffset < 0 || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) { throw new IOException("Requested block is out of range: " + dataBlockOffset + ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset()); } // For any given block from any given file, synchronize reads for said // block. // Without a cache, this synchronizing is needless overhead, but really // the other choice is to duplicate work (which the cache would prevent you // from doing). //@1@@@@@@@@@@@@@@@@@@@@ BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset, dataBlockEncoder.getDataBlockEncoding(), expectedBlockType);//封装为blockcachekey,内存和索引都需要 boolean useLock = false; IdLock.Entry lockEntry = null; TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock"); try { while (true) { if (useLock) { lockEntry = offsetLock.getLockEntry(dataBlockOffset);//获得block锁 } //@2@@@@@@@@@@@@@@@@@@@@ // Check cache for block. If found return. if (cacheConf.isBlockCacheEnabled()) {//表是否使用block cache // Try and get the block from the block cache. If the useLock variable is true then this // is the second time through the loop and it should not be counted as a block cache miss. HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock, useLock);//从blockcache类(bucketCache,lrublockcache)获取block if (cachedBlock != null) { validateBlockType(cachedBlock, expectedBlockType); if (cachedBlock.getBlockType().isData()) { HFile.dataBlockReadCnt.incrementAndGet(); // Validate encoding type for data blocks. We include encoding // type in the cache key, and we expect it to match on a cache hit. if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) { throw new IOException("Cached block under key " + cacheKey + " " + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: " + dataBlockEncoder.getDataBlockEncoding() + ")"); } } return cachedBlock; } // Carry on, please load. } if (!useLock) { // check cache again with lock useLock = true; continue; } if (Trace.isTracing()) { traceScope.getSpan().addTimelineAnnotation("blockCacheMiss"); } //@3@@@@@@@@@@@@@@@@@@@@ // Load block from filesystem.没有在cache中获得,或是掉过cache步骤,用hdfs上的block数据,读取hfile文件必须获取block锁 long startTimeNs = System.nanoTime(); HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, pread); validateBlockType(hfileBlock, expectedBlockType); final long delta = System.nanoTime() - startTimeNs; HFile.offerReadLatency(delta, pread); // Cache the block if necessary //@4@@@@@@@@@@@@@@@@@@@@ if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) { //表为blockcache,且client端blockcache也为true,则将当前block放入block cache cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory()); } if (hfileBlock.getBlockType().isData()) { HFile.dataBlockReadCnt.incrementAndGet(); } return hfileBlock; } } finally { traceScope.close(); if (lockEntry != null) { offsetLock.releaseLockEntry(lockEntry); } } }
步骤2详细步骤,以BucketCache类为例,
LRUBucketCache也是装饰的词类(好像是阿里一小孩打的patch,厉害厉害)
BucketCache类获取block过程
1.获取在blockcache中的block锁(todo:需要改为读写锁)
2.从bytebufferArray中获得数据
3.当前bucketEntry(block)命中数加1
/** * Get the buffer of the block with the specified key. * @param key block's cache key * @param caching true if the caller caches blocks on cache misses * @param repeat Whether this is a repeat lookup for the same block * @return buffer of specified cache key, or null if not in cache */ @Override public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) { if (!cacheEnabled) return null; RAMQueueEntry re = ramCache.get(key); if (re != null) {//从ramCache中命中block,这个cache还没有写到blockcache中 cacheStats.hit(caching); re.access(accessCount.incrementAndGet()); return re.getData(); } //从blockcache中取数据 BucketEntry bucketEntry = backingMap.get(key); if(bucketEntry!=null) { long start = System.nanoTime(); IdLock.Entry lockEntry = null; try { lockEntry = offsetLock.getLockEntry(bucketEntry.offset());//获取在blockcache中的block锁 if (bucketEntry.equals(backingMap.get(key))) { int len = bucketEntry.getLength();//数据长度 ByteBuffer bb = ByteBuffer.allocate(len); int lenRead = ioEngine.read(bb, bucketEntry.offset());//从bytebufferArray中获得数据 if (lenRead != len) { throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected"); } Cacheable cachedBlock = bucketEntry.deserializerReference( deserialiserMap).deserialize(bb, true); long timeTaken = System.nanoTime() - start; cacheStats.hit(caching); cacheStats.ioHit(timeTaken); bucketEntry.access(accessCount.incrementAndGet());//当前bucketEntry(block)命中数加1 if (this.ioErrorStartTime > 0) { ioErrorStartTime = -1; } return cachedBlock; } } catch (IOException ioex) { LOG.error("Failed reading block " + key + " from bucket cache", ioex); checkIOErrorIsTolerated(); } finally { if (lockEntry != null) { offsetLock.releaseLockEntry(lockEntry); } } } if(!repeat)cacheStats.miss(caching); return null; }
步骤4中的cacheBlock,也说说BucketCache
这个方法很简单,
内存数据先写到ramcache中,并放入ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues随机一个队列中
之后又线程会处理这些queue
/** * Cache the block to ramCache * @param cacheKey block's cache key * @param cachedItem block buffer * @param inMemory if block is in-memory * @param wait if true, blocking wait when queue is full */ public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) { if (!cacheEnabled) return; if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) return; /* * Stuff the entry into the RAM cache so it can get drained to the * persistent store */ RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);//封装ramentry ramCache.put(cacheKey, re); int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();//随机取write queue BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); boolean successfulAddition = bq.offer(re); if (!successfulAddition && wait) { synchronized (cacheWaitSignals[queueNum]) { try { cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } successfulAddition = bq.offer(re); } if (!successfulAddition) { ramCache.remove(cacheKey); failedBlockAdditions.incrementAndGet(); } else { this.blockNumber.incrementAndGet(); this.heapSize.addAndGet(cachedItem.heapSize()); blocksByHFile.put(cacheKey.getHfileName(), cacheKey); } }
多个WriterThread线程,将数据从队列中取出ramcache写入到bytebufferarray(阿里据说的高速磁盘上,我眼瞎没看到)
并且将bba数据的索引封装为bucketentry放入 backingMap中,这个才是blockcache;
/** * 1.循环获取ramcache中的数据,生成数据索引,将数据放入bba中 * 2.数据索引放入bucket中 * 3.从ramchache中删除 * Flush the entries in ramCache to IOEngine and add bucket entry to * backingMap * @param entries * @throws InterruptedException */ private void doDrain(List<RAMQueueEntry> entries) throws InterruptedException { //要写入bucket的entry 数据索引 BucketEntry[] bucketEntries = new BucketEntry[entries.size()]; //ramcache中内存中的数据 RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()]; int done = 0; while (entries.size() > 0 && cacheEnabled) { // Keep going in case we throw... RAMQueueEntry ramEntry = null; try { ramEntry = entries.remove(entries.size() - 1);//获得entry if (ramEntry == null) { LOG.warn("Couldn't get the entry from RAM queue, who steals it?"); continue; } BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);//@@@@返回数据索引,将数据存储到bytebufferArray里面 ramEntries[done] = ramEntry; bucketEntries[done++] = bucketEntry;//内存bba的数据索引 if (ioErrorStartTime > 0) { ioErrorStartTime = -1; } } catch (BucketAllocatorException fle) { LOG.warn("Failed allocating for block " + (ramEntry == null ? "" : ramEntry.getKey()), fle); } catch (CacheFullException cfe) { if (!freeInProgress) { freeSpace(); } else { Thread.sleep(50); } } catch (IOException ioex) { LOG.error("Failed writing to bucket cache", ioex); checkIOErrorIsTolerated(); } } // Make sure that the data pages we have written are on the media before // we update the map. try { ioEngine.sync(); } catch (IOException ioex) { LOG.error("Faild syncing IO engine", ioex); checkIOErrorIsTolerated(); // Since we failed sync, free the blocks in bucket allocator for (int i = 0; i < done; ++i) { if (bucketEntries[i] != null) { bucketAllocator.freeBlock(bucketEntries[i].offset()); } } done = 0; } for (int i = 0; i < done; ++i) { if (bucketEntries[i] != null) {//将数据索引放入bba中 backingMap.put(ramEntries[i].getKey(), bucketEntries[i]); } RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey()); if (ramCacheEntry != null) { heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize()); } } if (bucketAllocator.getUsedSize() > acceptableSize()) { freeSpace(); } } }
下面是writeToCache方法
/** * 返回数据索引,将数据存储到bytebufferArray里面 * 没看到高速磁盘什么事情 * @param ioEngine * @param bucketAllocator * @param deserialiserMap * @param realCacheSize * @return * @throws CacheFullException * @throws IOException * @throws BucketAllocatorException */ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator, final UniqueIndexMap<Integer> deserialiserMap, final AtomicLong realCacheSize) throws CacheFullException, IOException, BucketAllocatorException { int len = data.getSerializedLength(); // This cacheable thing can't be serialized... if (len == 0) return null; long offset = bucketAllocator.allocateBlock(len); BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory); bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);//创建数据的引用,到be中 try {//将数据写到bytebufferArray里面 if (data instanceof HFileBlock) { ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();//获取data bb sliceBuf.rewind(); assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE; ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE); ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer); ioEngine.write(sliceBuf, offset);//写数据到bytebufferArray里面 ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE); } else { ByteBuffer bb = ByteBuffer.allocate(len); data.serialize(bb); ioEngine.write(bb, offset); } } catch (IOException ioe) { // free it in bucket allocator bucketAllocator.freeBlock(offset); throw ioe; } realCacheSize.addAndGet(len); return bucketEntry; } }