主流数据库都支持 mvcc
概念,etcd 也支持。以前经常使用 etcd, 但仅限当配置或是状态通知,根本不了解细节,先看下 mvcc
如何实现的吧。
版本概念
对同一个 key 每次修改都对应 revision (翻译成版本?修订?), key 在生命周期内可能被频繁删除,从创建到删除的所有 revision 集合组成一个 generation (翻译成代?), 每个 key 周时有多个 generation 组成多版本。组织这个多版本 key 的结构体叫做 keyIndex
┌───────────────────────────────────────────────────────────────────────────┐
│ ┌───────────┬───────────┬─────────────┐ │
│ │ []byte │ revision │[]generation │ keyIndex │
│ │ key │ modified │ generations │ │
│ └───────────┴──────┬────┴──────┬──────┘ │
│ ┌────────────────────┘ │ │
│ │ │ │
│ revis▼on ▼ │
│ ┌──────┬───────┐ ┌──────────────┬──────────────┐ │
│ │ main │ sub │ │ generation0 │ generation1 │ **** │
│ └──────┴───────┘ └───────┬──────┴──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────┬────────────┬─────────┐ │
│ │ ver │ created │ revs │ generation │
│ └──────┴───────┬────┴────┬────┘ │
│ ┌────────────────┘ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┬──────────────┐ │
│ │ revision │ │ revision │ revision │ **** │
│ └──────────────┘ └──────────────┴──────────────┘ │
└───────────────────────────────────────────────────────────────────────────┘
type revision struct {
// main is the main revision of a set of changes that happen atomically.
main int64
// sub is the the sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing sub revision in that
// set.
sub int64
}
先看下最重要的 revision
结构体,main 表示当前操作的事务 id,全局自增的逻辑时间戳,sub 表示当前操作在事务内部的子 id,事务内自增,从 0 开始。比如一个事务内: put key value1, delete key, 那么分别对应 {txid, 0}, {txid, 1}
// generation contains multiple revisions of a key.
type generation struct {
ver int64 // generation 期间自增
created revision // when the generation is created (put in first revision).
revs []revision
}
mvcc
里不可能让所有版本都追加到一个 revs 数组中,会不限彭胀,所以产生了 generation
代的概念,ver
表示代内操作的顺序,从 0 自增,created
代表创建这个 generation
的第一个版本,最后 revs
表示所有版本。
type keyIndex struct {
key []byte // 用户 key 名称
modified revision // the main rev of the last modification 最新修改的版本
generations []generation // 不同代的数组
}
key
代表当前操作的用户 key, modified
代表最新修改的版本,generations
是代的数组,generations[n-1] 代表最新操作的代。
版本存储
对于数据存储分两点:
- 版本信息: 由 key 和 revision 组成的版本信息,存储在内存的 btree 中,用于快速查找
- kv数据: 真实的 kv 数据存放在 boltdb 中,key 是 revision, value 是序列化后的 pb
btree
版本存储在 treeIndex
中,实际上就是一个 btree,和 mysql 的比较像,点查和范围查都非常快。
type treeIndex struct {
sync.RWMutex
tree *btree.BTree
lg *zap.Logger
}
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(ti.lg, rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(ti.lg, rev.main, rev.sub)
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
keyi := &keyIndex{key: key}
ti.RLock()
defer ti.RUnlock()
if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound
}
return keyi.get(ti.lg, atRev)
}
func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
ti.RLock()
defer ti.RUnlock()
return ti.keyIndex(keyi)
}
func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
if item := ti.tree.Get(keyi); item != nil {
return item.(*keyIndex)
}
return nil
}
可以看到操作比较简单,没什么难度
boltdb
Btree 的版本信息,只包括用户真实 key 和 revision,底层 backend 才是真正保存 kv 的地方,但这个 kv 并不是用户的,而分别是 revision 和 kv pb,参考 storeTxnWrite
的实现
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
ibytes := newRevBytes()
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
......
}
可以看到 key 是 ibytes
也就是 revision
, value 是 mvccpb.KeyValue
,里面存放当前操作的用户 kv,revision, lease 信息。到此,对 etcd mvcc 信息有一定的了解了,内存 btree 保存 key 的版本信息,而 boltdb
存打平的 mvccpb.KeyValue
完整流程
etcd 代码有点绕,接口太多。整体来讲 mvcc 要分几方面:增删改查,一致性,过期数据清理。先看完整读写,整体入口接口是 applierV3
func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
if leaseID != lease.NoLease {
if l := a.s.lessor.Lookup(leaseID); l == nil {
return nil, lease.ErrLeaseNotFound
}
}
txn = a.s.KV().Write()
defer txn.End()
}
var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
// ignore_{lease,value} flag expects previous key-value pair
return nil, ErrKeyNotFound
}
}
if p.IgnoreValue {
val = rr.KVs[0].Value
}
if p.IgnoreLease {
leaseID = lease.LeaseID(rr.KVs[0].Lease)
}
if p.PrevKv {
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
}
resp.Header.Revision = txn.Put(p.Key, val, leaseID)
return resp, nil
}
先不管 lease
, 实际上就是调用 txn.Put(p.Key, val, leaseID)
, 而 txn 是由 txn = a.s.KV().Write()
,这里面有点绕,实际上是 kvstore_txn
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
ibytes := newRevBytes()
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
if oldLease != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to detach lease")
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key",
zap.Error(err),
)
} else {
plog.Errorf("unexpected error from lease detach: %v", err)
}
}
}
if leaseID != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to attach lease")
}
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease Attach")
}
}
}
其实这就是上面讲过的代码
-
rev := tw.beginRev + 1
版本 id 就是当前 +1,开启新的事务 -
kvindex.Get
查看是否有当前 key 的 index - 生成
ibytes
数据,这里看到 main 就是当前 rev, sub 就是本次事务的更改条目 - 生成
mvccpb.KeyValue
信息,并调用UnsafeSeqPut
写入boltdb
,调用kvindex.Put
写内存 btree - 最后关于
lease
的先不管。
这里面一定要注意 UnsafeSeqPut
只是开启了这个务,但是并没有提交,底层利用了 boltdb.batch
原理。 所以什么时候 commit 呢?实际上这是异步的,要追遡到 boltdb
初始化。
func newBackend(bcfg BackendConfig) *backend {
bopts := &bolt.Options{}
if boltOpenOptions != nil {
*bopts = *boltOpenOptions
}
bopts.InitialMmapSize = bcfg.mmapSize()
bopts.FreelistType = bcfg.BackendFreelistType
db, err := bolt.Open(bcfg.Path, 0600, bopts)
if err != nil {
if bcfg.Logger != nil {
bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
} else {
plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
}
}
// In future, may want to make buffering optional for low-concurrency systems
// or dynamically swap between buffered/non-buffered depending on workload.
b := &backend{
db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
readTx: &readTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
},
buckets: make(map[string]*bolt.Bucket),
},
stopc: make(chan struct{}),
donec: make(chan struct{}),
lg: bcfg.Logger,
}
b.batchTx = newBatchTxBuffered(b)
go b.run()
return b
}
func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
defer t.Stop()
for {
select {
case <-t.C:
case <-b.stopc:
b.batchTx.CommitAndStop()
return
}
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
}
}
创建 batchTx
后,马上调用 go b.run()
开启异步 goroutine,用于定期 commit 数据,时间间隔 batchInterval
默认 100ms, 也就是说 mvcc 写入不是那么严格,有一定时间窗口的。
过期数据清理
首先是正常的数据删除,applierV3
主动调用 DeleteRange
,最终还是操作的 storeTxnWrite
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
rrev := tw.beginRev
if len(tw.changes) > 0 {
rrev++
}
keys, _ := tw.s.kvindex.Range(key, end, rrev)
if len(keys) == 0 {
return 0
}
for _, key := range keys {
tw.delete(key)
}
return int64(len(keys))
}
首先从 kvindex
也就是 btree
中范围查找到所有的 keys,然后再调用 tw.delete
依次删除
func (tw *storeTxnWrite) delete(key []byte) {
ibytes := newRevBytes()
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
} else {
// TODO: remove this in v3.5
ibytes = appendMarkTombstone(nil, ibytes)
}
kv := mvccpb.KeyValue{Key: key}
d, err := kv.Marshal()
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to tombstone an existing key",
zap.String("key", string(key)),
zap.Error(err),
)
} else {
plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
}
}
tw.changes = append(tw.changes, kv)
item := lease.LeaseItem{Key: string(key)}
leaseID := tw.s.le.GetLease(item)
if leaseID != lease.NoLease {
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to detach old lease from a key",
zap.Error(err),
)
} else {
plog.Errorf("cannot detach %v", err)
}
}
}
}
- 生成 ibytes, 然后追加一个
appendMarkTombstone
标记,表示这个 revision 是 delete,并且生成一个只含有 key 的mvccpb.KeyValue
-
UnsafeSeqPut
将本次删除 revision 写到boltdb
- btree
调用
Tombstone`,增加一个 tombstone revision -
lease
操作,暂时忽略不看。
到此看到的删除都是惰性删除,只是标记,那物理数据何时 purge 呢?实际上 etcd 会开启后台 goroutine 自动 compaction, 或是手工用 cli 方式触发。查看代码,其实也只是删 btree
,如果 keyi.compact
压缩后发现数据为空,调用 ti.tree.Delete(keyi)
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
if ti.lg != nil {
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
} else {
plog.Printf("store.index: compact %d", rev)
}
ti.Lock()
clone := ti.tree.Clone()
ti.Unlock()
clone.Ascend(func(item btree.Item) bool {
keyi := item.(*keyIndex)
//Lock is needed here to prevent modification to the keyIndex while
//compaction is going on or revision added to empty before deletion
ti.Lock()
keyi.compact(ti.lg, rev, available)
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
}
}
}
ti.Unlock()
return true
})
return available
}
那么磁盘的 boltdb
数据什么时候删呢?其实是不删的,每次 Compact
时 scheduleCompaction
调用 boltdb
的删除函数,但是我们知道 boltdb
只是标记而己不释放磁盘空间。当然最后还是有碎片整理的,也就是 Defrag
, 看代码也好理解,就是新建 boltdb
底层文件,然后全量导数据,最后 rename 替换。
网友评论