etcd Study Notes 4 (Draft)

Author: 酱油王0901 | Published 2020-08-05 00:05

    When ClientV3 sends a Put request, the key and value it carries are wrapped in an Op and converted into a pb.PutRequest, which EtcdServer receives and handles as an RPC. EtcdServer then hands the serialized data to the raft state machine for processing.

    // file: clientv3/kv.go
    case tPut:
            var resp *pb.PutResponse
            r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, 
                        IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
            resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
            if err == nil {
                return OpResponse{put: (*PutResponse)(resp)}, nil
            }
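
    For context, here is a minimal client-side sketch that exercises this path (the endpoint address is a placeholder):

    // Minimal sketch: a clientv3 Put. Internally the client builds an Op,
    // turns it into a pb.PutRequest, and sends it over gRPC to EtcdServer,
    // as shown in the snippet above.
    package main

    import (
        "context"
        "log"
        "time"

        "go.etcd.io/etcd/clientv3"
    )

    func main() {
        cli, err := clientv3.New(clientv3.Config{
            Endpoints:   []string{"127.0.0.1:2379"}, // placeholder endpoint
            DialTimeout: 5 * time.Second,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer cli.Close()

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        _, err = cli.Put(ctx, "foo", "bar")
        cancel()
        if err != nil {
            log.Fatal(err)
        }
    }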
    
    

    The serialized data is wrapped in a pb.Message of type MsgProp and written to the Node's propc channel.

    func (n *node) Propose(ctx context.Context, data []byte) error {
         return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
    }
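
    stepWait blocks until the proposal has been stepped through the state machine. Simplified, the path into the propc channel looks roughly like this (abridged sketch of raft/node.go; the error-wait tail is omitted):

    // Abridged sketch: proposals travel through propc as msgWithResult so
    // the proposer can wait for the outcome of Step.
    func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
        pm := msgWithResult{m: m}
        if wait {
            pm.result = make(chan error, 1)
        }
        select {
        case n.propc <- pm:
        case <-ctx.Done():
            return ctx.Err()
        case <-n.done:
            return ErrStopped
        }
        // ... then wait on pm.result before returning (omitted)
        return nil
    }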
    

    When the raft node receives the message from the propc channel, it steps it into the raft state machine.

    // file: raft/node.go
    select {
            // TODO: maybe buffer the config propose if there exists one (the way
            // described in raft dissertation)
            // Currently it is dropped in Step silently.
            case pm := <-propc:
                m := pm.m
                m.From = r.id
                err := r.Step(m)
                if pm.result != nil {
                    pm.result <- err
                    close(pm.result)
                }
    
    

    Step then dispatches to stepLeader, stepCandidate, or stepFollower according to the node's role. Taking the leader as an example:

    // file: raft/raft.go
     if !r.appendEntry(m.Entries...) {
          return ErrProposalDropped
    }
    r.bcastAppend()
    return nil
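
    The dispatch works through a function pointer: becomeLeader, becomeCandidate, and becomeFollower each install the corresponding step function, which Step invokes after the common term checks (abridged):

    // Abridged from raft/raft.go: r.step is set by the become* methods,
    // so Step ends by delegating to the role-specific handler.
    err := r.step(r, m)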
    

    raft first appends the entries to the raftLog, setting each entry's Term and Index, and then broadcasts them to the other peers.

    // file: raft/raft.go
    func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
        li := r.raftLog.lastIndex()
        for i := range es {
            es[i].Term = r.Term
            es[i].Index = li + 1 + uint64(i)
        }
        // Track the size of this uncommitted proposal.
        if !r.increaseUncommittedSize(es) {
            r.logger.Debugf(
                "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
                r.id,
            )
            // Drop the proposal.
            return false
        }
        // use latest "last" index after truncate/append
        li = r.raftLog.append(es...)
        r.prs.Progress[r.id].MaybeUpdate(li)
        // Regardless of maybeCommit's return, our caller will call bcastAppend.
        r.maybeCommit()
        return true
    }
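
    For reference, maybeCommit computes the highest index known to be replicated on a quorum and tries to advance the commit index (raftLog.maybeCommit only commits entries from the leader's current term):

    // file: raft/raft.go
    func (r *raft) maybeCommit() bool {
        mci := r.prs.Committed() // highest index acked by a quorum
        return r.raftLog.maybeCommit(mci, r.Term)
    }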
    

    The leader then sends Append Entries messages to the peers:

    1. First check whether the Progress is paused; if it is, return immediately.
    2. Fetch the Term and the entries to send from the raftLog. A failure here is usually ErrCompacted, caused by log compaction, or ErrUnavailable.
    3. If fetching the term or entries fails, send a snapshot message, MsgSnap. At the same time, reset the Progress state to StateSnapshot and set PendingSnapshot to the snapshot's index.
      m := pb.Message{}
      m.To = to
      m.Type = pb.MsgSnap
      m.Snapshot = snapshot  // snapshot fetched from the raftLog
      
    4. If the term and entries are fetched successfully, set the message type to MsgApp:
      m := pb.Message{}
      m.To = to
      m.Type = pb.MsgApp
      m.Index = pr.Next - 1  // TODO(zhengliang): pr.Next or pr.Next - 1 ?
      m.LogTerm = term
      m.Entries = ents
      m.Commit = r.raftLog.committed
      
    5. If the set of entries to send is non-empty, handle it according to the Progress state:
    • If the state is tracker.StateReplicate:
      • Take the last index of the entries.
      • Optimistically update the Progress's Next index to last index + 1.
      • Add the last index to the Progress's Inflights. Inflights is a sliding window over in-flight messages; it bounds both the number of in-flight messages and the bandwidth each Progress can consume. When Inflights is full, no further messages can be sent. For example, when the leader sends a message it adds the entries' last index to Inflights, and indexes are kept in ascending order inside the window. When the leader receives a reply, it frees the earlier inflights by calling inflights.FreeLE (see the usage sketch after the code below).
    • If the state is tracker.StateProbe, simply set the Progress's ProbeSent to true. While ProbeSent is true, replication messages to this peer are paused until ProbeSent is reset.
    // file: raft/raft.go
    // bcastAppend sends RPC, with entries to all peers that are not up-to-date
    // according to the progress recorded in r.prs.
    func (r *raft) bcastAppend() {
        r.prs.Visit(func(id uint64, _ *tracker.Progress) {
            if id == r.id {
                return
            }
            r.sendAppend(id)
        })
    }
    
    // sendAppend sends an append RPC with new entries (if any) and the
    // current commit index to the given peer.
    func (r *raft) sendAppend(to uint64) {
        r.maybeSendAppend(to, true)
    }
    
    // maybeSendAppend sends an append RPC with new entries to the given peer,
    // if necessary. Returns true if a message was sent. The sendIfEmpty
    // argument controls whether messages with no entries will be sent
    // ("empty" messages are useful to convey updated Commit indexes, but
    // are undesirable when we're sending multiple messages in a batch).
    func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
        pr := r.prs.Progress[to]
        if pr.IsPaused() {
            return false
        }
        m := pb.Message{}
        m.To = to
    
        term, errt := r.raftLog.term(pr.Next - 1)
        ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
        if len(ents) == 0 && !sendIfEmpty {
            return false
        }
    
        if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
            if !pr.RecentActive {
                r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
                return false
            }
    
            m.Type = pb.MsgSnap
            snapshot, err := r.raftLog.snapshot()
            if err != nil {
                if err == ErrSnapshotTemporarilyUnavailable {
                    r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
                    return false
                }
                panic(err) // TODO(bdarnell)
            }
            if IsEmptySnap(snapshot) {
                panic("need non-empty snapshot")
            }
            m.Snapshot = snapshot
            sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
            r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
                r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
            pr.BecomeSnapshot(sindex)
            r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
        } else {
            m.Type = pb.MsgApp
            m.Index = pr.Next - 1 // TODO(zhengliang): pr.Next or pr.Next - 1 ?
            m.LogTerm = term
            m.Entries = ents
            m.Commit = r.raftLog.committed
            if n := len(m.Entries); n != 0 {
                switch pr.State {
                // optimistically increase the next when in StateReplicate
                case tracker.StateReplicate:
                    last := m.Entries[n-1].Index
                    pr.OptimisticUpdate(last)
                    pr.Inflights.Add(last)
                case tracker.StateProbe:
                    pr.ProbeSent = true
                default:
                    r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
                }
            }
        }
        r.send(m)
        return true
    }
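
    As mentioned in step 5 above, Inflights is a sliding window over unacknowledged append messages. A small usage sketch (the window size and indexes here are made up; tracker refers to go.etcd.io/etcd/raft/tracker):

    // Hypothetical numbers, to illustrate the Inflights window semantics.
    in := tracker.NewInflights(3) // allow at most 3 in-flight messages
    in.Add(10)                    // sent entries up to index 10
    in.Add(11)
    in.Add(12)
    _ = in.Full()  // true: sends to this peer are paused
    in.FreeLE(11)  // the peer acked through index 11; frees 10 and 11
    _ = in.Full()  // false: sending can resume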
    

    Once the Message is assembled, it is sent via raft's send() method. send() only performs a few sanity checks and stamps the Message with the Term, then appends it to raft's msgs queue.

    r.msgs = append(r.msgs, m)
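
    Abridged, send() looks like the following; MsgProp and MsgReadIndex keep a zero Term because they may be forwarded to the leader and are treated as local messages:

    // Abridged from raft/raft.go (panics on malformed terms omitted):
    // stamp From/Term, then queue the message for the next Ready.
    func (r *raft) send(m pb.Message) {
        m.From = r.id
        if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp ||
            m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
            // election messages already carry the term they were created with
        } else if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
            m.Term = r.Term
        }
        r.msgs = append(r.msgs, m)
    }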
    

    When RawNode detects that a new Ready is pending, it populates a Ready object and writes it to the readyc channel. Ready is read-only; it encapsulates the entries and messages that are to be saved to stable storage, committed, or sent to peers.
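
    Abridged, the Ready struct looks like this:

    // Abridged from raft/node.go.
    type Ready struct {
        *SoftState                   // volatile state (leader, role); nil if unchanged
        pb.HardState                 // Term/Vote/Commit to persist before sending messages
        ReadStates       []ReadState // for serving linearizable reads
        Entries          []pb.Entry  // to save to stable storage before sending messages
        Snapshot         pb.Snapshot // to save to stable storage
        CommittedEntries []pb.Entry  // to apply to the state machine
        Messages         []pb.Message // to send after Entries are persisted
        MustSync         bool        // whether HardState/Entries must be fsynced
    }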

    // HasReady called when RawNode user need to check if any Ready pending.
    // Checking logic in this method should be consistent with Ready.containsUpdates().
    func (rn *RawNode) HasReady() bool {
        r := rn.raft
        if !r.softState().equal(rn.prevSoftSt) {
            return true
        }
        if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
            return true
        }
        if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
            return true
        }
        if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
            return true
        }
        if len(r.readStates) != 0 {
            return true
        }
        return false
    }
    
    // file: raft/node.go
    func (n *node) run() {
        var propc chan msgWithResult
        var readyc chan Ready
        var advancec chan struct{}
        var rd Ready
    
        r := n.rn.raft
    
        lead := None
    
        for {
            if advancec != nil {
                readyc = nil
            } else if n.rn.HasReady() {
                // Populate a Ready. Note that this Ready is not guaranteed to
                // actually be handled. We will arm readyc, but there's no guarantee
                // that we will actually send on it. It's possible that we will
                // service another channel instead, loop around, and then populate
                // the Ready again. We could instead force the previous Ready to be
                // handled first, but it's generally good to emit larger Readys plus
                // it simplifies testing (by emitting less frequently and more
                // predictably).
                rd = n.rn.readyWithoutAccept()
                readyc = n.readyc
            }
            .........
        case readyc <- rd:  // write the Ready to the readyc channel
                n.rn.acceptReady(rd)
                advancec = n.advancec
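
    Advance() signals advancec, telling the node that this Ready has been fully processed so that the next one can be produced. The canonical consumer loop (paraphrased from raft's package documentation; the helper names and the done channel are hypothetical) is:

    // Sketch of the application's obligations for each Ready.
    for {
        select {
        case rd := <-n.Ready():
            saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) // persist first
            send(rd.Messages)                                    // then send to peers
            applyCommitted(rd.CommittedEntries)                  // then apply
            n.Advance()                                          // unblock the next Ready
        case <-done: // application-owned stop channel (hypothetical)
            return
        }
    }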
    

    After etcdServer's raftNode receives a Ready from the channel, it wraps the committed entries and the snapshot into an apply and writes it to the applyc channel, updating the committed index before the write (see the sketch after the code below). It then performs the following steps:

    1. If this node is the leader, send the messages to the peers; messages of type raftpb.MsgSnap are written to the msgSnapC channel. The leader can write to disk in parallel with replicating to the peers.
    2. Persist the snapshot.
    3. Persist the Hard State and the entries.
    4. Force the WAL to fsync its hard state.
    5. Wait for applyAll to finish.
    // file: etcdserver/raft.go
    // apply contains entries, snapshot to be applied. Once
    // an apply is consumed, the entries will be persisted to
    // to raft storage concurrently; the application must read
    // raftDone before assuming the raft messages are stable.
    type apply struct {
        entries  []raftpb.Entry
        snapshot raftpb.Snapshot
        // notifyc synchronizes etcd server applies with the raft node
        notifyc chan struct{}
    }
    
    // start prepares and starts raftNode in a new goroutine. It is no longer safe
    // to modify the fields after it has been started.
    func (r *raftNode) start(rh *raftReadyHandler) {
        internalTimeout := time.Second
    
        go func() {
            defer r.onStop()
            islead := false
    
            for {
                select {
                case <-r.ticker.C:
                    r.tick()
                case rd := <-r.Ready():
                    if rd.SoftState != nil {
                        newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
                        if newLeader {
                            leaderChanges.Inc()
                        }
    
                        if rd.SoftState.Lead == raft.None {
                            hasLeader.Set(0)
                        } else {
                            hasLeader.Set(1)
                        }
    
                        rh.updateLead(rd.SoftState.Lead)
                        islead = rd.RaftState == raft.StateLeader
                        if islead {
                            isLeader.Set(1)
                        } else {
                            isLeader.Set(0)
                        }
                        rh.updateLeadership(newLeader)
                        r.td.Reset()
                    }
    
                    if len(rd.ReadStates) != 0 {
                        select {
                        case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
                        case <-time.After(internalTimeout):
                            r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
                        case <-r.stopped:
                            return
                        }
                    }
                    notifyc := make(chan struct{}, 1)
                    ap := apply{
                        entries:  rd.CommittedEntries,
                        snapshot: rd.Snapshot,
                        notifyc:  notifyc,
                    }
    
                    updateCommittedIndex(&ap, rh)
    
                    select {
                    case r.applyc <- ap:
                    case <-r.stopped:
                        return
                    }
    
                    // the leader can write to its disk in parallel with replicating to the followers and them
                    // writing to their disks.
                    // For more details, check raft thesis 10.2.1
                    if islead {
                        // gofail: var raftBeforeLeaderSend struct{}
                        r.transport.Send(r.processMessages(rd.Messages))
                    }
    
                    // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
                    // ensure that recovery after a snapshot restore is possible.
                    if !raft.IsEmptySnap(rd.Snapshot) {
                        // gofail: var raftBeforeSaveSnap struct{}
                        if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                            r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
                        }
                        // gofail: var raftAfterSaveSnap struct{}
                    }
    
                    // gofail: var raftBeforeSave struct{}
                    if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                        r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
                    }
                    if !raft.IsEmptyHardState(rd.HardState) {
                        proposalsCommitted.Set(float64(rd.HardState.Commit))
                    }
                    // gofail: var raftAfterSave struct{}
    
                    if !raft.IsEmptySnap(rd.Snapshot) {
                        // Force WAL to fsync its hard state before Release() releases
                        // old data from the WAL. Otherwise could get an error like:
                        // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
                        // See https://github.com/etcd-io/etcd/issues/10219 for more details.
                        if err := r.storage.Sync(); err != nil {
                            r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
                        }
                        // etcdserver now claim the snapshot has been persisted onto the disk
                        notifyc <- struct{}{}
    
                        // gofail: var raftBeforeApplySnap struct{}
                        r.raftStorage.ApplySnapshot(rd.Snapshot)
                        r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
                        // gofail: var raftAfterApplySnap struct{}
    
                        if err := r.storage.Release(rd.Snapshot); err != nil {
                            r.lg.Fatal("failed to release Raft wal", zap.Error(err))
                        }
                        // gofail: var raftAfterWALRelease struct{}
                    }
    
                    r.raftStorage.Append(rd.Entries)
    
                    if !islead {
                        // finish processing incoming messages before we signal raftdone chan
                        msgs := r.processMessages(rd.Messages)
    
                        // now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
                        notifyc <- struct{}{}
    
                        // Candidate or follower needs to wait for all pending configuration
                        // changes to be applied before sending messages.
                        // Otherwise we might incorrectly count votes (e.g. votes from removed members).
                        // Also slow machine's follower raft-layer could proceed to become the leader
                        // on its own single-node cluster, before apply-layer applies the config change.
                        // We simply wait for ALL pending entries to be applied for now.
                        // We might improve this later on if it causes unnecessary long blocking issues.
                        waitApply := false
                        for _, ent := range rd.CommittedEntries {
                            if ent.Type == raftpb.EntryConfChange {
                                waitApply = true
                                break
                            }
                        }
                        if waitApply {
                            // blocks until 'applyAll' calls 'applyWait.Trigger'
                            // to be in sync with scheduled config-change job
                            // (assume notifyc has cap of 1)
                            select {
                            case notifyc <- struct{}{}:
                            case <-r.stopped:
                                return
                            }
                        }
    
                        // gofail: var raftBeforeFollowerSend struct{}
                        r.transport.Send(msgs)
                    } else {
                        // leader already processed 'MsgSnap' and signaled
                        notifyc <- struct{}{}
                    }
    
                    r.Advance()
                case <-r.stopped:
                    return
                }
            }
        }()
    }
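
    For reference, the committed-index update mentioned above takes the larger of the last committed entry's index and the snapshot index:

    // file: etcdserver/raft.go
    func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
        var ci uint64
        if len(ap.entries) != 0 {
            ci = ap.entries[len(ap.entries)-1].Index
        }
        if ap.snapshot.Metadata.Index > ci {
            ci = ap.snapshot.Metadata.Index
        }
        if ci != 0 {
            rh.updateCommittedIndex(ci)
        }
    }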
    

    After etcdServer receives the apply from applyc, it applies it:

    func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
        s.applySnapshot(ep, apply)
        s.applyEntries(ep, apply)
    
        proposalsApplied.Set(float64(ep.appliedi))
        s.applyWait.Trigger(ep.appliedi)
    
        // wait for the raft routine to finish the disk writes before triggering a
        // snapshot. or applied index might be greater than the last index in raft
        // storage, since the raft routine might be slower than apply routine.
        <-apply.notifyc
    
        s.triggerSnapshot(ep)
        select {
        // snapshot requested via send()
        case m := <-s.r.msgSnapC:
            merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
            s.sendMergedSnap(merged)
        default:
        }
    }
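
    On the consumer side, EtcdServer's run loop receives each apply from the channel and schedules applyAll on a FIFO scheduler so that applies stay ordered (abridged sketch of etcdserver/server.go):

    // Abridged sketch of the consumer in EtcdServer.run.
    case ap := <-s.r.apply():
        f := func(context.Context) { s.applyAll(&ep, &ap) }
        sched.Schedule(f)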
    
