# rocksdb engine 写逻辑
## 执行路径
DB::Put(key, value)是一个写操作简单封装, 最终都会打包一个WriteBatch对象,调用rocksdb::DBImpl::WriteImpl来完成写。
也可以手工构造一个WriteBatch,作为一个批量事务操作,放入多个key/value操作,一次提交。
```cpp
#0 rocksdb::MemTable::Add (this=0x1619800, s=1, type=rocksdb::kTypeValue, key=..., value=..., allow_concurrent=false, post_process_info=0x0) at db/memtable.cc:425
#1 0x00000000005c25c2 in rocksdb::MemTableInserter::PutCF (this=0x7fffffffa8b0, column_family_id=0, key=..., value=...) at db/write_batch.cc:869
#2 0x00000000005bdfe7 in rocksdb::WriteBatch::Iterate (this=0x7fffffffb0c0, handler=0x7fffffffa8b0) at db/write_batch.cc:381
#3 0x00000000005bfa41 in rocksdb::WriteBatchInternal::InsertInto (writers=..., sequence=1, memtables=0x1616100, flush_scheduler=0x1608808, ignore_missing_column_families=false, log_number=0, db=0x1608000,
concurrent_memtable_writes=false) at db/write_batch.cc:1206
#4 0x00000000004c85ae in rocksdb::DBImpl::WriteImpl (this=0x1608000, write_options=..., my_batch=0x7fffffffb0c0, callback=0x0, log_used=0x0, log_ref=0, disable_memtable=false) at db/db_impl.cc:4953
#5 0x00000000004c6b72 in rocksdb::DBImpl::Write (this=0x1608000, write_options=..., my_batch=0x7fffffffb0c0) at db/db_impl.cc:4585
#6 0x00000000004ccd99 in rocksdb::DB::Put (this=0x1608000, opt=..., column_family=0x15d6500, key=..., value=...) at db/db_impl.cc:5803
#7 0x00000000004c69f0 in rocksdb::DBImpl::Put (this=0x1608000, o=..., column_family=0x15d6500, key=..., val=...) at db/db_impl.cc:4560
```
## WriteBatch
一个WriteBatch就是一个事务,里面会有很多条操作记录,可以调用WriteBatch.Put/Delete...等操作加入操作(Key/Value)
WriteBatch.rep_ 是一个binary buffer, 用于存储batch中所有操作的记录,格式如下:
WriteBatch.content\_flags_ 标记batch中含有的操作类型集合。
field | length | description |
---------:| :----- |:-----
kHeader | FixInt64 | 序列号,单调递增, Batch sequence的起始值
Count | FixInt32 | 操作记录个数
Type | FixInt8 + Var32Int | 操作类型 + column_family(if != 0)
Key | Var32String | key binary buffer
Value | Var32String | value binary buffer
Type/Key/Value每个记录重复一条, kHeader/Count batch对象共用
## rocksdb::DBImpl::WriteImpl
- 新建一个WriteThread::Writer对象,关联到传入的batch object.
- 调用write\_thread_.JoinBatchGroup(&w);
### Group Commit
为了提高commit性能,存储引擎会将很多线程的并发write合并到一个group,批量写日志,write memory table,然后一次性commit.
JoinBatchGroup调用LinkOne(w, &linked_as_leader);将当前write\_thread_中的writer连接成一个链表,其中write\_thread_.newest\_writer_是链表的头,是最新加入的follower,而第一个加入链表的也就是当前group的leader(link_older=nullptr).
follower(newest_writer) --*link_older*--> follower --*link_older*--> follower --*link_older*--> **leader** ----> nullptr
如果当前的Writer成为了Leader,那么返回做剩下的提交逻辑,如果当前已经有了Leader,需要等待Write.state成为STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED
- As Leader
- 检查是否需要Flush, 如果需要,找出所有column_family中最大的MemTable的CF,调用SwitchMemtable,冻结当前active memtable, 调用SchedulePendingFlush调度刷盘。
- 取当前的versions_的LastSequence。 开始持有DBImpl::mutex
- 调用write\_thread_.EnterAsBatchGroupLeader,这个函数确定当前提交的批次应该包含哪些数据
- 计算当前可以批量提交的最大长度max_size; 如果leader.size<128KB max_size=leader.size+128KB,如果>128KB,max_size=1MB.
- 调用CreateMissingNewerLinks(newest_writer),将整个链表的反向链接建立起来(link_newer),成了一个双向链表。
- 从leader开始反向遍历,一直到newest_writer, 累加每个batch的size,一直到max_size,超过之后截断,同时检查每个writer的sync/no_slowdown/disableWAL是否一致,不一致的地方也开始截断,用last_writer标记链表的结束位置,作为函数输出参数返回。w->callback->AllowWriteBatching(),也可以设置不想被batch的writer. 并且把符合条件的writer batch都push到write_group的vector中。
- 检查是否可以parallel提交,条件有几个:1. memtable支持,2.allow_concurrent_memtable_write设置,3.write_group 有多个batch, 4. batch中没有merge操作。
- 确定当前提交的group的current_sequence=last_sequence+1(作为起始sequence), 并且将sequence先进行占位,一次性**为write_group中每个batch的每个操作记录都分配一个sequence**. (注意writer.ShouldWriteToMemtable标记为false的不计入sequence)
- 将write_group中的每个batch的数据都append到一个新的WriteBatch对象merged_batch中(tmp\_batch_),如果group中只有一个batch, 那么就用这个batch,没必要拷贝数据了。
- 设置新的merged_batch对象的sequence为current_sequence(起始sequence)
- 写WAL,用的数据就是merged\_batch中的rep_ (如果是多个batch,那么tmp\_batch_可以清理了)
- 如果不允许并发:串行执行write memtable,调用WriteBatchInternal::InsertInto将write_group中所有数据串行写入到memtable.
- 并行执行(concurrent_memtable_writes)
- WriteThread::ParallelGroup 建立一个并行写memtable group,pg.leader/last_writer分别指向链表的头和尾。
- write\_thread_.LaunchParallelFollowers: 设置链表中每个writer.sequence为之前分配好的sequence,(注意每个batch分配自己的一段,调用InsertInto的时候再每个Key设置自己的sequence),设置writer.state为STATE_PARALLEL_FOLLOWER
- 作为Leader的writer batch开始写memtable
- 调用write\_thread_.CompleteParallelWorker(&w)判断是不是最后一个完成write memtable的线程
- 正常情况下(如果early_exit_allowed为false),只有leader会最后做收尾工作,因此即便leader不是最后一个写完memtable的线程,也会等待writer.state == STATE\_COMPLETED 才会退出, 并返回true,表示需要做最后的提交工作(update versions_.LastSequence),leader的STATE_COMPLETEDe是由最后一个退出的follower线程设置的。
- follower线程如果不是最后一个完成工作的线程,那么会一直等到writer.state == STATE_COMPLETED退出。
- follower线程如果是最后一个完成工作的线程,那么会先把group.leader.state设置为STATE_COMPLETED,然后等待自己变成STATE_COMPLETED,退出。
- 如果 CompleteParallelWorker返回了true(leader等到了STATE\_COMPLETED或者自己就是最后一个),做提交动作更新全局的sequence: versions_->SetLastSequence(last_sequence);
- 调用write\_thread_.ExitAsBatchGroupLeader
- 注意当前的newest\_writer_可能已经加了很多新的write batch进来了,在上一次commit的过程中,新进来的write batch还会一直往write\_thread_的链表上挂,但是本次提交的截止的点在EnterAsBatchGroupLeader时候确定的,因此退出的时候会将本次提交链还剩余的writer链,重新建立好反向链接,设置紧接着的writer为新的Leader,之前调用JoinBatchGroup等待的线程,又可以继续执行下一批事务。
- 遍历write_group中所有的writer, 设置所有writer的state为STATE_COMPLETED,这样还在调用CompleteParallelWorker的follower线程就会退出。
- As follower
- JoinBatchGroup之后如果没有成为Leader,那就等着Leader线程在LaunchParallelFollowers的时候把自己设置为follower状态,一旦设置完,就进入follower写memtable逻辑,最后判断CompleteParallelWorker是否可以退出,一般是等待所有的batch都干完活以后退出。
- 如果allow_concurrent_memtable_write没有打开, follower线程会一直等待Leader干完所有的事情,最后调用ExitAsBatchGroupLeader设置状态为STATE_COMPLETED后直接退出。
### writer链表在group commit过程中的变化
![link_list](rocksdb_writer_link_list.png)
### writer对象状态变迁图
![state_flow](rocksdb_writer_state_flow.png)
网友评论