前面已经写了几篇文章介绍一些和 LevelDB 相关的内容:
这篇文章,介绍一下 LevelDB 的写操作。
写操作接口
LevelDB 提供的写操作接口有:
其中,Put 和 Delete 的实现都是通过封装 Write 来实现的,函数调用关系如下:
- leveldb::DBImpl::Put => leveldb::DB::Put => leveldb::DBImpl::Write
- leveldb::DBImpl::Delete => leveldb::DB::Delete => leveldb::DBImpl::Write
Write
接口
Write 的函数原型如下:
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
leveldb::WriteOptions 是控制写操作的参数,目前只有一个成员变量 sync
表示是否每次写完都要将日志 flush 到磁盘。
leveldb::WriteBatch 表示多个 Key-Value 数据的更新操作(Put、Delete)。
具体实现是 leveldb::DBImpl::Write 。
实现
- LevelDB 通过传入的参数构造一个 Writer —— leveldb::DBImpl::Writer 对象来执行本次批量写操作:
struct DBImpl::Writer {
Status status; // 执行结果
WriteBatch* batch; // 多个更新操作
bool sync; // 是否 flush 到磁盘,WriteOptions.sync
bool done; // 是否已经执行
port::CondVar cv; // 并发控制的条件变量
explicit Writer(port::Mutex* mu) : cv(mu) { }
};
-
获取互斥锁,将自己放入写队列后等待别人帮忙完成写入或者成为队首获得执行写入的权限。这里涉及 LevelDB 写操作的一个性能优化:执行写入操作的线程,会根据一定的规则将队列中的多个请求合并成一个请求,然后执行批量写入,并更新各个 Writer 的状态。
-
如果别人帮忙完成写入了,直接返回结果。下面开始执行写入数据。
-
调用 MakeRoomForWrite 在一个循环里面按照下面的流程进行检查,直到MemTable 的大小没有达到阈值或者出错。(MakeRoomForWrite 提供一个 force 参数表示是否强制切换新 MemTable,并触发 Compaction。正常写流程 force 为 false。)
MakeRoomForWrite.png
-
调用 BuildBatchGroup 将从队首开始的连续多个符合条件的 Writer 的写请求合并到 tmp_batch_。合并时主要考虑:
- 合并写入的数据大小,默认 max_size 是 1MB (1 << 20)。如果第一个写请求的 size 比较小(小于128 KB, 128 << 10),则 max_size 为 size + 128 KB。这样做是为了避免数据小的请求被其它请求给拖慢。
- 如果第一个写请求 sync == false,那么就不要加入 sync == true 的写请求。
-
释放互斥锁。这里代码保证同一时刻只有一个线程会执行写入操作。
-
写日志(WAL) 。
-
根据参数决定是否 sync 日志。
-
获取互斥锁 。
-
如果 sync 失败,设置 bg_error_,后续所有写入都将失败。
以上,便是 LevelDB 的写入流程。
小结
本篇文章结合代码简单介绍了 LevelDB 写操作的流程,其中,写入队列 + 合并写操作 是 LevelDB 写操作的一个设计亮点 —— 至少我个人觉得这个设计简单又实用,对写入性能的提升也应该是立竿见影的。
当然,LevelDB 的写操作也存在一些可以改进的地方,比如整个写入过程——包括写日志和写 MemTable,都是单线程的。
网友评论