美文网首页
LevelDB 整体架构

LevelDB 整体架构

作者: wayyyy | 来源:发表于2021-04-04 16:15 被阅读0次
    健值存储系统的基本要求

    一个优秀的健值存储系统,要具备优秀的读/写,必然得具备以下特点:

    • 简洁完备的调用接口
      API 至少包括以下一个:Get Put Delete,另外接口函数的参数选项也需要在使用过程中保持一致性,从而实现接口的易用性
    • 高效的数据存储结构
      用于组织数据的数据结构和算法,从而实现高效的数据存储与获取。通常存储系统采用hash表或B+树的形式进行数据的存储,而LevelDB 是基于LSM树进行存储
    • 高效的内存管理机制
      存储系统中管理内存的算法与技术,一个优秀的内存管理模,对于发挥键值存储系统的性能具有重要的作用。
    • 具备事务操作或批量操作的功能
      支持事务操作或者批量操作,允许提交一系列的数据操作请求,并一次性的将数据操作请求进行顺序的执行,并且支持在出现错误时进行数据的回滚
    LevelDB整体架构
    LevelDB整体架构.png

    leveldb是一种基于operation log的文件系统,是Log-Structured-Merge Tree的典型实现,LSM源自Ousterhout和Rosenblum在1991年发表的经典论文 The Design and Implementation of a Log-Structured File System。

    一般而言,在常规的物理存储介质上,顺序写比随机写速度要快,而LSM树正是充分利用了这一物理特性,从而实现对频繁,大量数据写操作支持。

    image.png

    当需要插入一条数据记录时,首先在Log文件末尾顺序添加一条数据插入的记录,然后将这个新的数据记录添加到驻留在内存的C0树中,当C0树的数据大小达到某个阈值时,会自动将数据迁移,保留在磁盘上中的C1树这就是LSM树的数据写的过程。

    而对于数据读过程,LSM将首先从C0树中进行索引查询,如果不存在,则转向C1中继续查询,直到找到最终的数据记录。

    对于LevelDB 来说,C0树采用了MemTable数据结构实现,而C1树采用了SSTable。


    数据库的 Open 流程
    Open.png
    Status DB::Open(const Options &options, const std::string &dbname, DB **dbptr)
    {
        *dbptr = nullptr;
    
        DBImpl *impl = new DBImpl(options, dbname);
        impl->mutex_.Lock();
        VersionEdit edit;
       
        bool save_manifest = false;
        Status s = impl->Recover(&edit, &save_manifest);
        if (s.ok() && impl->mem_ == nullptr)
        {
            uint64_t new_log_number = impl->versions_->NewFileNumber();
            WritableFile *lfile;
            s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), &lfile);
            if (s.ok())
            {
                edit.SetLogNumber(new_log_number);
                impl->logfile_ = lfile;
                impl->logfile_number_ = new_log_number;
                impl->log_ = new log::Writer(lfile);
                impl->mem_ = new MemTable(impl->internal_comparator_);
                impl->mem_->Ref();
            }
        }
        if (s.ok() && save_manifest)
        {
            edit.SetPrevLogNumber(0); // No older logs needed after recovery.
            edit.SetLogNumber(impl->logfile_number_);
            s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
        }
        if (s.ok())
        {
            impl->RemoveObsoleteFiles();
            impl->MaybeScheduleCompaction();
        }
        impl->mutex_.Unlock();
        if (s.ok())
        {
            assert(impl->mem_ != nullptr);
            *dbptr = impl;
        }
        else
        {
            delete impl;
        }
        return s;
    }
    

    数据 Get 流程
    Get.png

    Get 函数在查询读取数据时,依次从以及当前保存的SSTable文件中进行查找。

    同时Get 函数接口主要是以 key 为标识从获取对应的数据值,然而在内部获取机制中还有一个时间序列尺度的标识,用于究竟获取该key哪一个时间序列的数值,而这个时间序列就是 SequenceNumberSequenceNumber 是一个64位的整数,表示序列号用7个字节,最后一个字节用于存储该数据的值类型。对数据库进行写操作会递增该序列号。

    image.png

    查询过程中,皆通过查询对象 LookupKey 进行索引,而 LookupKey 在实例化时主要有两个关键参数:一个为原始健Key,另一个位序列号作为 snapshot。

    Status DBImpl::Get(const ReadOptions &options, const Slice &key, std::string *value)
    {
        Status s;
        MutexLock l(&mutex_);
        SequenceNumber snapshot;
        // 获取序列号并且赋值给 snapshot
        if (options.snapshot != nullptr)
        {
            snapshot = static_cast<const SnapshotImpl *>(options.snapshot)->sequence_number();
        }
        else
        {
            snapshot = versions_->LastSequence();
        }
    
        MemTable *mem = mem_;
        MemTable *imm = imm_;
        Version *current = versions_->current();
        mem->Ref();
        if (imm != nullptr)
            imm->Ref();
        current->Ref();
    
        bool have_stat_update = false;
        Version::GetStats stats;
    
        {
            mutex_.Unlock();
            // 先查找MemTable,如果未找到则继续查找 Immutable MemTable,如果仍未找到,则继续查找SSTable
            LookupKey lkey(key, snapshot);
            if (mem->Get(lkey, value, &s))
            {
                // Done
            }
            else if (imm != nullptr && imm->Get(lkey, value, &s))
            {
                // Done
            }
            else
            {
                s = current->Get(options, lkey, value, &stats);
                have_stat_update = true;
            }
            mutex_.Lock();
        }
    
        if (have_stat_update && current->UpdateStats(stats))
        {
            MaybeScheduleCompaction();
        }
        mem->Unref();
        if (imm != nullptr)
            imm->Unref();
        current->Unref();
        return s;
    }
    

    数据 Put 与 Write 流程

    数据 Put 包括3种类型:数据插入,修改,删除。这3种操作均对应两种操作模式:
    1、针对单条数据的操作,主要使用 put 接口
    2、针对多种操作请求进行批量操作,主要使用 write 接口

    put 函数也是将单条数据的操作变更为一个批量操作,然后调用 write 函数去实现。

    Status DB::Put(const WriteOptions &opt, const Slice &key, const Slice &value)
    {
        WriteBatch batch;
        batch.Put(key, value);
        return Write(opt, &batch);
    }
    

    另一个队列writers_,该队列对象中的元素节点为writers_对象指针,可见 writers_ 与写操作的缓存空间有关,批量操作请求均存储在这个队列中,按顺序执行。已完成的出队,未执行的则在这个队列中处于等待状态。

    struct DBImpl::Writer
    {
        explicit Writer(port::Mutex *mu) : batch(nullptr), sync(false), done(false), cv(mu) {}
    
        Status status;
        WriteBatch *batch;
        bool sync;
        bool done;
        port::CondVar cv;
    };
    class DBImpl {
        ....
        std::deque<Writer*> writers_;
    };
    
    image.png

    整个流程如下:


    image.png
    struct WriteOptions
    {
        WriteOptions() = default;
    
        // If true, the write will be flushed from the operating system
        // buffer cache (by calling WritableFile::Sync()) before the write
        // is considered complete.  If this flag is true, writes will be
        // slower.
        //
        // If this flag is false, and the machine crashes, some recent
        // writes may be lost.  Note that if it is just the process that
        // crashes (i.e., the machine does not reboot), no writes will be
        // lost even if sync==false.
        //
        // In other words, a DB write with sync==false has similar
        // crash semantics as the "write()" system call.  A DB write
        // with sync==true has similar crash semantics to a "write()"
        // system call followed by "fsync()".
        bool sync = false;
    };
    
    Status DBImpl::Write(const WriteOptions &options, WriteBatch *updates)
    {
        Writer w(&mutex_);
        w.batch = updates;
        w.sync = options.sync;
        w.done = false;
    
        MutexLock l(&mutex_);
        writers_.push_back(&w);
    
        while (!w.done && &w != writers_.front())
        {
            w.cv.Wait();
        }
        if (w.done)
        {
            return w.status;
        }
    
        Status status = MakeRoomForWrite(updates == nullptr);
        uint64_t last_sequence = versions_->LastSequence();
        Writer *last_writer = &w;
        if (status.ok() && updates != nullptr)
        {
            // 合并写入操作
            WriteBatch *write_batch = BuildBatchGroup(&last_writer);
            WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
            last_sequence += WriteBatchInternal::Count(write_batch);
    
            // 将更新记录到WALLog文件,并刷新到磁盘
            {
                mutex_.Unlock();
                status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
                bool sync_error = false;
                if (status.ok() && options.sync)
                {
                    status = logfile_->Sync();
                    if (!status.ok())
                    {
                        sync_error = true;
                    }
                }
                // 将更新写入到MemTable 中
                if (status.ok())
                {
                    status = WriteBatchInternal::InsertInto(write_batch, mem_);
                }
                mutex_.Lock();
                if (sync_error)
                {
                    RecordBackgroundError(status);
                }
            }
            if (write_batch == tmp_batch_)
                tmp_batch_->Clear();
    
            versions_->SetLastSequence(last_sequence);
        }
    
        // 由于合并写入操作一次可能会处理多个writers_队列中的元素,因此这里将已经处理的元素状态进行变更,并发送信号
        while (true)
        {
            Writer *ready = writers_.front();
            writers_.pop_front();
            if (ready != &w)
            {
                ready->status = status;
                ready->done = true;
                ready->cv.Signal();
            }
            if (ready == last_writer)
                break;
        }
    
        // Notify new head of write queue
        if (!writers_.empty())
        {
            writers_.front()->cv.Signal();
        }
    
        return status;
    }
    

    快照生成与读取分析

    快照保存了DB在某一个特定时刻的状态,快照一般为分为不可变的,因而它是一种线程安全且不需要同步机制的访问对象,

    LevelDB 中的快照并不是将所有的数据进行完整的物理空间备,而是保存每一个快照备份记录创建时刻的序列号。在DBImpl 类中专门有一个成员变量采用双向链表的数据结构,存储所有快照备份的序列号。

    前面已经讲过,用户是通过调用DB->GetSnapshot() 函数创建一个快照,而这个函数本质上就是调用了SnapshotList 的 New 方法,在链表中创建并插入一个新的快照节点,而新节点保存的就是当前DB的最新序列号。而对应的方法,则是调用对应的方法将该快照节点从链表中删除。

    image.png

    相关文章

      网友评论

          本文标题:LevelDB 整体架构

          本文链接:https://www.haomeiwen.com/subject/crmqektx.html