美文网首页
RocksDB——Flush

RocksDB——Flush

作者: Glitter试做一号机 | 来源:发表于2018-12-03 16:54 被阅读0次

    RocksDB——Flush

    Flush触发流程

    image

    MemtableList::FlushRequested()

    将memtable中请求flush的flagflush_requested_标记为true

    DBImpl::SchedulePendingFlush()

    db/db_impl_compaction_flush.cc

    将当前cfd加入PendingFlushQueue

    DBImpl::MaybeScheduleFlushOrCompaction()

    db/db_impl_compaction_flush.cc

    触发flush或者compaction

    首先判断是否处于特殊阶段,这些阶段直接返回:

    • 打开数据库(opened_successfully_ == true):避免compaction与open的数据竞争
    • 暂停后台任务(bg_work_paused_ > 0)
    • 关闭数据库的时候(shutting_down_ == true)

    通过Env::GetBackgroudThreads获取HIGH pri线程池中线程数

    如果满足:

    1. HIGH pri线程池中还有线程资源;
    2. 未处理的flush请求(unscheduled_flushes_) > 0 ;
    3. 正在处理的flush请求数(bg_flush_scheduled_)小于限制的最大同时处理flush数(bg_job_limits.max_flushes)

    则触发HIGH pri的flush,直到上面的条件之一不满足为止

    此时后台线程执行函数BGWorkFlush,使用HIGH pri线程池

    如果HIGH pri线程池已经没有线程了,则尝试使用LOW pri线程池

    如果满足:

    1. 未处理flush请求(unscheduled_flushes_) > 0
    2. 后台flush总数(bg_flush_scheduled_)以及后台compaction总数(bg_compaction_scheduled_)之和小于最大的flush限制个数(bg_job_limits.max_flushes)

    则触发LOW pri的flush,直到上面的条件之一不满足为止

    此时后台线程执行函数BGWorkFlush,使用LOW pri线程池

    如果当前没有flush待处理,则处理compaction,如果满足:

    1. 当前正在处理的compaction总数(bg_compaction_scheduled_)小于最大可运行compaction总数(bg_job_limits.max_compactions)
    2. 待处理compaction总数大于0(unscheduled_compactions_)

    则触发compaction,直到上述条件不满足为止

    此时后台线程执行函数BGWorkCompaction,使用LOW pri线程池

    Flush的执行过程

    BGWorkFlush

    db/db_impl_compaction_flush.cc

    这个函数交给子线程执行,然后子线程可以调用BackgroundCallFlush执行真正的Flush操作

    BackgroundCallFlush

    db/db_impl_compaction_flush.cc

    首先记录插入到pending_outputs_中的number(这个number是新生成table的number,处理完之后再释放)

    增加正在处理的flush计数:num_running_flushes++

    调用BackgroundFlush进行flush操作,传入参数:made_progress(默认false)、job_contextlog_buffer

    当发生错误返回的状态不是ok的时候处理错误

    ReleaseFieNumberFromPendingOutputs(pending_outputs_inserted_elem)从pending_outputs_中删除掉前面记录的elem,也就是当前memtable已经成功处理

    FindAbsoleteFiles:处理flush失败时候的临时文件问题

    num_running_flushes--;
    
    bg_flushe_scheduled--;
    

    至此处理完一个flush操作(无论失败还是成功),则将正在运行的flush任务计数减1,同时被安排的flush任务数减1

    最后调用MaybeScheduleFlushOrCompaction,尝试触发下一次flush或者compaction

    BackgroundFlush

    db/db_impl_compaction_flush.cc

    在排除所有的非正常情况下开始执行flush的准备工作,首先就是通过while循环获取一个cfd,获取cfd的函数为DBImpl::PopFirstFromFlushQueue,该函数从flush_queue_中pop出一个cfd。在确保当前cfd不是被drop或者是没有达到flush状态的cfd,则选择该cfd进行flush

    通过GetLastMutableCFOptions获取当前cfd的设置信息,然后调用FlushMemtableToOutputFile,对当前cfd进行flush操作,传入参数:

    cfd
    mutable_cf_options
    made_progress
    job_context
    log_buffer
    

    Flush函数实现

    FlushMemtableToOutputFile

    【sequence number & snapshot 】

    earlist_write_conflict_snapshot

    通过snapshot_checker获取所有的snapshot

    snapshot_checker

    【构造FlushJob】

    传入参数:

    dbname_
    cfd
    immutable_db_options_
    mutable_cf_options
    env_options_for_compaction_
    versions_.get()
    &mutex_
    &shutting_down_
    snapshot_seqs
    earlist_write_conflict_snapshot
    snapshot_checker
    job_context
    log_buffer
    
    // 获取文件路径相关
    directories.GetDir()
    GetDataDir(cfd, 0U)
        
    GetCompressionFlush(*cfd->ioptions(), mutable_cf_options)
    stats_
    &event_logger_
    mutable_cf_options.report_bg_io_stats
    

    FlushJob是进行Flush操作的主体结构,包含了进行Flush操作的主要的操作的封装

    【flushjob.PickMemtable】从传入的cfd中选择待flush的immutable memtable,详见后文的FlushJob解析

    【NotifyOnFlushBegins】:通知listener flush开始

    【flushjob.run】传入空的file_meta(FileMetaData)

    【InstallSuperVersionAndScheduleWork】:构建新的SuperVersion结构并触发新的flush以及compaction

    【NotifyOnFlushCompleted】:通知listener flush完成

    【sst_file_manager.addfile】:更新sfm中的磁盘空间使用情况

    FlushJob(有test)

    (db/flush_job.h, db/flush_job.cc)

    API
    • PickMemtable

    该函数主要作用是获取需要进行flush的memtable,此处与leveldb不同的是,一个cf里可能包含多个待flush的memtable,所以这里选出来的也可能有多个memtable

    获取memtbale的过程是调用cfd->imm()->PickMemtableToFlush,获取一个memtable的list

    从memtable列表中获取第一个memtable,使用其edit结构来保存本次flush的元信息(lognumber、cf_id)

    调用version_set的NewFileNumber接口为新的文件生成一个filenumber(同时可以指定对应level的路径)

    • Run

    WriteLevel0Table:写数据的过程

    WriteLevel0Table

    该函数为真正进行读写数据的函数,在该函数内将FlushJob中挑选出来的所有Memtable进行Merge然后构造成sstable并写到L0

    首先构造一个write_hint,调用的函数接口是cfd_->CaculateSSTWriteHint,传入参数0

    然后遍历所有的memtable,并获取每个memtable的iterator。在遍历的过程中同时还构造memtable的RangeTombstoneIterator,如果不为空表示有range delete操作,则将这个iterator添加到另一个vector中

    使用所有的memtable的iterator构造MergingIterator,以及所有的range_del_iterator构造MergingIterator

    调用BuildTable函数构造SSTable

    处理完成 之后如果output_file_directory不为空则同步该目录(output_file_directory_->Fsync())

    调用edit_->AddFile,将生成的文件添加到L0

    记录本次Flush的状态

    WriteHint

    WriteHint是用来提示RocksDB中写数据位置的一项优化措施

    首先看定义:

     enum WriteLifeTimeHint {
        WLTH_NOT_SET = 0, // No hint information set
        WLTH_NONE,        // No hints about write life time
        WLTH_SHORT,       // Data written has a short life time
        WLTH_MEDIUM,      // Data written has a medium life time
        WLTH_LONG,        // Data written has a long life time
        WLTH_EXTREME,     // Data written has an extremely long life time
      };
    

    write hint以enum的形式定义,每个选项表示对应数据的生命周期的长度

    在计算SST的write hint的时候是根据sst写入的目标level实现的,该模式只适用于leveled compaction,对于L0、L1是WLTH_MEDIUM,L2是WLTH,LONG而L3及以上是WLTH_EXTREME

    相关文章

      网友评论

          本文标题:RocksDB——Flush

          本文链接:https://www.haomeiwen.com/subject/sxtucqtx.html