美文网首页
MVCC在etcd中的实现

MVCC在etcd中的实现

作者: 董泽润 | 来源:发表于2019-07-15 18:04 被阅读0次

    主流数据库都支持 mvcc 概念,etcd 也支持。以前经常使用 etcd, 但仅限当配置或是状态通知,根本不了解细节,先看下 mvcc 如何实现的吧。

    版本概念

    对同一个 key 每次修改都对应 revision (翻译成版本?修订?), key 在生命周期内可能被频繁删除,从创建到删除的所有 revision 集合组成一个 generation (翻译成代?), 每个 key 周时有多个 generation 组成多版本。组织这个多版本 key 的结构体叫做 keyIndex

    ┌───────────────────────────────────────────────────────────────────────────┐
    │              ┌───────────┬───────────┬─────────────┐                      │
    │              │  []byte   │  revision │[]generation │ keyIndex             │
    │              │   key     │  modified │ generations │                      │
    │              └───────────┴──────┬────┴──────┬──────┘                      │
    │            ┌────────────────────┘           │                             │
    │            │                                │                             │
    │       revis▼on                              ▼                             │
    │    ┌──────┬───────┐              ┌──────────────┬──────────────┐          │
    │    │ main │  sub  │              │ generation0  │ generation1  │ ****     │
    │    └──────┴───────┘              └───────┬──────┴──────────────┘          │
    │                                          │                                │
    │                                          ▼                                │
    │                       ┌──────┬────────────┬─────────┐                     │
    │                       │ ver  │  created   │  revs   │ generation          │
    │                       └──────┴───────┬────┴────┬────┘                     │
    │                     ┌────────────────┘         │                          │
    │                     ▼                          ▼                          │
    │             ┌──────────────┐   ┌──────────────┬──────────────┐            │
    │             │  revision    │   │  revision    │  revision    │ ****       │
    │             └──────────────┘   └──────────────┴──────────────┘            │
    └───────────────────────────────────────────────────────────────────────────┘
    
    type revision struct {
        // main is the main revision of a set of changes that happen atomically.
        main int64
        // sub is the the sub revision of a change in a set of changes that happen
        // atomically. Each change has different increasing sub revision in that
        // set.
        sub int64
    }
    

    先看下最重要的 revision 结构体,main 表示当前操作的事务 id,全局自增的逻辑时间戳,sub 表示当前操作在事务内部的子 id,事务内自增,从 0 开始。比如一个事务内: put key value1, delete key, 那么分别对应 {txid, 0}, {txid, 1}

    // generation contains multiple revisions of a key.
    type generation struct {
        ver     int64 // generation 期间自增
        created revision // when the generation is created (put in first revision).
        revs    []revision
    }
    

    mvcc 里不可能让所有版本都追加到一个 revs 数组中,会不限彭胀,所以产生了 generation 代的概念,ver 表示代内操作的顺序,从 0 自增,created 代表创建这个 generation 的第一个版本,最后 revs 表示所有版本。

    type keyIndex struct {
        key         []byte // 用户 key 名称
        modified    revision // the main rev of the last modification 最新修改的版本
        generations []generation // 不同代的数组
    }
    

    key 代表当前操作的用户 key, modified 代表最新修改的版本,generations 是代的数组,generations[n-1] 代表最新操作的代。

    版本存储

    对于数据存储分两点:

    1. 版本信息: 由 key 和 revision 组成的版本信息,存储在内存的 btree 中,用于快速查找
    2. kv数据: 真实的 kv 数据存放在 boltdb 中,key 是 revision, value 是序列化后的 pb

    btree

    版本存储在 treeIndex 中,实际上就是一个 btree,和 mysql 的比较像,点查和范围查都非常快。

    type treeIndex struct {
        sync.RWMutex
        tree *btree.BTree
        lg   *zap.Logger
    }
    
    func (ti *treeIndex) Put(key []byte, rev revision) {
        keyi := &keyIndex{key: key}
    
        ti.Lock()
        defer ti.Unlock()
        item := ti.tree.Get(keyi)
        if item == nil {
            keyi.put(ti.lg, rev.main, rev.sub)
            ti.tree.ReplaceOrInsert(keyi)
            return
        }
        okeyi := item.(*keyIndex)
        okeyi.put(ti.lg, rev.main, rev.sub)
    func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
        keyi := &keyIndex{key: key}
        ti.RLock()
        defer ti.RUnlock()
        if keyi = ti.keyIndex(keyi); keyi == nil {
            return revision{}, revision{}, 0, ErrRevisionNotFound
        }
        return keyi.get(ti.lg, atRev)
    }
    
    func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
        ti.RLock()
        defer ti.RUnlock()
        return ti.keyIndex(keyi)
    }
    
    func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
        if item := ti.tree.Get(keyi); item != nil {
            return item.(*keyIndex)
        }
        return nil
    }
    

    可以看到操作比较简单,没什么难度

    boltdb

    Btree 的版本信息,只包括用户真实 key 和 revision,底层 backend 才是真正保存 kv 的地方,但这个 kv 并不是用户的,而分别是 revision 和 kv pb,参考 storeTxnWrite 的实现

    func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
        rev := tw.beginRev + 1
        c := rev
        oldLease := lease.NoLease
    
        // if the key exists before, use its previous created and
        // get its previous leaseID
        _, created, ver, err := tw.s.kvindex.Get(key, rev)
        if err == nil {
            c = created.main
            oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
        }
    
        ibytes := newRevBytes()
        idxRev := revision{main: rev, sub: int64(len(tw.changes))}
        revToBytes(idxRev, ibytes)
    
        ver = ver + 1
        kv := mvccpb.KeyValue{
            Key:            key,
            Value:          value,
            CreateRevision: c,
            ModRevision:    rev,
            Version:        ver,
            Lease:          int64(leaseID),
        }
    
        d, err := kv.Marshal()
        if err != nil {
            if tw.storeTxnRead.s.lg != nil {
                tw.storeTxnRead.s.lg.Fatal(
                    "failed to marshal mvccpb.KeyValue",
                    zap.Error(err),
                )
            } else {
                plog.Fatalf("cannot marshal event: %v", err)
            }
        }
    
        tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
        tw.s.kvindex.Put(key, idxRev)
        tw.changes = append(tw.changes, kv)
         ......
    }
    

    可以看到 key 是 ibytes 也就是 revision, value 是 mvccpb.KeyValue,里面存放当前操作的用户 kv,revision, lease 信息。到此,对 etcd mvcc 信息有一定的了解了,内存 btree 保存 key 的版本信息,而 boltdb 存打平的 mvccpb.KeyValue

    完整流程

    etcd 代码有点绕,接口太多。整体来讲 mvcc 要分几方面:增删改查,一致性,过期数据清理。先看完整读写,整体入口接口是 applierV3

    func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
        resp = &pb.PutResponse{}
        resp.Header = &pb.ResponseHeader{}
    
        val, leaseID := p.Value, lease.LeaseID(p.Lease)
        if txn == nil {
            if leaseID != lease.NoLease {
                if l := a.s.lessor.Lookup(leaseID); l == nil {
                    return nil, lease.ErrLeaseNotFound
                }
            }
            txn = a.s.KV().Write()
            defer txn.End()
        }
    
        var rr *mvcc.RangeResult
        if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
            rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
            if err != nil {
                return nil, err
            }
        }
        if p.IgnoreValue || p.IgnoreLease {
            if rr == nil || len(rr.KVs) == 0 {
                // ignore_{lease,value} flag expects previous key-value pair
                return nil, ErrKeyNotFound
            }
        }
        if p.IgnoreValue {
            val = rr.KVs[0].Value
        }
        if p.IgnoreLease {
            leaseID = lease.LeaseID(rr.KVs[0].Lease)
        }
        if p.PrevKv {
            if rr != nil && len(rr.KVs) != 0 {
                resp.PrevKv = &rr.KVs[0]
            }
        }
    
        resp.Header.Revision = txn.Put(p.Key, val, leaseID)
        return resp, nil
    }
    

    先不管 lease, 实际上就是调用 txn.Put(p.Key, val, leaseID), 而 txn 是由 txn = a.s.KV().Write(),这里面有点绕,实际上是 kvstore_txn

    func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
        rev := tw.beginRev + 1
        c := rev
        oldLease := lease.NoLease
    
        // if the key exists before, use its previous created and
        // get its previous leaseID
        _, created, ver, err := tw.s.kvindex.Get(key, rev)
        if err == nil {
            c = created.main
            oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
        }
    
        ibytes := newRevBytes()
        idxRev := revision{main: rev, sub: int64(len(tw.changes))}
        revToBytes(idxRev, ibytes)
    
        ver = ver + 1
        kv := mvccpb.KeyValue{
            Key:            key,
            Value:          value,
            CreateRevision: c,
            ModRevision:    rev,
            Version:        ver,
            Lease:          int64(leaseID),
        }
    
        d, err := kv.Marshal()
        if err != nil {
            if tw.storeTxnRead.s.lg != nil {
                tw.storeTxnRead.s.lg.Fatal(
                    "failed to marshal mvccpb.KeyValue",
                    zap.Error(err),
                )
            } else {
                plog.Fatalf("cannot marshal event: %v", err)
            }
        }
    
        tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
        tw.s.kvindex.Put(key, idxRev)
        tw.changes = append(tw.changes, kv)
    
        if oldLease != lease.NoLease {
            if tw.s.le == nil {
                panic("no lessor to detach lease")
            }
            err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
            if err != nil {
                if tw.storeTxnRead.s.lg != nil {
                    tw.storeTxnRead.s.lg.Fatal(
                        "failed to detach old lease from a key",
                        zap.Error(err),
                    )
                } else {
                    plog.Errorf("unexpected error from lease detach: %v", err)
                }
            }
        }
        if leaseID != lease.NoLease {
            if tw.s.le == nil {
                panic("no lessor to attach lease")
            }
            err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
            if err != nil {
                panic("unexpected error from lease Attach")
            }
        }
    }
    

    其实这就是上面讲过的代码

    1. rev := tw.beginRev + 1 版本 id 就是当前 +1,开启新的事务
    2. kvindex.Get 查看是否有当前 key 的 index
    3. 生成 ibytes 数据,这里看到 main 就是当前 rev, sub 就是本次事务的更改条目
    4. 生成 mvccpb.KeyValue 信息,并调用 UnsafeSeqPut 写入 boltdb,调用 kvindex.Put 写内存 btree
    5. 最后关于 lease 的先不管。

    这里面一定要注意 UnsafeSeqPut 只是开启了这个务,但是并没有提交,底层利用了 boltdb.batch 原理。 所以什么时候 commit 呢?实际上这是异步的,要追遡到 boltdb 初始化。

    func newBackend(bcfg BackendConfig) *backend {
        bopts := &bolt.Options{}
        if boltOpenOptions != nil {
            *bopts = *boltOpenOptions
        }
        bopts.InitialMmapSize = bcfg.mmapSize()
        bopts.FreelistType = bcfg.BackendFreelistType
    
        db, err := bolt.Open(bcfg.Path, 0600, bopts)
        if err != nil {
            if bcfg.Logger != nil {
                bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
            } else {
                plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
            }
        }
    
        // In future, may want to make buffering optional for low-concurrency systems
        // or dynamically swap between buffered/non-buffered depending on workload.
        b := &backend{
            db: db,
    
            batchInterval: bcfg.BatchInterval,
            batchLimit:    bcfg.BatchLimit,
    
            readTx: &readTx{
                buf: txReadBuffer{
                    txBuffer: txBuffer{make(map[string]*bucketBuffer)},
                },
                buckets: make(map[string]*bolt.Bucket),
            },
    
            stopc: make(chan struct{}),
            donec: make(chan struct{}),
    
            lg: bcfg.Logger,
        }
        b.batchTx = newBatchTxBuffered(b)
        go b.run()
        return b
    }
    
    func (b *backend) run() {
        defer close(b.donec)
        t := time.NewTimer(b.batchInterval)
        defer t.Stop()
        for {
            select {
            case <-t.C:
            case <-b.stopc:
                b.batchTx.CommitAndStop()
                return
            }
            if b.batchTx.safePending() != 0 {
                b.batchTx.Commit()
            }
            t.Reset(b.batchInterval)
        }
    }
    

    创建 batchTx 后,马上调用 go b.run() 开启异步 goroutine,用于定期 commit 数据,时间间隔 batchInterval 默认 100ms, 也就是说 mvcc 写入不是那么严格,有一定时间窗口的。

    过期数据清理

    首先是正常的数据删除,applierV3 主动调用 DeleteRange,最终还是操作的 storeTxnWrite

    func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
        rrev := tw.beginRev
        if len(tw.changes) > 0 {
            rrev++
        }
        keys, _ := tw.s.kvindex.Range(key, end, rrev)
        if len(keys) == 0 {
            return 0
        }
        for _, key := range keys {
            tw.delete(key)
        }
        return int64(len(keys))
    }
    

    首先从 kvindex 也就是 btree 中范围查找到所有的 keys,然后再调用 tw.delete 依次删除

    func (tw *storeTxnWrite) delete(key []byte) {
        ibytes := newRevBytes()
        idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
        revToBytes(idxRev, ibytes)
    
        if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
            ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
        } else {
            // TODO: remove this in v3.5
            ibytes = appendMarkTombstone(nil, ibytes)
        }
    
        kv := mvccpb.KeyValue{Key: key}
    
        d, err := kv.Marshal()
        if err != nil {
            if tw.storeTxnRead.s.lg != nil {
                tw.storeTxnRead.s.lg.Fatal(
                    "failed to marshal mvccpb.KeyValue",
                    zap.Error(err),
                )
            } else {
                plog.Fatalf("cannot marshal event: %v", err)
            }
        }
    
        tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
        err = tw.s.kvindex.Tombstone(key, idxRev)
        if err != nil {
            if tw.storeTxnRead.s.lg != nil {
                tw.storeTxnRead.s.lg.Fatal(
                    "failed to tombstone an existing key",
                    zap.String("key", string(key)),
                    zap.Error(err),
                )
            } else {
                plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
            }
        }
        tw.changes = append(tw.changes, kv)
    
        item := lease.LeaseItem{Key: string(key)}
        leaseID := tw.s.le.GetLease(item)
    
        if leaseID != lease.NoLease {
            err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
            if err != nil {
                if tw.storeTxnRead.s.lg != nil {
                    tw.storeTxnRead.s.lg.Fatal(
                        "failed to detach old lease from a key",
                        zap.Error(err),
                    )
                } else {
                    plog.Errorf("cannot detach %v", err)
                }
            }
        }
    }
    
    1. 生成 ibytes, 然后追加一个 appendMarkTombstone 标记,表示这个 revision 是 delete,并且生成一个只含有 key 的 mvccpb.KeyValue
    2. UnsafeSeqPut 将本次删除 revision 写到 boltdb
    3. btree调用Tombstone`,增加一个 tombstone revision
    4. lease 操作,暂时忽略不看。

    到此看到的删除都是惰性删除,只是标记,那物理数据何时 purge 呢?实际上 etcd 会开启后台 goroutine 自动 compaction, 或是手工用 cli 方式触发。查看代码,其实也只是删 btree,如果 keyi.compact 压缩后发现数据为空,调用 ti.tree.Delete(keyi)

    func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
        available := make(map[revision]struct{})
        if ti.lg != nil {
            ti.lg.Info("compact tree index", zap.Int64("revision", rev))
        } else {
            plog.Printf("store.index: compact %d", rev)
        }
        ti.Lock()
        clone := ti.tree.Clone()
        ti.Unlock()
    
        clone.Ascend(func(item btree.Item) bool {
            keyi := item.(*keyIndex)
            //Lock is needed here to prevent modification to the keyIndex while
            //compaction is going on or revision added to empty before deletion
            ti.Lock()
            keyi.compact(ti.lg, rev, available)
            if keyi.isEmpty() {
                item := ti.tree.Delete(keyi)
                if item == nil {
                    if ti.lg != nil {
                        ti.lg.Panic("failed to delete during compaction")
                    } else {
                        plog.Panic("store.index: unexpected delete failure during compaction")
                    }
                }
            }
            ti.Unlock()
            return true
        })
        return available
    }
    

    那么磁盘的 boltdb 数据什么时候删呢?其实是不删的,每次 CompactscheduleCompaction 调用 boltdb 的删除函数,但是我们知道 boltdb 只是标记而己不释放磁盘空间。当然最后还是有碎片整理的,也就是 Defrag, 看代码也好理解,就是新建 boltdb 底层文件,然后全量导数据,最后 rename 替换。

    相关文章

      网友评论

          本文标题:MVCC在etcd中的实现

          本文链接:https://www.haomeiwen.com/subject/mryffctx.html