美文网首页
etcd学习笔记(四):恢复

etcd学习笔记(四):恢复

作者: wangshanshi | 来源:发表于2021-12-07 15:35 被阅读0次

运维一个etcd集群的一个基本要求是能够故障恢复。etcd有哪些机制支持故障恢复?如何进行故障恢复?

持久化

example

etcd的持久化依赖于快照和WAL,常见的文件格式如下:


image.png image.png

其中wal文件的命名产生方式如下:

fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
其中:
enti是index of the last entry saved to the wal
seq()是最后一个wal文件的seq
func (w *WAL) seq() uint64 {
    t := w.tail()
    seq, _, err := parseWALName(filepath.Base(t.Name()))
    return seq
}

所以wal的seq是递增,index是存储的log的first index。
snapshot文件的命名产生方式如下:

    fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)

什么时候持久化

还是在ready数据结构的处理过程中进行持久化

                // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
                // ensure that recovery after a snapshot restore is possible.
                if !raft.IsEmptySnap(rd.Snapshot) {
                    // gofail: var raftBeforeSaveSnap struct{}
                    if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
                    }
                    // gofail: var raftAfterSaveSnap struct{}
                }

                // gofail: var raftBeforeSave struct{}
                if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                    r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
                }

wal数据比较容易理解,那么快照数据是怎么产生的呢?
还在在apply函数中,会尝试trigger snapshot
这时候会调用raftStorage的create snapshot,然后此snapshot就可以出现在下次的ready结构中。

可以看出,snapshot里存储的是V2 store的内容。理论上来说,snapshot对于etcdv3来说,有用的信息仅仅是index和term。

func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
    clone := s.v2store.Clone()
    s.KV().Commit()

    s.GoAttach(func() {
        lg := s.Logger()

        d, err := clone.SaveNoCopy()
        snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
        if err = s.r.storage.SaveSnap(snap); err != nil {
            lg.Panic("failed to save snapshot", zap.Error(err))
        }
        if err = s.r.storage.Release(snap); err != nil {
            lg.Panic("failed to release wal", zap.Error(err))
        }

        // keep some in memory log entries for slow followers.
        compacti := uint64(1)
        if snapi > s.Cfg.SnapshotCatchUpEntries {
            compacti = snapi - s.Cfg.SnapshotCatchUpEntries
        }

        err = s.r.raftStorage.Compact(compacti)
    })
}

而对于V3来说,db文件本身就是快照。

snap 和 restore

snap和restore

在etcdctl/ctlv3/command文件夹下。

snapshot save

snapshot save命令会调用etcd-server的grpc接口,最终会调用到backend interface提供的snapshot接口。

    snap := ms.bg.Backend().Snapshot()
    pr, pw := io.Pipe()

    defer pr.Close()

    go func() {
        snap.WriteTo(pw)
        if err := snap.Close(); err != nil {
            ms.lg.Warn("failed to close snapshot", zap.Error(err))
        }
        pw.Close()
    }()
func (b *backend) Snapshot() Snapshot {
    b.batchTx.Commit()

    b.mu.RLock()
    defer b.mu.RUnlock()
    tx, err := b.db.Begin(false)
    if err != nil {
        b.lg.Fatal("failed to begin tx", zap.Error(err))
    }

    stopc, donec := make(chan struct{}), make(chan struct{})
    dbBytes := tx.Size()
    go func() {
        defer close(donec)
        // sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
        // assuming a min tcp throughput of 100MB/s.
        var sendRateBytes int64 = 100 * 1024 * 1024
        warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
        if warningTimeout < minSnapshotWarningTimeout {
            warningTimeout = minSnapshotWarningTimeout
        }
        start := time.Now()
        ticker := time.NewTicker(warningTimeout)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
        

            case <-stopc:
            }
        }
    }()

    return &snapshot{tx, stopc, donec}
}

snapshot restore

构建membership

restore是etcdctl单独完成,不依赖于etcd-server,依赖参数:

- restoreCluster: InitialCluster,
- restoreClusterToken,
- restoreDataDir,
- restoreWalDir,
- restorePeerURLs,
- restoreName,
- skipHashCheck,

为什么需要restoreCluster、restoreClusterToken、restorePeerURLs、restoreName等参数?用来生成membership

ics, err = types.NewURLsMap(cfg.InitialCluster)
    srv := config.ServerConfig{
        Logger:              s.lg,
        Name:                cfg.Name,
        PeerURLs:            pURLs,
        InitialPeerURLsMap:  ics,
        InitialClusterToken: cfg.InitialClusterToken,
    }
    if err = srv.VerifyBootstrap(); err != nil {
        return err
    }
s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics)

其中

func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
    c := NewCluster(lg)
    for name, urls := range urlsmap {
        m := NewMember(name, urls, token, nil)
        if _, ok := c.members[m.ID]; ok {
            return nil, fmt.Errorf("member exists with identical ID %v", m)
        }
        if uint64(m.ID) == raft.None {
            return nil, fmt.Errorf("cannot use %x as member id", raft.None)
        }
        c.members[m.ID] = m
    }
    c.genID()
    return c, nil
}

根据每个etcd-server的name和url,以及token,生成member,可以看出memeberID=peerURL+clusterToken的hash

