EtcdRaft Source Code Analysis (Log Replication)

Author: Pillar_Zhong | Published: 2019-03-14 14:39

    Leader

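    Assume a leader has just been elected and is about to replicate its log to the other members.

    Before a node is a real Leader, it has to do some preparation.

    • Going from Candidate to Leader is more than a change of name.
    • Once the transition is complete, the node has to start replicating its log to the other members.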

    becomeLeader

    func (r *raft) becomeLeader() {
       // TODO(xiangli) remove the panic when the raft implementation is stable
       if r.state == StateFollower {
          panic("invalid transition [follower -> leader]")
       }
       r.step = stepLeader
       r.reset(r.Term)
       r.tick = r.tickHeartbeat
       r.lead = r.id
       r.state = StateLeader
       // Followers enter replicate mode when they've been successfully probed
       // (perhaps after having received a snapshot as a result). The leader is
       // trivially in this state. Note that r.reset() has initialized this
       // progress with the last index already.
       r.prs[r.id].becomeReplicate()
    
       // Conservatively set the pendingConfIndex to the last index in the
       // log. There may or may not be a pending config change, but it's
       // safe to delay any future proposals until we commit all our
       // pending log entries, and scanning the entire tail of the log
       // could be expensive.
       r.pendingConfIndex = r.raftLog.lastIndex()
    
       emptyEnt := pb.Entry{Data: nil}
       if !r.appendEntry(emptyEnt) {
          // This won't happen because we just called reset() above.
          r.logger.Panic("empty entry was dropped")
       }
       // As a special case, don't count the initial empty entry towards the
       // uncommitted log quota. This is because we want to preserve the
       // behavior of allowing one entry larger than quota if the current
       // usage is zero.
       r.reduceUncommittedSize([]pb.Entry{emptyEnt})
       r.logger.Infof("%x became leader at term %d", r.id, r.Term)
    }
    
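    • The previous role must not be Follower; a direct Follower -> Leader transition panics.
    • From now on, incoming messages are handled by stepLeader and ticks by tickHeartbeat.
    • The node's own Progress is switched to ProgressStateReplicate, with Next = Match+1.
    • An empty entry is appended to the log, and it is not counted towards the uncommitted log quota.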

    reset(r.Term)

    func (r *raft) reset(term uint64) {
       if r.Term != term {
          r.Term = term
          r.Vote = None
       }
       r.lead = None
    
       r.electionElapsed = 0
       r.heartbeatElapsed = 0
       r.resetRandomizedElectionTimeout()
    
       r.abortLeaderTransfer()
    
       r.votes = make(map[uint64]bool)
       r.forEachProgress(func(id uint64, pr *Progress) {
          *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
          if id == r.id {
             pr.Match = r.raftLog.lastIndex()
          }
       })
    
       r.pendingConfIndex = 0
       r.uncommittedSize = 0
       r.readOnly = newReadOnly(r.readOnly.option)
    }
    
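    • The term is set to the one passed in; if it differs from the current term, Vote is cleared as well.
    • lead, the election and heartbeat tick counters, any in-progress leader transfer, the votes map, pendingConfIndex, uncommittedSize and readOnly are all reset, and the randomized election timeout is drawn again.
    • The locally stored Progress of every node is reset.
      • Note that each peer's Next is set to r.raftLog.lastIndex() + 1: the leader optimistically assumes that everyone's log matches its own. Only the node itself gets Match = lastIndex.
      • State is left at its zero value, so every node's Progress starts out in Probe.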
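The effect of reset on the progress table can be sketched in a few lines. This is a minimal model, not the etcd code: the progress struct, the string-valued State field and resetProgress are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// progress is a simplified, hypothetical stand-in for raft's Progress struct.
type progress struct {
	Match, Next uint64
	State       string // "probe", "replicate" or "snapshot"
}

// resetProgress mimics what reset() does to the leader's view of each node:
// Next is optimistically set to lastIndex+1, every node starts in probe state,
// and only the node itself is known to match up to lastIndex.
func resetProgress(self uint64, ids []uint64, lastIndex uint64) map[uint64]*progress {
	prs := make(map[uint64]*progress, len(ids))
	for _, id := range ids {
		pr := &progress{Next: lastIndex + 1, State: "probe"}
		if id == self {
			pr.Match = lastIndex
		}
		prs[id] = pr
	}
	return prs
}

func main() {
	ids := []uint64{1, 2, 3}
	prs := resetProgress(1, ids, 10)
	for _, id := range ids {
		fmt.Printf("node %d: %+v\n", id, *prs[id])
	}
}
```

Match = 0 for the peers is the honest part of the bookkeeping: the leader has no proof yet that anything is replicated, while Next is only a guess that later rejections may correct.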

    tickHeartbeat

    func (r *raft) tickHeartbeat() {
       r.heartbeatElapsed++
       r.electionElapsed++
    
       if r.electionElapsed >= r.electionTimeout {
          r.electionElapsed = 0
          if r.checkQuorum {
             r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
          }
          // If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
          if r.state == StateLeader && r.leadTransferee != None {
             r.abortLeaderTransfer()
          }
       }
    
       if r.state != StateLeader {
          return
       }
    
       if r.heartbeatElapsed >= r.heartbeatTimeout {
          r.heartbeatElapsed = 0
          r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
       }
    }
    
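    • Every tick increments both heartbeatElapsed and electionElapsed.

    • When electionElapsed reaches electionTimeout it is reset to zero. If checkQuorum is enabled the leader sends itself a MsgCheckQuorum, and a leader transfer that has not finished within one election timeout is aborted.

    • If the node is still the leader and heartbeatElapsed reaches heartbeatTimeout, the counter is reset to zero and the leader sends itself a MsgBeat, which triggers a round of heartbeats.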
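How the two tick counters in tickHeartbeat interact can be shown with a stripped-down model. The ticker type and countEvents helper are hypothetical; the sketch assumes checkQuorum is enabled and that the node stays leader throughout.

```go
package main

import "fmt"

// ticker is a hypothetical model of the leader's two tick counters.
type ticker struct {
	heartbeatElapsed, electionElapsed int
	heartbeatTimeout, electionTimeout int
}

// tick advances both counters by one and reports which events fire on this
// tick: a heartbeat (MsgBeat) and/or a quorum check (MsgCheckQuorum).
func (t *ticker) tick() (beat, checkQuorum bool) {
	t.heartbeatElapsed++
	t.electionElapsed++
	if t.electionElapsed >= t.electionTimeout {
		t.electionElapsed = 0
		checkQuorum = true
	}
	if t.heartbeatElapsed >= t.heartbeatTimeout {
		t.heartbeatElapsed = 0
		beat = true
	}
	return beat, checkQuorum
}

// countEvents runs n ticks and counts how often each event fired.
func countEvents(t *ticker, n int) (beats, checks int) {
	for i := 0; i < n; i++ {
		b, c := t.tick()
		if b {
			beats++
		}
		if c {
			checks++
		}
	}
	return beats, checks
}

func main() {
	t := &ticker{heartbeatTimeout: 2, electionTimeout: 10}
	beats, checks := countEvents(t, 20)
	fmt.Printf("after 20 ticks: %d heartbeats, %d quorum checks\n", beats, checks)
}
```

With a heartbeat timeout of 2 ticks and an election timeout of 10 ticks, heartbeats fire five times as often as quorum checks, which is the usual relationship between the two settings.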

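    bcastAppend

    bcastAppend calls sendAppend for every other member, and sendAppend in turn calls maybeSendAppend(to, true), which does the real work: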

    func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
       pr := r.getProgress(to)
       if pr.IsPaused() {
          return false
       }
       m := pb.Message{}
       m.To = to
    
       term, errt := r.raftLog.term(pr.Next - 1)
       ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
       if len(ents) == 0 && !sendIfEmpty {
          return false
       }
    
       if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
          if !pr.RecentActive {
             r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
             return false
          }
    
          m.Type = pb.MsgSnap
          snapshot, err := r.raftLog.snapshot()
          if err != nil {
             if err == ErrSnapshotTemporarilyUnavailable {
                r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
                return false
             }
             panic(err) // TODO(bdarnell)
          }
          if IsEmptySnap(snapshot) {
             panic("need non-empty snapshot")
          }
          m.Snapshot = snapshot
          sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
          r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
             r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
          pr.becomeSnapshot(sindex)
          r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
       } else {
          m.Type = pb.MsgApp
          m.Index = pr.Next - 1
          m.LogTerm = term
          m.Entries = ents
          m.Commit = r.raftLog.committed
          if n := len(m.Entries); n != 0 {
             switch pr.State {
             // optimistically increase the next when in ProgressStateReplicate
             case ProgressStateReplicate:
                last := m.Entries[n-1].Index
                pr.optimisticUpdate(last)
                pr.ins.add(last)
             case ProgressStateProbe:
                pr.pause()
             default:
                r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
             }
          }
       }
       r.send(m)
       return true
    }
    
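    • Fetch the peer's Progress, i.e. how far it has been replicated to. If the peer is paused, nothing is sent.
    • Pack the entries starting at the peer's Next, limited by maxMsgSize.
    • Pack the (term, index) of the entry at Next-1, which the receiver uses for its consistency check.
    • Include the leader's own committed index.
    • Prepare a MsgApp for the peer.
    • If the message carries entries, what happens next depends on the peer's state:
      • ProgressStateReplicate
        • Optimistically advance the peer's Next to last+1, where last is the index of the last entry sent.
        • Add last to ins. Note that ins is a ring-buffer-like queue of in-flight messages.
      • ProgressStateProbe
        • Pause the peer, so nothing more is sent until it responds.
    • The snapshot case
      • If the term at Next-1 or the entries from Next onwards cannot be read from the log, they have been compacted away and are only available through a snapshot.
      • A peer that has not been recently active is not sent a snapshot.
      • Take the snapshot held by the current node; it may live in unstable or in storage.
      • Set the peer's Progress to ProgressStateSnapshot, with PendingSnapshot set to the snapshot's index.
      • Prepare a MsgSnap for the peer.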
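The MsgApp branch can be condensed into a small sketch. It assumes an uncompacted log where log[i] holds the entry with Index i+1, so the snapshot path never triggers; entry, progress, appendMsg and buildAppend are hypothetical names, and a plain Paused flag stands in for the real IsPaused logic.

```go
package main

import "fmt"

type entry struct{ Index, Term uint64 }

// progress is a simplified view of a peer: Replicate=false means probe state.
type progress struct {
	Match, Next uint64
	Replicate   bool
	Paused      bool
}

type appendMsg struct {
	PrevIndex, PrevTerm uint64
	Entries             []entry
	Commit              uint64
}

// buildAppend assembles an append message for one peer. It assumes
// 1 <= pr.Next <= len(log)+1 and that log[i].Index == i+1.
func buildAppend(log []entry, committed uint64, pr *progress) (appendMsg, bool) {
	if pr.Paused {
		return appendMsg{}, false
	}
	m := appendMsg{PrevIndex: pr.Next - 1, Commit: committed}
	if m.PrevIndex > 0 {
		m.PrevTerm = log[m.PrevIndex-1].Term
	}
	m.Entries = append(m.Entries, log[pr.Next-1:]...)
	if n := len(m.Entries); n > 0 {
		if pr.Replicate {
			// Replicate: assume the peer will accept and move Next forward now.
			pr.Next = m.Entries[n-1].Index + 1
		} else {
			// Probe: send one message, then wait for the answer.
			pr.Paused = true
		}
	}
	return m, true
}

func main() {
	log := []entry{{1, 1}, {2, 1}, {3, 2}}
	peer := &progress{Next: 3}
	m, ok := buildAppend(log, 2, peer)
	fmt.Printf("sent=%v prev=(%d,%d) entries=%d paused=%v\n",
		ok, m.PrevIndex, m.PrevTerm, len(m.Entries), peer.Paused)
}
```

The difference between the two states is visible in the last branch: a replicating peer can be sent message after message because Next keeps moving, while a probed peer gets exactly one message per round trip.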

    Follower

    case pb.MsgApp:
       r.electionElapsed = 0
       r.lead = m.From
       r.handleAppendEntries(m)
    
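    • The Follower assumes that only a Leader can send this kind of message, so whoever sends it is accepted as the Leader.
    • The election timer is reset at the same time.
    • The real work happens in handleAppendEntries.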

    handleAppendEntries

    func (r *raft) handleAppendEntries(m pb.Message) {
       if m.Index < r.raftLog.committed {
          r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
          return
       }
    
       if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
          r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
       } else {
          r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
       }
    }
    
    func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
        if l.matchTerm(index, logTerm) {
            lastnewi = index + uint64(len(ents))
            ci := l.findConflict(ents)
            switch {
            case ci == 0:
            case ci <= l.committed:
                l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
            default:
                offset := index + 1
                l.append(ents[ci-offset:]...)
            }
            l.commitTo(min(committed, lastnewi))
            return lastnewi, true
        }
        return 0, false
    }
    
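    • If the message's index is smaller than what the Follower has already committed, that is fine: the Follower simply reports its own committed index back to the Leader.

    • maybeAppend checks whether the (term, index) sent by the Leader matches the local log at that index. If it does not, the Follower tells the Leader: the term of the entry at the index you gave does not match mine. Maybe I do not have that entry at all, maybe I hold something completely different. The request is rejected, with the Follower's current last index attached as RejectHint: r.raftLog.lastIndex(); the conflict point reported is the index the Leader sent.

      • The Leader uses the reported last index in maybeDecrTo, where Next is moved back to min(rejected, last+1).

      • In Raft, if two logs agree on (term, index) at some position, they agree on everything before that position.

    • If it does match, the two logs agree up to the position right before the insertion point, so appending after it is safe.

      • First compute the new last index after the append, lastnewi.
      • findConflict
        • The best case is that the new entries attach exactly at the end of the log. No conflict is possible, and the entries are simply appended.
        • The other case is that the Follower holds more entries than the Leader expects, or different ones. Then the log is scanned from front to back for the first conflict, everything from that point on is discarded, and the Follower falls in line with the Leader.
      • Then committed is advanced to min(committed sent by the Leader, lastnewi).
      • Finally the Follower reports back to the Leader that the request has been carried out, attaching its new last index.
    • There is one more case, the Leader's probe request, where the Follower …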
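The follower-side check and truncation can be sketched as follows. This is a simplified model under stated assumptions: the log is never compacted, entries[i] has Index i+1, and followerLog, minU64 and this maybeAppend are hypothetical re-creations rather than the etcd functions. The sketch also omits the panic the real code raises when a conflict falls at or below the committed index.

```go
package main

import "fmt"

type entry struct{ Index, Term uint64 }

// followerLog is a hypothetical in-memory log with entries[i].Index == i+1.
type followerLog struct {
	entries   []entry
	committed uint64
}

func (l *followerLog) lastIndex() uint64 { return uint64(len(l.entries)) }

// term returns the term at index i; index 0 is the empty-log sentinel.
func (l *followerLog) term(i uint64) (uint64, bool) {
	if i == 0 {
		return 0, true
	}
	if i > l.lastIndex() {
		return 0, false
	}
	return l.entries[i-1].Term, true
}

func minU64(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

// maybeAppend accepts ents only if the log matches at (prevIndex, prevTerm).
// From the first conflicting or missing index on, the local tail is replaced
// by the leader's entries.
func (l *followerLog) maybeAppend(prevIndex, prevTerm, leaderCommit uint64, ents []entry) (uint64, bool) {
	if t, ok := l.term(prevIndex); !ok || t != prevTerm {
		return 0, false
	}
	lastNew := prevIndex + uint64(len(ents))
	for i, e := range ents {
		if t, ok := l.term(e.Index); !ok || t != e.Term {
			l.entries = append(l.entries[:e.Index-1], ents[i:]...)
			break
		}
	}
	if c := minU64(leaderCommit, lastNew); c > l.committed {
		l.committed = c
	}
	return lastNew, true
}

func main() {
	l := &followerLog{entries: []entry{{1, 1}, {2, 1}, {3, 2}, {4, 2}}, committed: 2}
	last, ok := l.maybeAppend(2, 1, 4, []entry{{3, 3}, {4, 3}, {5, 3}})
	fmt.Println(ok, last, l.committed, l.entries)
}
```

In the example the follower holds two entries from term 2 that the leader never had; the first mismatch is at index 3, so indexes 3 and 4 are overwritten and index 5 is added.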

    Leader

    Now let's look at what the Leader does when it receives a member's replication response.

    case pb.MsgAppResp:
       pr.RecentActive = true
    
       if m.Reject {
          r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
             r.id, m.RejectHint, m.From, m.Index)
          if pr.maybeDecrTo(m.Index, m.RejectHint) {
             r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
             if pr.State == ProgressStateReplicate {
                pr.becomeProbe()
             }
             r.sendAppend(m.From)
          }
       } else {
          oldPaused := pr.IsPaused()
          if pr.maybeUpdate(m.Index) {
             switch {
             case pr.State == ProgressStateProbe:
                pr.becomeReplicate()
             case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
                r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
                // Transition back to replicating state via probing state
                // (which takes the snapshot into account). If we didn't
                // move to replicating state, that would only happen with
                // the next round of appends (but there may not be a next
                // round for a while, exposing an inconsistent RaftStatus).
                pr.becomeProbe()
                pr.becomeReplicate()
             case pr.State == ProgressStateReplicate:
                pr.ins.freeTo(m.Index)
             }
    
             if r.maybeCommit() {
                r.bcastAppend()
             } else if oldPaused {
                // If we were paused before, this node may be missing the
                // latest commit index, so send it.
                r.sendAppend(m.From)
             }
             // We've updated flow control information above, which may
             // allow us to send multiple (size-limited) in-flight messages
             // at once (such as when transitioning from probe to
             // replicate, or when freeTo() covers multiple messages). If
             // we have more entries to send, send as many messages as we
             // can (without sending empty messages for the commit index)
             for r.maybeSendAppend(m.From, false) {
             }
             // Transfer leadership is in progress.
             if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
                r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
                r.sendTimeoutNow(m.From)
             }
          }
       }
    

    agree

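    • This is the case where the member accepted the Leader's replication request. What happens next depends on the member's Progress state:

      • ProgressStateProbe: remember that everyone started out in Probe when the Leader took office. The member now switches to ProgressStateReplicate, and its Next naturally becomes Match+1.
      • ProgressStateSnapshot: see EtcdRaft Source Code Analysis (Snapshot Replication).
      • ProgressStateReplicate
        • Getting here means the peer has accepted the replicated entries, so everything in ins that is less than or equal to this index is removed.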
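The role of ins as a sliding window can be illustrated with a deliberately simplified version. The real structure is a ring buffer; this sketch uses a plain slice, and its add returns false when the window is full, which is an assumption made here for illustration rather than the etcd behaviour.

```go
package main

import "fmt"

// inflights is a hypothetical, slice-based model of the in-flight window.
// Each element is the last entry index of one unacknowledged MsgApp.
type inflights struct {
	size   int
	buffer []uint64 // oldest message first
}

func (in *inflights) full() bool { return len(in.buffer) >= in.size }

// add records one more in-flight message, or reports false if the window is full.
func (in *inflights) add(last uint64) bool {
	if in.full() {
		return false
	}
	in.buffer = append(in.buffer, last)
	return true
}

// freeTo releases every in-flight message whose last index is <= to.
func (in *inflights) freeTo(to uint64) {
	i := 0
	for i < len(in.buffer) && in.buffer[i] <= to {
		i++
	}
	in.buffer = in.buffer[i:]
}

func main() {
	in := &inflights{size: 3}
	for _, last := range []uint64{5, 10, 15} {
		in.add(last)
	}
	fmt.Println("full before ack:", in.full())
	in.freeTo(10) // the follower acknowledged everything up to index 10
	fmt.Println("full after ack:", in.full(), "still in flight:", in.buffer)
}
```

One acknowledgement can free several slots at once, which is why the leader afterwards loops on maybeSendAppend to refill the window.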

    maybeUpdate

    func (pr *Progress) maybeUpdate(n uint64) bool {
       var updated bool
       if pr.Match < n {
          pr.Match = n
          updated = true
          pr.resume()
       }
       if pr.Next < n+1 {
          pr.Next = n + 1
       }
       return updated
    }
    
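    • maybeUpdate: as the analysis above shows, no rejection means the two logs agree up to some point, and the index sent back by the peer says how far the Leader's data has been replicated there. So the first thing to do is record the peer's progress: Match is raised to that index (it is never lowered) and Next is kept at index+1 or higher.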

    maybeCommit

    func (r *raft) maybeCommit() bool {
       // Preserving matchBuf across calls is an optimization
       // used to avoid allocating a new slice on each call.
       if cap(r.matchBuf) < len(r.prs) {
          r.matchBuf = make(uint64Slice, len(r.prs))
       }
       mis := r.matchBuf[:len(r.prs)]
       idx := 0
       for _, p := range r.prs {
          mis[idx] = p.Match
          idx++
       }
       sort.Sort(mis)
       mci := mis[len(mis)-r.quorum()]
       return r.raftLog.maybeCommit(mci, r.Term)
    }
    
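    • maybeCommit collects the Match of every member, sorts them, and picks the largest index that a quorum (more than half of the members) has already replicated. Only if that index lies beyond the Leader's committed position can the Leader safely commit its local entries up to it; r.Term is passed along so that only an entry from the current term is committed this way.
    • Finally, if the commit index advanced, the change is broadcast to the members again through bcastAppend.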
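The quorum computation is easy to check by hand with a small sketch. quorumIndex and shouldCommit are hypothetical helpers written for illustration; the term comparison reflects the rule that a leader only commits entries of its own term by counting replicas.

```go
package main

import (
	"fmt"
	"sort"
)

// quorumIndex returns the highest index that a majority of members has
// replicated, given one Match value per member (including the leader).
// It assumes matches is non-empty.
func quorumIndex(matches []uint64) uint64 {
	mis := append([]uint64(nil), matches...) // sort a copy, keep the input intact
	sort.Slice(mis, func(i, j int) bool { return mis[i] < mis[j] })
	quorum := len(mis)/2 + 1
	return mis[len(mis)-quorum]
}

// shouldCommit reports whether the commit index may advance to mci.
func shouldCommit(mci, committed, termAtMci, currentTerm uint64) bool {
	return mci > committed && termAtMci == currentTerm
}

func main() {
	matches := []uint64{10, 7, 5} // leader at 10, followers at 7 and 5
	mci := quorumIndex(matches)
	fmt.Println("quorum index:", mci)
	fmt.Println("advance commit from 5:", shouldCommit(mci, 5, 2, 2))
}
```

After sorting in ascending order, the element at position len-quorum is the smallest of the top quorum values, so at least a quorum of members has a Match greater than or equal to it.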

    reject

    When the peer rejects the request:

    if m.Reject {
       r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
          r.id, m.RejectHint, m.From, m.Index)
       if pr.maybeDecrTo(m.Index, m.RejectHint) {
          r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
          if pr.State == ProgressStateReplicate {
             pr.becomeProbe()
          }
          r.sendAppend(m.From)
       }
    }
    

    maybeDecrTo

    func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
       if pr.State == ProgressStateReplicate {
          // the rejection must be stale if the progress has matched and "rejected"
          // is smaller than "match".
          if rejected <= pr.Match {
             return false
          }
          // directly decrease next to match + 1
          pr.Next = pr.Match + 1
          return true
       }
    
       // the rejection must be stale if "rejected" does not match next - 1
       if pr.Next-1 != rejected {
          return false
       }
    
       if pr.Next = min(rejected, last+1); pr.Next < 1 {
          pr.Next = 1
       }
       pr.resume()
       return true
    }
    
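    • If the peer's Progress is in ProgressStateReplicate and the rejected index is not larger than Match, the rejection must be stale and is ignored.
      • Otherwise Next jumps straight back to Match+1: everything sent optimistically after Match is treated as lost and replication starts over from there. Simple and blunt.
    • In the other states, pr.Next-1 should normally equal rejected. After all, rejected is the position right before the insertion point, used purely for the consistency check, and so is pr.Next-1. If the two differ, the rejection is stale and is ignored.
    • The peer's Next is then moved back to min(rejected, last+1), but never below 1. As long as the Follower's log is not shorter than rejected, this means Next steps back by one position. That is the probing: step back by one, send to the Follower and see whether it still conflicts; if it does, step back by one again, and so on until a matching position is found. When the Follower's log is shorter, the reported last index lets Next jump directly to last+1.
    • If maybeDecrTo did move Next back, it is still unknown whether the peer will accept the new position, so a peer in ProgressStateReplicate is switched to ProgressStateProbe first.
    • With Next moved back, the entries from the new position are sent to the peer again.
    • So a peer in ProgressStateReplicate falls back to Match+1 in one go and switches to ProgressStateProbe; in ProgressStateProbe each further rejection moves Next back one position at a time, or directly to last+1 when the Follower's log is shorter.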
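To see how many round trips the probing takes, the probe-state branch can be simulated. The sketch below is a hypothetical model: logs are reduced to slices of terms where position i holds the term of index i+1, nextAfterReject mirrors only the probe branch of maybeDecrTo, and probe plays both leader and follower.

```go
package main

import "fmt"

func minU64(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

// nextAfterReject mirrors the probe-state branch: a rejection that does not
// refer to next-1 is stale; otherwise Next becomes min(rejected, hint+1),
// but never less than 1.
func nextAfterReject(next, rejected, hint uint64) (uint64, bool) {
	if next-1 != rejected {
		return next, false
	}
	n := minU64(rejected, hint+1)
	if n < 1 {
		n = 1
	}
	return n, true
}

// probe returns the Next at which the follower finally accepts and the number
// of append round trips needed, starting from Next = len(leader)+1.
func probe(leader, follower []uint64) (next uint64, rounds int) {
	next = uint64(len(leader)) + 1
	for {
		rounds++
		prev := next - 1
		if prev == 0 || (prev <= uint64(len(follower)) && follower[prev-1] == leader[prev-1]) {
			return next, rounds // logs match at prev: the append is accepted
		}
		// Rejected: the follower reports its last index as the hint.
		next, _ = nextAfterReject(next, prev, uint64(len(follower)))
	}
}

func main() {
	leader := []uint64{1, 1, 2, 3, 3, 3}
	short := []uint64{1, 1, 2}          // follower that is merely behind
	diverged := []uint64{1, 1, 2, 2, 2} // follower with conflicting entries
	n, r := probe(leader, short)
	fmt.Printf("short follower: next=%d after %d round(s)\n", n, r)
	n, r = probe(leader[:5], diverged)
	fmt.Printf("diverged follower: next=%d after %d round(s)\n", n, r)
}
```

A follower that is only behind is found in two round trips thanks to the hint, while a follower with conflicting entries of the same length has to be walked back one index per round trip.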

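    Summary

    From here the exchange between Leader and Follower simply keeps going back and forth, but the overall logic is what has been described above, and the algorithm is remarkably elegant. Hopefully this walkthrough makes sense.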
