Leader
假定现在已经选出leader,开始要准备给其他人做日志同步了。
首先你要成为一个真正的Leader,需要做前期准备。
- 从Candidate转变为Leader,不是只是换个名字而已
- 转变完成后,就要给其他成员同步日志了
becomeLeader
func (r *raft) becomeLeader() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.prs[r.id].becomeReplicate()
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
r.logger.Panic("empty entry was dropped")
}
// As a special case, don't count the initial empty entry towards the
// uncommitted log quota. This is because we want to preserve the
// behavior of allowing one entry larger than quota if the current
// usage is zero.
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
- 首先之前的身份不能是follower
- 之后step处理会让stepLeader托管
- 将自己设为ProgressStateReplicate,且Next=Match+1
reset(r.Term)
func (r *raft) reset(term uint64) {
if r.Term != term {
r.Term = term
r.Vote = None
}
r.lead = None
r.electionElapsed = 0
r.heartbeatElapsed = 0
r.resetRandomizedElectionTimeout()
r.abortLeaderTransfer()
r.votes = make(map[uint64]bool)
r.forEachProgress(func(id uint64, pr *Progress) {
*pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
if id == r.id {
pr.Match = r.raftLog.lastIndex()
}
})
r.pendingConfIndex = 0
r.uncommittedSize = 0
r.readOnly = newReadOnly(r.readOnly.option)
}
- 设置任期为当前任期
- 投票,lead,选举计时器,心跳计时器,随机选举超时时间,leader转移,投票机,pendingConfigIndex,未提交的entrySize,readOnly全部清零
- pendingConfigIndex
- readOnly
- uncommittedSize
- 重置本地保存的其他节点的进度
- 这里需要注意的是,将对方的Next设为跟leader保持一致,是leader假定大家都跟我一致。r.raftLog.lastIndex() + 1
- 每个节点的Progress的状态初始都为Probe
tickHeartbeat
r.heartbeatElapsed++
r.electionElapsed++
if r.electionElapsed >= r.electionTimeout {
r.electionElapsed = 0
if r.checkQuorum {
r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
}
// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
if r.state == StateLeader && r.leadTransferee != None {
r.abortLeaderTransfer()
}
}
if r.state != StateLeader {
return
}
if r.heartbeatElapsed >= r.heartbeatTimeout {
r.heartbeatElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}
如果成员接受Leader的同步请求的情况
还记得么,Leader上任的时候大家都是Probe状态,现在转换成ProgressStateReplicate,同时他的Next当然是Match+1
- ProgressStateSnapshot 见EtcdRaft源码分析(快照复制)
- ProgressStateReplicate
- 到这里说明对方已经接受了日志复制,那么在ins里面删除小于或等于这次index的部分。
bcastAppend
func (r *raft) maybeSendAppend (to uint64, sendIfEmpty bool) bool {
pr := r.getProgress(to)
if pr.IsPaused() {
return false
}
m := pb.Message{}
m.To = to
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if len(ents) == 0 && !sendIfEmpty {
return false
}
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
}
m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return false
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
m.Snapshot = snapshot
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.becomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate:
last := m.Entries[n-1].Index
pr.optimisticUpdate(last)
pr.ins.add(last)
case ProgressStateProbe:
pr.pause()
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
r.send(m)
return true
}
- 拿到对方的Progress,也就是进度。
- 打包当前节点Next之后的entries
- 打包当前节点Next-1的(任期,index),作为接收人校验用
- 将自己committed的情况发给对方
- 准备发MsgApp消息给对方
- 遍历entries
- 如果对方的状态是ProgressStateReplicate
- 更新对方进度的Next为最新的last
- 将last加到ins里面,注意这个ins是个类环形的队列。
- Snapshot的情况
- 如果Next-1的任期或之后的entries如果查不到,那肯定就在snapshot里面
- 拿出当前节点存储的snapshot,有可能在unstable或storage里面
- 将对方的Progress设为ProgressStateSnapshot,且设置PendingSnapshot为snapshot的index
- 准备发MsgSnap消息给对方
Follower
case pb.MsgApp:
r.electionElapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
- 首先Follower认为只有Leader才能发这种消息,所以只要收到就认他为Leader
- 同时选举计时要清零
- 真正处理的逻辑在handleAppendEntries里面
handleAppendEntries
func (r *raft) handleAppendEntries(m pb.Message) {
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if l.matchTerm(index, logTerm) {
lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
l.append(ents[ci-offset:]...)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}
return 0, false
}
如果比Follower已经committed还要小,他会把自己committed的情况发回给Leader,没关系,将自己committed的情况发回给Leader
在mayAppend的时候会去比较leader发来的index的(任期,index)是否一致。如果不一致给Leader报告你给的index位置的entry任期跟我对不上。有可能我根本都没有,有可能是完全不一样的东西。你的同步请求我拒绝,并附上我现在的最后一位。RejectHint: r.raftLog.lastIndex(),然后冲突点就是发来的index。
报告中的最后一位的作用,待分析
Raft中只要某个位置的(任期,index)一致,那么index之前都是一致的。
如果能对上,说明插入位置前一位我们都一致,这样可以放心往后append了。
- 首先我们算出append之后新的最后一位,lastnewi
- findConflict
- 当然了,最好的情况是正好能接上,也就不存在冲突的可能性,无脑往后append新的entry就好了
- 还有的情况是,follower本地存储的entry比leader想象的还要多,还要复杂。那怎么办,当然是从前往后找到第一个冲突点,然后之后的全部不要,跟leader保持一致。
- 然后跟Leader要求的committed保持一致
- 然后给Leader报告说,你要求的我都执行完了,附上我现在最新的last位置
另外还有一种情况是,Leader的探测请求,Follower
Leader
下面我们剖析下Leader在收到成员的同步响应之后的处理。
case pb.MsgAppResp:
pr.RecentActive = true
if m.Reject {
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
if pr.maybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
if pr.maybeUpdate(m.Index) {
switch {
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
pr.becomeProbe()
pr.becomeReplicate()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
}
if r.maybeCommit() {
r.bcastAppend()
} else if oldPaused {
// If we were paused before, this node may be missing the
// latest commit index, so send it.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
for r.maybeSendAppend(m.From, false) {
}
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
}
}
agree
如果成员接受Leader的同步请求的情况
还记得么,Leader上任的时候大家都是Probe状态,现在转换成ProgressStateReplicate,同时他的Next当然是Match+1
- ProgressStateSnapshot 见EtcdRaft源码分析(快照复制)
- ProgressStateReplicate 待分析
- 到这里说明对方已经接受了日志复制,那么在ins里面删除小于或等于这次index的部分。
maybeUpdate
func (pr *Progress) maybeUpdate(n uint64) bool {
var updated bool
if pr.Match < n {
pr.Match = n
updated = true
pr.resume()
}
if pr.Next < n+1 {
pr.Next = n + 1
}
return updated
}
- maybeUpdate,从上面分析就知道,没有拒绝就说明,大家在某种程度是一致的,对方发来的index就表示leader发给他的数据同步到哪里了。首先第一件事情,就是记录下来对方同步的进度。
maybeCommit
func (r *raft) maybeCommit() bool {
// Preserving matchBuf across calls is an optimization
// used to avoid allocating a new slice on each call.
if cap(r.matchBuf) < len(r.prs) {
r.matchBuf = make(uint64Slice, len(r.prs))
}
mis := r.matchBuf[:len(r.prs)]
idx := 0
for _, p := range r.prs {
mis[idx] = p.Match
idx++
}
sort.Sort(mis)
mci := mis[len(mis)-r.quorum()]
return r.raftLog.maybeCommit(mci, r.Term)
}
- maybeCommit, 这里会统计各个成员的进度,如果超过一半的人的同步进度Match已经超过了Leader的committed位置,这个时候Leader才可以安心去commit本地entry了。
- 最后将commit的变更再次发给成员去同步
reject
如果被对方拒绝
if m.Reject {
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
if pr.maybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
}
}
maybeDecrTo
func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
if pr.State == ProgressStateReplicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= pr.Match {
return false
}
// directly decrease next to match + 1
pr.Next = pr.Match + 1
return true
}
// the rejection must be stale if "rejected" does not match next - 1
if pr.Next-1 != rejected {
return false
}
if pr.Next = min(rejected, last+1); pr.Next < 1 {
pr.Next = 1
}
pr.resume()
return true
}
- 如果对方进度的状态是ProgressStateReplicate,如果冲突点居然比Match要小,感觉不可思议,直接忽略。
- 否则的话,直接跳到Match+1的地方作为进度的Next,相当于Match之后的全部丢掉,准备重新开始同步。简单直接粗暴。
- 一般来说pr.Next-1是应该等于rejected的,想想看rejected是插入位置的前一位,专门用来校验用的,而pr.Next-1不也是插入位置的前一位么?所以如果不相等,感觉不可思议,直接忽略。
- 将对方进度的Next回退到rejectted,其实就相当于Next回退一位,为什么这么做,其实就是在探测啦,回退一位,发给Follower看看是不是还是冲突,不行,回来,再回退一位,如此往复。总会找到相同的时候
- 如果maybeDecrTo能够成功回退,但还不确定回退的位置,对方能接受,这个时候如果对方是ProgressStateReplicate状态,那么先转为ProgressStateProbe。
- 好了,该回退的也回退了,将最新的entries按回退的位置再发给对方看看。
- 可以看到ProgressStateReplicate会直接回退到Match+1, 去试试看,如果还被拒绝,那么会转成ProgressStateProbe,而ProgressStateProbe只会每次回退一位,去试试看。
总结
到这里,整个流程还可以往下继续在Leader和Follower之间来回往复,但是,大体的逻辑就是这样,可以说算法非常精妙。希望你能看懂我在说什么。
网友评论