Version
VersionSet类
VersionSet管理整个LevelDB的当前状态:
// Excerpt of VersionSet: owns the list of Versions and the bookkeeping
// numbers (file numbers, sequence number, log numbers) that describe the
// current state of the database. Elided members are marked "// ...".
class VersionSet
{
public:
// ...
// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
// current version. Will release *mu while actually writing to the file.
// Returns the status of the manifest write; the new version is installed
// only when that status is OK.
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit *edit, port::Mutex *mu)
EXCLUSIVE_LOCKS_REQUIRED(mu);
// Recover the last saved descriptor from persistent storage.
// Sets *save_manifest to true when the existing manifest cannot be reused.
Status Recover(bool *save_manifest);
// ...
// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
Compaction *PickCompaction();
// ...
private:
// ...
// Save current contents to *log as a single encoded VersionEdit record.
Status WriteSnapshot(log::Writer *log);
// ...
Env *const env_; // Environment abstraction used for file operations
const std::string dbname_; // Directory holding the database files
const Options *const options_; // Options the database was opened with
TableCache *const table_cache_; // Table cache
const InternalKeyComparator icmp_; // Comparator for internal keys
uint64_t next_file_number_; // Next file number to hand out
uint64_t manifest_file_number_; // File number of the manifest file
uint64_t last_sequence_; // Last sequence number used
uint64_t log_number_; // File number of the log
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// Opened lazily: both stay null until LogAndApply() first creates the manifest.
WritableFile *descriptor_file_; // The manifest file
log::Writer *descriptor_log_; // Record writer layered on descriptor_file_
Version dummy_versions_; // Head of circular doubly-linked list of versions.
Version *current_; // == dummy_versions_.prev_
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
// ...
};
其中env_指向LevelDB封装的和系统环境有关的对象,dbname_存储LevelDB的数据路径,options_指向LevelDB构建时的选项,table_cache_指向tablecache,icmp_为key之间的comparator,next_file_number_为下一个文件的FileNumber,manifest_file_number_为manifest文件的FileNumber,last_sequence_用于存储最后一个使用的SequenceNumber,log_number_用于存储log的FileNumber,prev_log_number_用于存储辅助log的FileNumber,descriptor_file_指向manifest文件,descriptor_log_用于向manifest文件中写入日志记录,dummy_versions_为Version双向链表的虚拟头节点,current_指向LevelDB当前最新的Version,compact_pointer_为每一个level中下一次compaction的起始位置的指针数组。
首先来看LogAndApply函数:
Status VersionSet::LogAndApply(VersionEdit *edit, port::Mutex *mu)
LogAndApply函数首先根据当前VersionSet中维护的数据库状态设置VersionEdit对象:
// Complete the edit with the VersionSet's own bookkeeping wherever the
// caller did not provide a value.
if (!edit->has_log_number_)
{
    edit->SetLogNumber(log_number_);
}
else
{
    // A caller-supplied log number must not move backwards and must be
    // below the next file number to be handed out.
    assert(edit->log_number_ >= log_number_);
    assert(edit->log_number_ < next_file_number_);
}

if (!edit->has_prev_log_number_)
{
    edit->SetPrevLogNumber(prev_log_number_);
}