func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
    memberId := computeMemberId(peerURLs, clusterName, now)
    return newMember(name, peerURLs, memberId, false)
}
func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
    peerURLstrs := peerURLs.StringSlice()
    sort.Strings(peerURLstrs)
    joinedPeerUrls := strings.Join(peerURLstrs, "")
    b := []byte(joinedPeerUrls)

    b = append(b, []byte(clusterName)...)
    if now != nil {
        b = append(b, []byte(fmt.Sprintf("%d", now.Unix()))...)
    }

    hash := sha1.Sum(b)
    return types.ID(binary.BigEndian.Uint64(hash[:8]))
}

clusterID是memberIDs的hash

func (c *RaftCluster) genID() {
    mIDs := c.MemberIDs()
    b := make([]byte, 8*len(mIDs))
    for i, id := range mIDs {
        binary.BigEndian.PutUint64(b[8*i:], uint64(id))
    }
    hash := sha1.Sum(b)
    c.cid = types.ID(binary.BigEndian.Uint64(hash[:8]))
}

恢复数据

如果没有传入wal和snap的恢复目录的话,就使用etcd的默认目录

dataDir := cfg.OutputDataDir
    if dataDir == "" {
        dataDir = cfg.Name + ".etcd"
    }
    if fileutil.Exist(dataDir) && !fileutil.DirEmpty(dataDir) {
        return fmt.Errorf("data-dir %q not empty or could not be read", dataDir)
    }

    walDir := cfg.OutputWALDir
    if walDir == "" {
        walDir = filepath.Join(dataDir, "member", "wal")
    } else if fileutil.Exist(walDir) {
        return fmt.Errorf("wal-dir %q exists", walDir)
    }

    s.name = cfg.Name
    s.srcDbPath = cfg.SnapshotPath
    s.walDir = walDir
    s.snapDir = filepath.Join(dataDir, "member", "snap")
    s.skipHashCheck = cfg.SkipHashCheck

然后恢复DB数据,这里可以看出,在恢复DB数据之后,会移除DB中关于member的相关bucket数据

func (s *v3Manager) saveDB() error {
    err := s.copyAndVerifyDB()
    if err != nil {
        return err
    }

    be := backend.NewDefaultBackend(s.outDbPath())
    defer be.Close()

    err = schema.NewMembershipBackend(s.lg, be).TrimMembershipFromBackend()
    if err != nil {
        return err
    }

    return nil
}

然后恢复snap和wal文件,这里有几个点:

  1. 会检查wal目录下,确保不存在任何的.wal文件
  2. 会将新的member关系存储到bolt-db
  3. 生成成员变更entry,提交到wal
  4. term=1,vote=peers[0].ID的hard state
  5. 根据V2 store恢复snapshot,并将snapshot 记录写入wal
  6. 快照存储metadata中的confstate,用于恢复peer关系
func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
    if err := fileutil.CreateDirAll(s.lg, s.walDir); err != nil {
        return nil, err
    }

    // add members again to persist them to the store we create.
    st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
    s.cl.SetStore(st)
    be := backend.NewDefaultBackend(s.outDbPath())
    defer be.Close()
    s.cl.SetBackend(schema.NewMembershipBackend(s.lg, be))
    for _, m := range s.cl.Members() {
        s.cl.AddMember(m, true)
    }

    m := s.cl.MemberByName(s.name)
    md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
    metadata, merr := md.Marshal()
    if merr != nil {
        return nil, merr
    }
    w, walerr := wal.Create(s.lg, s.walDir, metadata)
    if walerr != nil {
        return nil, walerr
    }
    defer w.Close()

    peers := make([]raft.Peer, len(s.cl.MemberIDs()))
    for i, id := range s.cl.MemberIDs() {
        ctx, err := json.Marshal((*s.cl).Member(id))
        if err != nil {
            return nil, err
        }
        peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
    }

    ents := make([]raftpb.Entry, len(peers))
    nodeIDs := make([]uint64, len(peers))
    for i, p := range peers {
        nodeIDs[i] = p.ID
        cc := raftpb.ConfChange{
            Type:    raftpb.ConfChangeAddNode,
            NodeID:  p.ID,
            Context: p.Context,
        }
        d, err := cc.Marshal()
        if err != nil {
            return nil, err
        }
        ents[i] = raftpb.Entry{
            Type:  raftpb.EntryConfChange,
            Term:  1,
            Index: uint64(i + 1),
            Data:  d,
        }
    }

    commit, term := uint64(len(ents)), uint64(1)
    hardState := raftpb.HardState{
        Term:   term,
        Vote:   peers[0].ID,
        Commit: commit,
    }
    if err := w.Save(hardState, ents); err != nil {
        return nil, err
    }

    b, berr := st.Save()
    if berr != nil {
        return nil, berr
    }
    confState := raftpb.ConfState{
        Voters: nodeIDs,
    }
    raftSnap := raftpb.Snapshot{
        Data: b,
        Metadata: raftpb.SnapshotMetadata{
            Index:     commit,
            Term:      term,
            ConfState: confState,
        },
    }
    sn := snap.New(s.lg, s.snapDir)
    if err := sn.SaveSnap(raftSnap); err != nil {
        return nil, err
    }
    snapshot := walpb.Snapshot{Index: commit, Term: term, ConfState: &confState}
    return &hardState, w.SaveSnapshot(snapshot)
}

最后根据commit和term恢复consistent index。

restart node

如果有wal则根据snapshot中的confstate恢复peer关系,否则的话根据启动参数生成peer关系哦

相关文章

网友评论

      本文标题:etcd学习笔记(四):恢复

      本文链接:https://www.haomeiwen.com/subject/wnknxrtx.html