RocksDB——Flush
Flush触发流程
imageMemtableList::FlushRequested()
将memtable中请求flush的flagflush_requested_
标记为true
DBImpl::SchedulePendingFlush()
(db/db_impl_compaction_flush.cc)
将当前cfd加入PendingFlushQueue
DBImpl::MaybeScheduleFlushOrCompaction()
(db/db_impl_compaction_flush.cc)
触发flush或者compaction
首先判断是否处于特殊阶段,这些阶段直接返回:
- 打开数据库(opened_successfully_ == true):避免compaction与open的数据竞争
- 暂停后台任务(bg_work_paused_ > 0)
- 关闭数据库的时候(shutting_down_ == true)
通过Env::GetBackgroudThreads获取HIGH pri线程池中线程数
如果满足:
- HIGH pri线程池中还有线程资源;
- 未处理的flush请求(
unscheduled_flushes_
) > 0 ; - 正在处理的flush请求数(
bg_flush_scheduled_
)小于限制的最大同时处理flush数(bg_job_limits.max_flushes
)
则触发HIGH pri的flush,直到上面的条件之一不满足为止
此时后台线程执行函数BGWorkFlush
,使用HIGH pri线程池
如果HIGH pri线程池已经没有线程了,则尝试使用LOW pri线程池
如果满足:
- 未处理flush请求(
unscheduled_flushes_
) > 0 - 后台flush总数(
bg_flush_scheduled_
)以及后台compaction总数(bg_compaction_scheduled_
)之和小于最大的flush限制个数(bg_job_limits.max_flushes
)
则触发LOW pri的flush,直到上面的条件之一不满足为止
此时后台线程执行函数BGWorkFlush
,使用LOW pri线程池
如果当前没有flush待处理,则处理compaction,如果满足:
- 当前正在处理的compaction总数(
bg_compaction_scheduled_
)小于最大可运行compaction总数(bg_job_limits.max_compactions
) - 待处理compaction总数大于0(
unscheduled_compactions_
)
则触发compaction,直到上述条件不满足为止
此时后台线程执行函数BGWorkCompaction
,使用LOW pri线程池
Flush的执行过程
BGWorkFlush
(db/db_impl_compaction_flush.cc)
这个函数交给子线程执行,然后子线程可以调用BackgroundCallFlush执行真正的Flush操作
BackgroundCallFlush
(db/db_impl_compaction_flush.cc)
首先记录插入到pending_outputs_中的number(这个number是新生成table的number,处理完之后再释放)
增加正在处理的flush计数:num_running_flushes++
调用BackgroundFlush进行flush操作,传入参数:made_progress
(默认false)、job_context
、log_buffer
当发生错误返回的状态不是ok的时候处理错误
ReleaseFieNumberFromPendingOutputs(pending_outputs_inserted_elem)
从pending_outputs_中删除掉前面记录的elem,也就是当前memtable已经成功处理
FindAbsoleteFiles:处理flush失败时候的临时文件问题
num_running_flushes--;
bg_flushe_scheduled--;
至此处理完一个flush操作(无论失败还是成功),则将正在运行的flush任务计数减1,同时被安排的flush任务数减1
最后调用MaybeScheduleFlushOrCompaction,尝试触发下一次flush或者compaction
BackgroundFlush
(db/db_impl_compaction_flush.cc)
在排除所有的非正常情况下开始执行flush的准备工作,首先就是通过while循环获取一个cfd,获取cfd的函数为DBImpl::PopFirstFromFlushQueue,该函数从flush_queue_中pop出一个cfd。在确保当前cfd不是被drop或者是没有达到flush状态的cfd,则选择该cfd进行flush
通过GetLastMutableCFOptions获取当前cfd的设置信息,然后调用FlushMemtableToOutputFile,对当前cfd进行flush操作,传入参数:
cfd
mutable_cf_options
made_progress
job_context
log_buffer
Flush函数实现
FlushMemtableToOutputFile
【sequence number & snapshot 】
earlist_write_conflict_snapshot
通过snapshot_checker获取所有的snapshot
snapshot_checker
【构造FlushJob】
传入参数:
dbname_
cfd
immutable_db_options_
mutable_cf_options
env_options_for_compaction_
versions_.get()
&mutex_
&shutting_down_
snapshot_seqs
earlist_write_conflict_snapshot
snapshot_checker
job_context
log_buffer
// 获取文件路径相关
directories.GetDir()
GetDataDir(cfd, 0U)
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options)
stats_
&event_logger_
mutable_cf_options.report_bg_io_stats
FlushJob是进行Flush操作的主体结构,包含了进行Flush操作的主要的操作的封装
【flushjob.PickMemtable】从传入的cfd中选择待flush的immutable memtable,详见后文的FlushJob解析
【NotifyOnFlushBegins】:通知listener flush开始
【flushjob.run】传入空的file_meta(FileMetaData)
【InstallSuperVersionAndScheduleWork】:构建新的SuperVersion结构并触发新的flush以及compaction
【NotifyOnFlushCompleted】:通知listener flush完成
【sst_file_manager.addfile】:更新sfm中的磁盘空间使用情况
FlushJob(有test)
(db/flush_job.h, db/flush_job.cc)
API
- PickMemtable
该函数主要作用是获取需要进行flush的memtable,此处与leveldb不同的是,一个cf里可能包含多个待flush的memtable,所以这里选出来的也可能有多个memtable
获取memtbale的过程是调用cfd->imm()->PickMemtableToFlush,获取一个memtable的list
从memtable列表中获取第一个memtable,使用其edit结构来保存本次flush的元信息(lognumber、cf_id)
调用version_set的NewFileNumber接口为新的文件生成一个filenumber(同时可以指定对应level的路径)
- Run
WriteLevel0Table:写数据的过程
WriteLevel0Table
该函数为真正进行读写数据的函数,在该函数内将FlushJob中挑选出来的所有Memtable进行Merge然后构造成sstable并写到L0
首先构造一个write_hint,调用的函数接口是cfd_->CaculateSSTWriteHint,传入参数0
然后遍历所有的memtable,并获取每个memtable的iterator。在遍历的过程中同时还构造memtable的RangeTombstoneIterator,如果不为空表示有range delete操作,则将这个iterator添加到另一个vector中
使用所有的memtable的iterator构造MergingIterator,以及所有的range_del_iterator构造MergingIterator
调用BuildTable函数构造SSTable
处理完成 之后如果output_file_directory不为空则同步该目录(output_file_directory_->Fsync()
)
调用edit_->AddFile,将生成的文件添加到L0
记录本次Flush的状态
WriteHint
WriteHint是用来提示RocksDB中写数据位置的一项优化措施
首先看定义:
enum WriteLifeTimeHint {
WLTH_NOT_SET = 0, // No hint information set
WLTH_NONE, // No hints about write life time
WLTH_SHORT, // Data written has a short life time
WLTH_MEDIUM, // Data written has a medium life time
WLTH_LONG, // Data written has a long life time
WLTH_EXTREME, // Data written has an extremely long life time
};
write hint以enum的形式定义,每个选项表示对应数据的生命周期的长度
在计算SST的write hint的时候是根据sst写入的目标level实现的,该模式只适用于leveled compaction,对于L0、L1是WLTH_MEDIUM,L2是WLTH,LONG而L3及以上是WLTH_EXTREME
网友评论