// These two are always stamped from the current state.
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
然后使用Builder将VersionEdit对象应用并构建出新的Version,其中Finalize函数主要是计算compaction score,以确定compaction_level_和compaction_score_的值:
// Build the candidate Version: start from current_, apply the edit, and
// store the result in v. v is heap-allocated; a later step of this
// function either installs it or deletes it.
Version *v = new Version(this);
{
// The Builder lives only inside this scope.
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
// Compute the compaction score/level for the new version.
Finalize(v);
如果当前descriptor_log_和descriptor_file_不存在,则创建它们:
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
// new_manifest_file stays empty unless a manifest is created here; later
// steps use that to tell whether CURRENT must be updated or cleanup is needed.
std::string new_manifest_file;
Status s;
if (descriptor_log_ == nullptr)
{
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == nullptr);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
// Stamp next_file_number_ onto the edit again before the manifest is created.
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok())
{
// The first record of a fresh manifest is a snapshot of the current state.
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
}
接下来将VersionEdit编码为字符串,再调用descriptor_log_->AddRecord函数写入manifest文件中,并调用descriptor_file_->Sync函数强制写入磁盘,如果这是一个新的manifest文件,调用SetCurrentFile函数将CURRENT文件指向这个manifest文件:
// Unlock during expensive MANIFEST log write
{
mu->Unlock();
// Write new record to MANIFEST log
// s may already be non-OK if creating the manifest or writing its
// snapshot failed above; in that case nothing is appended.
if (s.ok())
{
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok())
{
// Sync only after the record was appended successfully.
s = descriptor_file_->Sync();
}
if (!s.ok())
{
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty())
{
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}
// Re-acquire the mutex before in-memory state is touched again.
mu->Lock();
}
最后调用AppendVersion函数将新的Version对象添加进VersionSet的Version链表中,并设置log_number_和prev_log_number_:
// Install the new version
if (s.ok())
{
// The manifest write succeeded: make v part of the version list and
// adopt the log numbers carried by the edit.
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
}
else
{
// Failure: discard the candidate version.
delete v;
if (!new_manifest_file.empty())
{
// A manifest was created during this call; tear it down and reset the
// pointers to null so the creation path is taken again next time.
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = nullptr;
descriptor_file_ = nullptr;
// The returned status is ignored; the original failure in s is reported.
env_->DeleteFile(new_manifest_file);
}
}
return s;
接下来看Recover函数:
Status VersionSet::Recover(bool *save_manifest)
Recover函数首先从CURRENT文件中读取当前manifest文件名:
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string current;
// Pass the address of `current` as the output buffer.
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
if (!s.ok())
{
    return s;
}
// The manifest name must be terminated by a newline; anything else is
// reported as corruption.
if (current.empty() || current[current.size() - 1] != '\n')
{
    return Status::Corruption("CURRENT file does not end with newline");
}
// Drop the trailing newline so `current` holds just the manifest file name.
current.resize(current.size() - 1);
由manifest文件名打开manifest文件:
// Open the manifest named by CURRENT for sequential reading.
std::string dscname = dbname_ + "/" + current;
SequentialFile *file;
s = env_->NewSequentialFile(dscname, &file);
if (!s.ok())
{
// A missing manifest is reported as corruption rather than as a plain
// not-found error; any other error is returned unchanged.
if (s.IsNotFound())
{
return Status::Corruption(
"CURRENT points to a non-existent file", s.ToString());
}
return s;
}
初始化相关变量用于保存临时的值,初始化一个Builder用于将从manifest文件中解码得到的VersionEdit对象应用到VersionSet上,以此来恢复LevelDB的状态:
// Bookkeeping recovered from the manifest. Each have_* flag records whether
// any replayed edit carried the corresponding field; the value variables
// keep the most recent one seen.
// NOTE(review): the have_* flags are written during replay but are not read
// in the portion of Recover shown here — confirm how they are used.
bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
// Accumulates every replayed edit on top of current_.
Builder builder(this, current_);
初始化一个Reader用来读取manifest中的日志记录。循环读取manifest中的记录,将每一条记录解码后生成一个VersionEdit对象,然后应用到VersionSet上,同时用之前定义的一些变量记录VersionEdit中的值:
// Replay the manifest: decode each record into a VersionEdit, apply it to
// the builder, and remember the latest bookkeeping values it carries.
{
LogReporter reporter;
// NOTE(review): presumably lets the reader report problems through s — confirm.
reporter.status = &s;
log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
Slice record;
std::string scratch;
// Stops at the end of the log or as soon as s becomes non-OK.
while (reader.ReadRecord(&record, &scratch) && s.ok())
{
VersionEdit edit;
s = edit.DecodeFrom(record);
if (s.ok())
{
// Reject a manifest whose recorded comparator name differs from the one
// this VersionSet was constructed with.
if (edit.has_comparator_ &&
edit.comparator_ != icmp_.user_comparator()->Name())
{
s = Status::InvalidArgument(
edit.comparator_ + " does not match existing comparator ",
icmp_.user_comparator()->Name());
}
}
if (s.ok())
{
builder.Apply(&edit);
}
// The fields below are captured regardless of s; a non-OK s ends the loop
// at the next condition check.
if (edit.has_log_number_)
{
log_number = edit.log_number_;
have_log_number = true;
}
if (edit.has_prev_log_number_)
{
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}
if (edit.has_next_file_number_)
{
next_file = edit.next_file_number_;
have_next_file = true;
}
if (edit.has_last_sequence_)
{
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
}
// The manifest reader is finished; release the file handle.
delete file;
file = nullptr;
将manifest文件中记录的VersionEdit全部应用后,根据当前VersionSet初始化一个Version对象并添加进VersionSet的Version双向链表中,用来表示LevelDB恢复后的版本状态,然后用ReuseManifest函数判断当前的manifest文件是否可以重用,如果可以重用,就没有必要保存Recover函数调用之前新建的manifest文件了:
// On success, materialise the replayed state as a Version and adopt the
// recovered bookkeeping numbers. On failure nothing is installed.
if (s.ok())
{
Version *v = new Version(this);
builder.SaveTo(v);
// Install recovered version
Finalize(v);
AppendVersion(v);
// The recovered next-file number is assigned to the manifest, and the
// counter moves one past it.
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
// See if we can reuse the existing MANIFEST file.
if (ReuseManifest(dscname, current))
{
// No need to save new manifest
}
else
{
// *save_manifest is only ever set to true here; it is left untouched
// when the manifest is reused.
*save_manifest = true;
}
}
return s;
PickCompaction函数在https://www.cnblogs.com/YuNanlong/p/9440548.html中有介绍。
最后看WriteSnapshot函数,这个函数将当前VersionSet中维护的LevelDB的状态用一个VersionEdit对象保存下来,相当于当前LevelDB所处的状态是由一个新的LevelDB应用了WriteSnapshot函数中创建的VersionEdit对象后得到的,然后函数将这个VersionEdit对象编码后存入manifest文件,以此来作为LevelDB初始时的完整状态信息。将初始状态信息也用一个VersionEdit对象保存使得以后用manifest文件进行恢复时可以将初始状态和之后的compaction带来的改变以同样的方式处理:
// Serialises the complete current state (comparator name, per-level compaction
// pointers, and every file of current_) as a single VersionEdit record and
// appends it to *log. Returns the status of the append.
Status VersionSet::WriteSnapshot(log::Writer *log)
{
    // TODO: Break up into multiple records to reduce memory usage on recovery?

    VersionEdit snapshot;

    // Metadata: name of the user comparator.
    snapshot.SetComparatorName(icmp_.user_comparator()->Name());

    // Compaction pointers: levels whose pointer is empty are skipped.
    for (int lvl = 0; lvl < config::kNumLevels; lvl++)
    {
        const std::string &encoded_key = compact_pointer_[lvl];
        if (encoded_key.empty())
        {
            continue;
        }
        InternalKey start_key;
        start_key.DecodeFrom(encoded_key);
        snapshot.SetCompactPointer(lvl, start_key);
    }

    // Files: every file of the current version, level by level.
    for (int lvl = 0; lvl < config::kNumLevels; lvl++)
    {
        for (const FileMetaData *meta : current_->files_[lvl])
        {
            snapshot.AddFile(lvl, meta->number, meta->file_size,
                             meta->smallest, meta->largest);
        }
    }

    std::string encoded_edit;
    snapshot.EncodeTo(&encoded_edit);
    return log->AddRecord(encoded_edit);
}
Version类
version实际上管理着LevelDB各个level的文件信息。
// Excerpt of Version: holds the per-level file lists of the database.
// Versions are linked into a doubly-linked list owned by a VersionSet and
// are reference counted. Elided members are marked "// ...".
class Version
{
public:
// ...
// Statistics filled in by Get().
// NOTE(review): presumably the file, and its level, singled out by the
// lookup for seek accounting — confirm against Version::Get.
struct GetStats
{
FileMetaData *seek_file;
int seek_file_level;
};
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
// REQUIRES: lock is not held
Status Get(const ReadOptions &, const LookupKey &key, std::string *val,
GetStats *stats);
// ...
// Reference count management (so Versions do not disappear out from
// under live iterators)
void Ref();
void Unref();
// ...
// Return the level at which we should place a new memtable compaction
// result that covers the range [smallest_user_key,largest_user_key].
int PickLevelForMemTableOutput(const Slice &smallest_user_key,
const Slice &largest_user_key);
// ...
private:
// ...
VersionSet *vset_; // VersionSet to which this Version belongs
Version *next_; // Next version in linked list
Version *prev_; // Previous version in linked list
int refs_; // Number of live refs to this version
// List of files per level
std::vector<FileMetaData *> files_[config::kNumLevels];
// Next file to compact based on seek stats.
FileMetaData *file_to_compact_;
int file_to_compact_level_; // Level of file_to_compact_
// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
double compaction_score_;
int compaction_level_;
// ...
};
其中vset_指向包含这个version的versionset,next_和prev_为version链表的指针,version链表为一个双向链表,由versionset管理,refs_为引用计数,由Ref和Unref函数控制,files_为一个二维数组,每一行表示一个level,一行中包含了对应level的所有文件的元信息,file_to_compact_为需要compact的文件,file_to_compact_level_为需要compact的文件的level,compaction_score_为衡量一个level是否应该compact的值,compaction_level_为应该被compact的level。
Ref和Unref函数用于引用计数,保证一个Version的对象只要还在被引用就不会被析构。
Get函数在https://www.cnblogs.com/YuNanlong/p/9445319.html中有介绍。
PickLevelForMemTableOutput函数在https://www.cnblogs.com/YuNanlong/p/9440548.html中有介绍。
VersionEdit类
VersionEdit主要封装对Version进行一次改变的操作。
// Excerpt of VersionEdit: describes one change to be applied to a Version.
// Elided members are marked "// ...".
class VersionEdit
{
public:
// ...
// Append the encoded form of this edit to *dst.
void EncodeTo(std::string *dst) const;
// Rebuild this edit from its encoded form in src.
Status DecodeFrom(const Slice &src);
std::string DebugString() const;
private:
// VersionSet reads the fields below directly.
friend class VersionSet;
// NOTE(review): the pair is presumably (level, file number) — confirm.
typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;
std::string comparator_; // Name of the key comparator
uint64_t log_number_; // File number of the log
uint64_t prev_log_number_; // File number of the previous log
uint64_t next_file_number_; // Next file number to use
SequenceNumber last_sequence_; // Last sequence number used
// Each has_* flag tells whether the matching field above carries a value.
bool has_comparator_;
bool has_log_number_;
bool has_prev_log_number_;
bool has_next_file_number_;
bool has_last_sequence_;
// Compaction pointers to update in the VersionSet, keyed by level.
std::vector<std::pair<int, InternalKey>> compact_pointers_;
// Files to delete.
DeletedFileSet deleted_files_;
// Files to add, each paired with its level.
std::vector<std::pair<int, FileMetaData>> new_files_;
};
其中,has_comparator_、has_log_number_、has_prev_log_number_、has_next_file_number_、has_last_sequence_这几个变量主要是用来标记对应的变量是否存在用的。comparator_用于存储比较key值的comparator的名字,log_number_用于存储log的FileNumber,prev_log_number_用于存储辅助log的FileNumber,next_file_number_用于存储下一个使用的FileNumber,last_sequence_用于存储最后一个使用的SequenceNumber。compact_pointers_用于存储VersionSet中需要被更新的compact_pointer_,deleted_files_用于存储需要被删除的文件,new_files_用于存储新增的文件。
而VersionEdit类中大部分成员函数都是用于设置成员变量,EncodeTo函数用于将VersionEdit编码为字符串,DecodeFrom函数用于从字符串解码出VersionEdit类的对象。
241 Love u