美文网首页
etcd学习笔记(三): Propose

etcd学习笔记(三): Propose

作者: wangshanshi | 来源:发表于2021-12-06 18:19 被阅读0次

    我们已经在raft-example看过了对Propose的简单处理了,但是真正的etcd对Propose的处理更加复杂。主要是有如下几个点:

    1. consistent index。用于处理boltdb和raftlog之间的幂等性。
    2. 同步返回。由于raft的log复制是异步的,如何做到同步返回结果。

    当boltdb用作状态机的时候,WAL和boltdb作为两个不同的实体,很有可能存在不一致的情况。所以etcd在boltdb中存储一条记录consistent-index,来代表已经成功apply到boltdb上的log index,这样当根据WAL恢复boltdb的时候,就可以判断某条log index是不是已经被apply过。

    处理过程

    在etcd-server中,一条propose的处理过程:
    首先是为每一条请求注册一个唯一requestID,然后register并等待requestID处理完成

    // processInternalRaftRequestOnce proposes r through raft and blocks until the
    // proposal has been applied, the request times out, or the server is stopped.
    //
    // It returns ErrTooManyRequests when the committed index is ahead of the
    // applied index by more than maxGapBetweenApplyAndCommitIndex, the marshal or
    // propose error if either of those steps fails, the result of
    // parseProposeCtxErr when the request context ends first, and ErrStopped when
    // s.done fires. On success it returns the value delivered on the wait channel,
    // asserted to *applyResult.
    func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
        // Refuse new proposals while applying lags too far behind committing.
        ai := s.getAppliedIndex()
        ci := s.getCommittedIndex()
        if ci > ai+maxGapBetweenApplyAndCommitIndex {
            return nil, ErrTooManyRequests
        }

        // Stamp the request with a freshly generated ID.
        r.Header = &pb.RequestHeader{
            ID: s.reqIDGen.Next(),
        }

        // Serialize the request (header included) into the proposal payload.
        // Done before registering the waiter so a marshal failure leaves nothing
        // to clean up.
        data, err := r.Marshal()
        if err != nil {
            return nil, err
        }

        // Prefer an ID already set on the request; otherwise use the header ID.
        id := r.ID
        if id == 0 {
            id = r.Header.ID
        }
        // Register before proposing so the result cannot be triggered before
        // anyone is waiting on it.
        ch := s.w.Register(id)

        cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
        defer cancel()

        start := time.Now()
        err = s.r.Propose(cctx, data)
        if err != nil {
            // The proposal never entered raft, so no result will ever arrive:
            // release the waiter and fail fast instead of waiting for the timeout.
            proposalsFailed.Inc()
            s.w.Trigger(id, nil) // GC wait
            return nil, err
        }

        select {
        case x := <-ch:
            return x.(*applyResult), nil
        case <-cctx.Done():
            proposalsFailed.Inc()
            s.w.Trigger(id, nil) // GC wait
            return nil, s.parseProposeCtxErr(cctx.Err(), start)
        case <-s.done:
            return nil, ErrStopped
        }
    }
    

    请求转发到raft-node处理

    根据之前的知识,raft-node经过一番处理之后,交给上层的ready结构来处理,首先msg转发到leader,然后leader调用processMessages,这里主要的操作是copy log到follower。

    
                    // Bundle the committed entries and snapshot of this Ready, together
                    // with notifyc, into one batch for the apply side.
                    ap := apply{
                        entries:  rd.CommittedEntries,
                        snapshot: rd.Snapshot,
                        notifyc:  notifyc,
                    }
    
                    updateCommittedIndex(&ap, rh)
    
                    // Hand the batch over on applyc; bail out if the node has been stopped.
                    select {
                    case r.applyc <- ap:
                    case <-r.stopped:
                        return
                    }
    
                    // On the leader, outgoing messages are sent before the entries are
                    // saved below (presumably so replication overlaps the local disk
                    // write; confirm against upstream).
                    if islead {
                        r.transport.Send(r.processMessages(rd.Messages))
                    }
    
                    // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
                    // ensure that recovery after a snapshot restore is possible.
                    if !raft.IsEmptySnap(rd.Snapshot) {
                        if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                            r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
                        }
                    }
    
                    // Save the hard state and the new entries through r.storage.
                    if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                        r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
                    }
                    // Export the commit index from a non-empty hard state as a metric.
                    if !raft.IsEmptyHardState(rd.HardState) {
                        proposalsCommitted.Set(float64(rd.HardState.Commit))
                    }
    
                    if !raft.IsEmptySnap(rd.Snapshot) {
                        // Force WAL to fsync its hard state before Release() releases
                        // old data from the WAL. Otherwise could get an error like:
                        // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
                        // See https://github.com/etcd-io/etcd/issues/10219 for more details.
                        if err := r.storage.Sync(); err != nil {
                            r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
                        }
    
                        // etcdserver now claim the snapshot has been persisted onto the disk
                        notifyc <- struct{}{}
    
                        // gofail: var raftBeforeApplySnap struct{}
                        r.raftStorage.ApplySnapshot(rd.Snapshot)
                        r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
                        // gofail: var raftAfterApplySnap struct{}
    
                        if err := r.storage.Release(rd.Snapshot); err != nil {
                            r.lg.Fatal("failed to release Raft wal", zap.Error(err))
                        }
                        // gofail: var raftAfterWALRelease struct{}
                    }
    
                    // Append the new entries to r.raftStorage, after they were saved above.
                    r.raftStorage.Append(rd.Entries)
    
                    // A non-leader sends its messages only after the entries were saved
                    // and appended; notifyc is signalled on both paths.
                    if !islead {
                        msgs := r.processMessages(rd.Messages)
                        notifyc <- struct{}{}
                        r.transport.Send(msgs)
                    } else {
                        // leader already processed 'MsgSnap' and signaled
                        notifyc <- struct{}{}
                    }
    
                    // NOTE(review): presumably tells raft that this Ready has been fully
                    // handled so the next one can be produced; confirm against raft.Node.
                    r.Advance()
    

    最终调用的是apply

            case ap := <-s.r.apply():
                f := func(context.Context) { s.applyAll(&ep, &ap) }
                sched.Schedule(f)
    
    
    // apply processes the given raft entries in order. Normal entries go through
    // applyEntryNormal; configuration changes go through applyConfChange, and the
    // waiter registered under the change ID is triggered with the outcome. After
    // each handled entry the server's applied index and term are updated.
    //
    // It returns the term and index of the last entry visited, and whether
    // applyConfChange reported removedSelf for any configuration change.
    func (s *EtcdServer) apply(
        es []raftpb.Entry,
        confState *raftpb.ConfState,
    ) (appliedt uint64, appliedi uint64, shouldStop bool) {
        s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))

        for idx := range es {
            // Work on a per-iteration copy so the pointer handed out below is
            // never shared between entries.
            entry := es[idx]

            if entry.Type == raftpb.EntryNormal {
                s.applyEntryNormal(&entry)
                s.setAppliedIndex(entry.Index)
                s.setTerm(entry.Term)
            } else if entry.Type == raftpb.EntryConfChange {
                // Every WAL entry has to be applied on top of v2store, while the
                // backend only takes the 'unapplied' ones
                // (entry.Index > backend.ConsistentIndex).
                applyMode := membership.ApplyV2storeOnly

                // Record the consistent index of the entry being executed.
                if s.consistIndex.ConsistentIndex() < entry.Index {
                    s.consistIndex.SetConsistentIndex(entry.Index, entry.Term)
                    applyMode = membership.ApplyBoth
                }

                var change raftpb.ConfChange
                pbutil.MustUnmarshal(&change, entry.Data)
                removedSelf, err := s.applyConfChange(change, confState, applyMode)
                s.setAppliedIndex(entry.Index)
                s.setTerm(entry.Term)
                if removedSelf {
                    shouldStop = true
                }
                s.w.Trigger(change.ID, &confChangeResponse{s.cluster.Members(), err})
            }

            appliedt = entry.Term
            appliedi = entry.Index
        }

        return appliedt, appliedi, shouldStop
    }
    

    相关文章

      网友评论

          本文标题:etcd学习笔记(三): Propose

          本文链接:https://www.haomeiwen.com/subject/abexxrtx.html