Finnnnnnn 2019-11-05
用到了内存集合为块4k。file中写先写64Kbuf再write、一个文件写完调fsync
immem=>table
迭代器。循环。
计算当前,一块一次flush(加入到fd的buffer),一块加一次index block,
这里简单的每个block一次write。append还做了留64kbuf。在write
一个table一次fsync.
BuildTable
1.add节点。 for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); meta->largest.DecodeFrom(key); builder->Add(key, iter->value()); } 2.加一些汇总meta block等 s = builder->Finish(); if (s.ok()) { meta->file_size = builder->FileSize(); assert(meta->file_size > 0); } delete builder; 3.落盘 if (s.ok()) { s = file->Sync(); //封装fsync } if (s.ok()) { s = file->Close(); } 针对1add 数据结构要算出各部分偏移量,维持4k。一块加一个索引 每4k调一次Flush(自己实现) Flush维护64K的buf。满了调用write。否则只是放在buf中 add: if (r->pending_index_entry) { r->index_block.Add(r->last_key, Slice(handle_encoding)); r->pending_index_entry = false; } r->data_block.Add(key, value); //r结构中改 const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); if (estimated_block_size >= r->options.block_size) { Flush(); } Flush: WriteBlock(&r->data_block, &r->pending_handle); //r->file->Append(block_contents); if (ok()) { r->pending_index_entry = true; r->status = r->file->Flush(); //调动WriteUnbuffered真正write } 这里每64k调用一次write。否则是fd的buf Status Append(const Slice& data) override { size_t write_size = data.size(); const char* write_data = data.data(); // Fit as much as possible into buffer. size_t copy_size = std::min(write_size, kWritableFileBufferSize - pos_); std::memcpy(buf_ + pos_, write_data, copy_size); write_data += copy_size; write_size -= copy_size; pos_ += copy_size; if (write_size == 0) { return Status::OK(); } // Can't fit in buffer, so need to do at least one write. Status status = FlushBuffer(); if (!status.ok()) { return status; } // Small writes go to buffer, large writes are written directly. if (write_size < kWritableFileBufferSize) { //64K std::memcpy(buf_, write_data, write_size); pos_ = write_size; return Status::OK(); } return WriteUnbuffered(write_data, write_size); }
log也用的这个。若Block已经小于header,填充0下一个Block。否则写入block剩余和left小的,加入first等标识。一段调一次加入头信息/crc32c校验和和数据,走append和flush。
Status Writer::AddRecord(const Slice& slice) { const char* ptr = slice.data(); size_t left = slice.size(); // Fragment the record if necessary and emit it. Note that if slice // is empty, we still want to iterate once to emit a single // zero-length record Status s; bool begin = true; do { const int leftover = kBlockSize - block_offset_; assert(leftover >= 0); if (leftover < kHeaderSize) { // Switch to a new block if (leftover > 0) { // Fill the trailer (literal below relies on kHeaderSize being 7) static_assert(kHeaderSize == 7, ""); dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover)); } block_offset_ = 0; } // Invariant: we never leave < kHeaderSize bytes in a block. assert(kBlockSize - block_offset_ - kHeaderSize >= 0); const size_t avail = kBlockSize - block_offset_ - kHeaderSize; const size_t fragment_length = (left < avail) ? left : avail; RecordType type; const bool end = (left == fragment_length); if (begin && end) { type = kFullType; } else if (begin) { type = kFirstType; } else if (end) { type = kLastType; } else { type = kMiddleType; } s = EmitPhysicalRecord(type, ptr, fragment_length); ptr += fragment_length; left -= fragment_length; begin = false; } while (s.ok() && left > 0); return s; } Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t length) { assert(length <= 0xffff); // Must fit in two bytes assert(block_offset_ + kHeaderSize + length <= kBlockSize); // Format the header char buf[kHeaderSize]; buf[4] = static_cast<char>(length & 0xff); buf[5] = static_cast<char>(length >> 8); buf[6] = static_cast<char>(t); // Compute the crc of the record type and the payload. uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length); crc = crc32c::Mask(crc); // Adjust for storage EncodeFixed32(buf, crc); // Write the header and the payload Status s = dest_->Append(Slice(buf, kHeaderSize)); if (s.ok()) { s = dest_->Append(Slice(ptr, length)); if (s.ok()) { s = dest_->Flush(); } } block_offset_ += kHeaderSize + length; return s; }
在get时更新seek的方案,从sst找并且没有找到的,第一个文件的是seek_file,在update_state时将其allow_seek--。当0时,file_to_compact_设为该sst。在调度的时候优先比较size(0层个数,其他层大于限制)再选seek
一个sst被访问了太多次有可能是每次get都会查找这个sst,但是又没有找到,那不如把这个sst合并到下一层,这样下次就不用做无用的查找了
seek值的选取:
// We arrange to automatically compact this file after // a certain number of seeks. Let's assume: // (1) One seek costs 10ms // (2) Writing or reading 1MB costs 10ms (100MB/s) // (3) A compaction of 1MB does 25MB of IO: // 1MB read from this level // 10-12MB read from next level (boundaries may be misaligned) // 10-12MB written to next level // This implies that 25 seeks cost the same as the compaction // of 1MB of data. I.e., one seek costs approximately the // same as the compaction of 40KB of data. We are a little // conservative and allow approximately one seek for every 16KB // of data before triggering a compaction. // 在这里更新allowed_seeks, 主要用于seek_compaction f->allowed_seeks = (f->file_size / 16384); if (f->allowed_seeks < 100) f->allowed_seeks = 100;
void PosixEnv::Schedule( void (*background_work_function)(void* background_work_arg), void* background_work_arg) { background_work_mutex_.Lock(); // Start the background thread, if we haven't done so already. if (!started_background_thread_) { started_background_thread_ = true; std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this); background_thread.detach(); } // If the queue is empty, the background thread may be waiting for work. if (background_work_queue_.empty()) { background_work_cv_.Signal(); } background_work_queue_.emplace(background_work_function, background_work_arg); background_work_mutex_.Unlock(); } void PosixEnv::BackgroundThreadMain() { while (true) { background_work_mutex_.Lock(); // Wait until there is work to be done. while (background_work_queue_.empty()) { background_work_cv_.Wait(); } assert(!background_work_queue_.empty()); auto background_work_function = background_work_queue_.front().function; void* background_work_arg = background_work_queue_.front().arg; background_work_queue_.pop(); background_work_mutex_.Unlock(); background_work_function(background_work_arg); } } 调用: env_->Schedule(&DBImpl::BGWork, this); void DBImpl::BGWork(void* db) { reinterpret_cast<DBImpl*>(db)->BackgroundCall(); } void DBImpl::BackgroundCall() { MutexLock l(&mutex_); assert(background_compaction_scheduled_); if (shutting_down_.load(std::memory_order_acquire)) { // No more background work when shutting down. } else if (!bg_error_.ok()) { // No more background work after a background error. } else { BackgroundCompaction(); } background_compaction_scheduled_ = false; // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. MaybeScheduleCompaction(); background_work_finished_signal_.SignalAll(); } 压缩过程: input是选择出的所有文件的it集合。文件NewTwoLevelIterator或0层的table_cache_->NewIterator input->SeekToFirst(); 每个input builder->add 若超出大小就直接生成文件 compact->outfile->Sync() 调用LogAndApply生成新的version。写入manifest中
version
提供了在当前版本搜索键值的Get方法,其次是为上层调用提供了收集当前版本所有文件的迭代器,最后是为合并文件提供了判断键值范围与文件是否有交集的辅助函数
Version_set这个类不只是简单的Version集合,还操作着和版本变化的一些函数,例如将version_edit应用到新的版本,将新版本设为当前版本等等。
version_edit这个类主要是两个版本之间的差量,version_edit序列化之后,会保存在manifest文件中
每当调用LogAndApply的时候,都会将VersionEdit作为一笔记录,追加写入到MANIFEST文件。并且生成新version加入到版本链表。
将version_set内的文件内的文件编号保存进edit;
新建一个Version,然后调用Builder->apply和Builder->SaveTo方法将edit应用到新版本中.
将edit写进manifest文件中,并更新Current文件,指向最新manifest.
将新版本添加到版本链表中,并设置为当前链表.
Manifest:
使用的coparator名、log编号、前一个log编号、下一个文件编号、上一个序列号。这些都是日志、sstable文件使用到的重要信息,这些字段不一定必然存在。
其次是compact点,可能有多个,写入格式为{kCompactPointer, level, internal key}。
其后是删除文件,可能有多个,格式为{kDeletedFile, level, file number}。
最后是新文件,可能有多个,格式为
{kNewFile, level, file number, file size, min key, max key}。
对于版本间变动它是新加的文件集合,对于MANIFEST快照是该版本包含的所有sstable文件集合。
http://luodw.cc/2015/10/31/le...