健值存储系统的基本要求
一个优秀的健值存储系统,要具备优秀的读/写,必然得具备以下特点:
- 简洁完备的调用接口
API 至少包括以下一个:Get
Put
Delete
,另外接口函数的参数选项也需要在使用过程中保持一致性,从而实现接口的易用性 - 高效的数据存储结构
用于组织数据的数据结构和算法,从而实现高效的数据存储与获取。通常存储系统采用hash表或B+树的形式进行数据的存储,而LevelDB 是基于LSM树进行存储 - 高效的内存管理机制
存储系统中管理内存的算法与技术,一个优秀的内存管理模,对于发挥键值存储系统的性能具有重要的作用。 - 具备事务操作或批量操作的功能
支持事务操作或者批量操作,允许提交一系列的数据操作请求,并一次性的将数据操作请求进行顺序的执行,并且支持在出现错误时进行数据的回滚
LevelDB整体架构
LevelDB整体架构.pngleveldb是一种基于operation log的文件系统,是Log-Structured-Merge Tree的典型实现,LSM源自Ousterhout和Rosenblum在1991年发表的经典论文 The Design and Implementation of a Log-Structured File System。
一般而言,在常规的物理存储介质上,顺序写比随机写速度要快,而LSM树正是充分利用了这一物理特性,从而实现对频繁,大量数据写操作支持。
image.png当需要插入一条数据记录时,首先在Log文件末尾顺序添加一条数据插入的记录,然后将这个新的数据记录添加到驻留在内存的C0树中,当C0树的数据大小达到某个阈值时,会自动将数据迁移,保留在磁盘上中的C1树这就是LSM树的数据写的过程。
而对于数据读过程,LSM将首先从C0树中进行索引查询,如果不存在,则转向C1中继续查询,直到找到最终的数据记录。
对于LevelDB 来说,C0树采用了MemTable数据结构实现,而C1树采用了SSTable。
数据库的 Open 流程
Open.pngStatus DB::Open(const Options &options, const std::string &dbname, DB **dbptr)
{
*dbptr = nullptr;
DBImpl *impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
if (s.ok() && impl->mem_ == nullptr)
{
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile *lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), &lfile);
if (s.ok())
{
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
if (s.ok() && save_manifest)
{
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok())
{
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok())
{
assert(impl->mem_ != nullptr);
*dbptr = impl;
}
else
{
delete impl;
}
return s;
}
数据 Get 流程
Get.pngGet 函数在查询读取数据时,依次从以及当前保存的SSTable文件中进行查找。
同时Get 函数接口主要是以 key 为标识从获取对应的数据值,然而在内部获取机制中还有一个时间序列尺度的标识,用于究竟获取该key哪一个时间序列的数值,而这个时间序列就是 SequenceNumber
,SequenceNumber
是一个64位的整数,表示序列号用7个字节,最后一个字节用于存储该数据的值类型。对数据库进行写操作会递增该序列号。
查询过程中,皆通过查询对象 LookupKey 进行索引,而 LookupKey 在实例化时主要有两个关键参数:一个为原始健Key,另一个位序列号作为 snapshot。
Status DBImpl::Get(const ReadOptions &options, const Slice &key, std::string *value)
{
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
// 获取序列号并且赋值给 snapshot
if (options.snapshot != nullptr)
{
snapshot = static_cast<const SnapshotImpl *>(options.snapshot)->sequence_number();
}
else
{
snapshot = versions_->LastSequence();
}
MemTable *mem = mem_;
MemTable *imm = imm_;
Version *current = versions_->current();
mem->Ref();
if (imm != nullptr)
imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
{
mutex_.Unlock();
// 先查找MemTable,如果未找到则继续查找 Immutable MemTable,如果仍未找到,则继续查找SSTable
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s))
{
// Done
}
else if (imm != nullptr && imm->Get(lkey, value, &s))
{
// Done
}
else
{
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
if (have_stat_update && current->UpdateStats(stats))
{
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr)
imm->Unref();
current->Unref();
return s;
}
数据 Put 与 Write 流程
数据 Put 包括3种类型:数据插入,修改,删除。这3种操作均对应两种操作模式:
1、针对单条数据的操作,主要使用 put 接口
2、针对多种操作请求进行批量操作,主要使用 write 接口
但 put
函数也是将单条数据的操作变更为一个批量操作,然后调用 write
函数去实现。
Status DB::Put(const WriteOptions &opt, const Slice &key, const Slice &value)
{
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
另一个队列writers_
,该队列对象中的元素节点为writers_
对象指针,可见 writers_
与写操作的缓存空间有关,批量操作请求均存储在这个队列中,按顺序执行。已完成的出队,未执行的则在这个队列中处于等待状态。
struct DBImpl::Writer
{
explicit Writer(port::Mutex *mu) : batch(nullptr), sync(false), done(false), cv(mu) {}
Status status;
WriteBatch *batch;
bool sync;
bool done;
port::CondVar cv;
};
class DBImpl {
....
std::deque<Writer*> writers_;
};
image.png
整个流程如下:
image.png
struct WriteOptions
{
WriteOptions() = default;
// If true, the write will be flushed from the operating system
// buffer cache (by calling WritableFile::Sync()) before the write
// is considered complete. If this flag is true, writes will be
// slower.
//
// If this flag is false, and the machine crashes, some recent
// writes may be lost. Note that if it is just the process that
// crashes (i.e., the machine does not reboot), no writes will be
// lost even if sync==false.
//
// In other words, a DB write with sync==false has similar
// crash semantics as the "write()" system call. A DB write
// with sync==true has similar crash semantics to a "write()"
// system call followed by "fsync()".
bool sync = false;
};
Status DBImpl::Write(const WriteOptions &options, WriteBatch *updates)
{
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front())
{
w.cv.Wait();
}
if (w.done)
{
return w.status;
}
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer *last_writer = &w;
if (status.ok() && updates != nullptr)
{
// 合并写入操作
WriteBatch *write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// 将更新记录到WALLog文件,并刷新到磁盘
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync)
{
status = logfile_->Sync();
if (!status.ok())
{
sync_error = true;
}
}
// 将更新写入到MemTable 中
if (status.ok())
{
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error)
{
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_)
tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
// 由于合并写入操作一次可能会处理多个writers_队列中的元素,因此这里将已经处理的元素状态进行变更,并发送信号
while (true)
{
Writer *ready = writers_.front();
writers_.pop_front();
if (ready != &w)
{
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer)
break;
}
// Notify new head of write queue
if (!writers_.empty())
{
writers_.front()->cv.Signal();
}
return status;
}
快照生成与读取分析
快照保存了DB在某一个特定时刻的状态,快照一般为分为不可变的,因而它是一种线程安全且不需要同步机制的访问对象,
LevelDB 中的快照并不是将所有的数据进行完整的物理空间备,而是保存每一个快照备份记录创建时刻的序列号。在DBImpl 类中专门有一个成员变量采用双向链表的数据结构,存储所有快照备份的序列号。
前面已经讲过,用户是通过调用DB->GetSnapshot() 函数创建一个快照,而这个函数本质上就是调用了SnapshotList 的 New 方法,在链表中创建并插入一个新的快照节点,而新节点保存的就是当前DB的最新序列号。而对应的方法,则是调用对应的方法将该快照节点从链表中删除。
image.png
网友评论