To be continued...
状态转换图
Leader竞选
Follower节点或者Candidate节点在选举超时时间(例如,leader离线)到了之后会发起新的选举。
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
发送的消息为:
m := pb.Message{}
m.From = r.id
m.Type = pb.MsgHup
raft状态机的Step方法在处理Message时会根据是否设置perVote
,从而发起PreElection
或Election
。
switch m.Type {
case pb.MsgHup:
if r.preVote {
r.hup(campaignPreElection)
} else {
r.hup(campaignElection)
}
....
这时,raft状态机会判断当前是否为leader,是则直接忽略此message。如果不是leader,则判断是否能竞选leader。如果满足以下几个条件之一就不能竞选leader。
- 此raft的Progress为空。
- Learner
- 其raftLog有pending的snapshot,且不为空。
- 获取还没有applied的log entries,如果其中包含
pb.EntryConfChangeV2
或者pb.EntryConfChange
。
如果上述列举的都不满足则可以竞选leader。
func (r *raft) hup(t CampaignType) {
if r.state == StateLeader {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
return
}
if !r.promotable() {
r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
return
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t)
}
根据CampaignType
为campaignPreElection
或campaignElection
,当前节点状态机进入becomePreCandidate
或者becomeCandidate
。
- 如果是进入
Candidate
状态:
- 当前状态机的
step
函数指向stepCandidate
- 将状态机重置,主要包括以下几个方面:
- 将状态机term值加1,Vote和lead置为空。
- 重置选举,心跳计时器,随机选举超时计时器。
- 中止Leader转移过程。
- 后面重置投票信息以及进度信息还不是太清楚(TODO)
- 将
pendingConfIndex
和uncommittedSize
置为0(TODO) - 设置
readOnly
(TODO)
- 将
tick
函数指向tickElection
。 - 投票给自己,即
r.Vote = r.id
。 - 将状态置为
StateCandidate
,进入candidate状态。
- 如果是进入
PreCandidate
状态:
- 当前状态机的
step
函数指向stepCandidate
- 重置投票信息
- 将
tick
函数指向tickElection
。 - lead置为空。
- 将状态置为
StatePreCandidate
,进入PreCandidate状态。
// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
if !r.promotable() {
// This path should not be hit (callers are supposed to check), but
// better safe than sorry.
r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
}
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
// 只是增加了消息的term值,并未增加raft的term值。
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
// 单节点集群检测
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
// 获取集群中能投票节点ID,并发送指定类型的消息
for _, id := range ids {
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
// Index和LogTerm为raft log中的最后一条entry的Index值和Term值
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
向集群中的节点发送指定类型的消息。
m := pb.Message{}
m.Term= r.Term + 1 // term+1还没有应用到状态机
m.To = id
m.Type = pb.MsgPreVote
m.Index = r.raftLog.lastIndex()
m.LogTerm = r.raftLog.lastTerm()
m.Context = ctx
// ----------------------
m := pb.Message{}
m.Term= r.Term // 需要注意的是,term值其实已经加1了,因为已经应用在状态机上了。
m.To = id
m.Type = pb.MsgVote
m.Index = r.raftLog.lastIndex()
m.LogTerm = r.raftLog.lastTerm()
m.Context = ctx
接着调用raft.send()
将消息追加到raft.msg
队列中,等待上层模块将其发送出去。
streamReader
接收到消息之后进行反序列化处理,然后根据消息类型发送到streamReader
的recvc
通道或propc
通道。实际上写入的是peer
的recvc
通道或propc
通道,不过在这里是直接写入recvc
通道。
// file: etcdserver/api/rafthttp/stream.go
for {
m, err := dec.decode()
if err != nil {
cr.mu.Lock()
cr.close()
cr.mu.Unlock()
return err
}
// gofail-go: var raftDropHeartbeat struct{}
// continue labelRaftDropHeartbeat
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
cr.mu.Lock()
paused := cr.paused
cr.mu.Unlock()
if paused {
continue
}
if isLinkHeartbeatMessage(&m) {
// raft is not interested in link layer
// heartbeat message, so we should ignore
// it.
continue
}
recvc := cr.recvc
if m.Type == raftpb.MsgProp {
recvc = cr.propc
}
select {
case recvc <- m:
default:
peer接收到message之后,会调用raft.Process()
方法进行处理。
// file: etcdserver/api/rafthttp/peer.go
go func() {
for {
select {
case mm := <-p.recvc:
if err := r.Process(ctx, mm); err != nil {
if t.Logger != nil {
t.Logger.Warn("failed to process Raft message", zap.Error(err))
}
}
case <-p.stopc:
return
}
}
}()
// r.Process might block for processing proposal when there is no leader.
// Thus propc must be put into a separate routine with recvc to avoid blocking
// processing other raft messages.
go func() {
for {
select {
case mm := <-p.propc:
if err := r.Process(ctx, mm); err != nil {
if t.Logger != nil {
t.Logger.Warn("failed to process Raft message", zap.Error(err))
}
}
case <-p.stopc:
return
}
}
}()
raft.Process()
方法首先检查发送消息的节点是否被移除,如果没有被移除则调用raft.Step()
方法进行处理。
// file: etcdserver/server.go
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
lg := s.getLogger()
if s.cluster.IsIDRemoved(types.ID(m.From)) {
lg.Warn(
"rejected Raft message from removed member",
zap.String("local-member-id", s.ID().String()),
zap.String("removed-member-id", types.ID(m.From).String()),
)
return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
}
if m.Type == raftpb.MsgApp {
s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
}
return s.r.Step(ctx, m)
}
接着调用node.step()
方法。其会将Message写入node的recvc
通道。
func (n *node) step(ctx context.Context, m pb.Message) error {
return n.stepWithWaitOption(ctx, m, false)
}
recvc
通道接收到消息之后会将其应用在状态机中。
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
当收到消息的Term大于当前Term,且消息类型为MsgPreVote
或MsgVote
,会先根据message中是否带有campaignTransfer
的context,以此决定是否强制当前节点参与选举。接着会判断是否开启CheckQuorum模式,当前节点是否有已知的的lead节点,以及选取计时器是否超时。如果不是强制要求此节点参与选举过程,同时节点在lease内,则直接返回。
同时需要注意的是,由于当前节点的term值小于消息的term值,因此会根据消息的类型做相应的处理。
- 如果消息类型为
MsgPreVote
,则当前节点的状态机不做处理。 - 如果消息类型为
MsgVote
,则当前节点becomeFollower
,并且用消息的term值重置状态机。但是当前节点的lead为空。 - 如果是leader发送过来的
MsgApp
,MsgHearbeat
以及MsgASnap
消息,则用消息的term值重置状态机,并将lead置为发送消息的节点。
func (r *raft) Step(m pb.Message) error {
// Handle the message term, which may result in our stepping down to a follower.
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
force := bytes.Equal(m.Context, []byte(campaignTransfer))
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
return nil
}
}
switch {
case m.Type == pb.MsgPreVote:
// Never change our term in response to a PreVote
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
当前节点在参与选举时,会考虑如下情况来决定是否参与投票。只有满足下面1,2,3中的一个,且同时满足条件4才能将票投给发送消息的节点。
- 当前节点是否已经投过票,而且投的正是发送消息的节点。
- 当前节点还没有投票,而且lead为空。
- 消息的type为PreVote,而且消息的Term值大于当前节点状态机的Term值。
- 发送消息的节点的raftLog是否包含当前节点的所有entries信息。
case pb.MsgVote, pb.MsgPreVote:
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(r.Vote == None && r.lead == None) ||
// ...or this is a PreVote for a future term...
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
// Note: it turns out that that learners must be allowed to cast votes.
// This seems counter- intuitive but is necessary in the situation in which
// a learner has been promoted (i.e. is now a voter) but has not learned
// about this yet.
// For example, consider a group in which id=1 is a learner and id=2 and
// id=3 are voters. A configuration change promoting 1 can be committed on
// the quorum `{2,3}` without the config change being appended to the
// learner's log. If the leader (say 2) fails, there are de facto two
// voters remaining. Only 3 can win an election (due to its log containing
// all committed entries), but to do so it will need 1 to vote. But 1
// considers itself a learner and will continue to do so until 3 has
// stepped up as leader, replicates the conf change to 1, and 1 applies it.
// Ultimately, by receiving a request to vote, the learner realizes that
// the candidate believes it to be a voter, and that it should act
// accordingly. The candidate's config may be stale, too; but in that case
// it won't win the election, at least in the absence of the bug discussed
// in:
// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
// When responding to Msg{Pre,}Vote messages we include the term
// from the message, not the local term. To see why, consider the
// case where a single node was previously partitioned away and
// it's local term is now out of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
此时,会发送一条响应信息。如果消息类型为pb.MsgVote,则会将选取超时重置,并将票投给消息发送节点。
m := pb.Message{}
m.Term= m.Term
m.To = m.From
m.Type = pb.MsgPreVoteResp/pb.MsgVoteResp
但是,如果是拒绝投票的话,响应消息为:
m := pb.Message{}
m.Term= m.Term
m.To = m.From
m.Type = pb.MsgPreVoteResp/pb.MsgVoteResp
m.Reject = true // 拒绝投票
发送消息的节点,即PreCandidate/Candidate节点收到集群中其他节点返回的MsgPreVoteResp
或者MsgVoteResp
消息,也会调用Step方法进行处理。由于响应消息的term值与节点的Term相等(如果是PreVote,则term都没有加1;如果是Vote,则term值都已经加1),因此会直接调用stepCandidate
进行消息处理。节点根据响应消息进行投票统计,然后根据投票结果以及自身状态机的状态来进行相应的处理。
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
var myVoteRespType pb.MessageType
if r.state == StatePreCandidate {
myVoteRespType = pb.MsgPreVoteResp
} else {
myVoteRespType = pb.MsgVoteResp
}
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
// 统计投票结果
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
switch res {
// 如果统计投票结果获得大多数的投票
case quorum.VoteWon:
if r.state == StatePreCandidate {
// 如果状态机的状态是preCandidate,则开始竞选leader
r.campaign(campaignElection)
} else {
// 如果状态机的状态是Candidate则直接成为leader并向集群中其他节点广播MsgApp消息
r.becomeLeader()
r.bcastAppend()
}
case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}
如果赢得选举投票,则根据当前状态则决定下一步如何处理。
(1) 如果状态为PreCandidate,则开始竞选leader,调用campaign(campaignElection)
。
(2) 如果状态为其他,即Candidate,则竞选leader成功,并向集群中其他节点广播MsgApp消息。
- followers的Progress信息进入Replicate状态。
- leader会append一条空的entry记录。
如果选举失败,则进入becomeFollower
。
func (r *raft) becomeLeader() {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.prs.Progress[r.id].BecomeReplicate()
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
r.logger.Panic("empty entry was dropped")
}
// As a special case, don't count the initial empty entry towards the
// uncommitted log quota. This is because we want to preserve the
// behavior of allowing one entry larger than quota if the current
// usage is zero.
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
同步Log
Leader节点会向其他节点发送同步log的消息。
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
r.maybeSendAppend(to, true)
}
在向peers发送Append信息时,首先会判断Progress是否暂停,leader节点有所有peers的Progress信息,并根据进度的信息以及状态来判断是否暂停。被跟踪的follower节点有三种状态类型:
// StateType is the state of a tracked follower.
type StateType uint64
const (
// StateProbe indicates a follower whose last index isn't known. Such a
// follower is "probed" (i.e. an append sent periodically) to narrow down
// its last index. In the ideal (and common) case, only one round of probing
// is necessary as the follower will react with a hint. Followers that are
// probed over extended periods of time are often offline.
StateProbe StateType = iota
// StateReplicate is the state steady in which a follower eagerly receives
// log entries to append to its log.
StateReplicate
// StateSnapshot indicates a follower that needs log entries not available
// from the leader's Raft log. Such a follower needs a full snapshot to
// return to StateReplicate.
StateSnapshot
)
如果没有被暂停,则尝试去获取需要发送到peer的term和entries,如果获取失败,则说明需要发送快照信息pb.MsgSnap
,因为有可能entries已经被压缩,因此会尝试去发送快照信息,同时Progress进入StateSnapshot
状态。如果成功获取到term和entries,则发送pb.MsgApp
信息。
m := pb.Message{}
m.To = to
m.Type = pb.MsgSnap
m.Snapshot = snapshot
或者
m := pb.Message{}
m.To = to
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.prs.Progress[to]
if pr.IsPaused() {
return false
}
m := pb.Message{}
m.To = to
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if len(ents) == 0 && !sendIfEmpty {
return false
}
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
}
m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return false
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
m.Snapshot = snapshot
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in StateReplicate
case tracker.StateReplicate:
last := m.Entries[n-1].Index
pr.OptimisticUpdate(last)
pr.Inflights.Add(last)
case tracker.StateProbe:
pr.ProbeSent = true
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
r.send(m)
return true
}
问题点
- 在统计计票赢得leader之后会去调用
r.becomeLeader
和r.bcastAppend
,如果超过quorum个节点回复响应消息,那是否会被调用多次? - Leader节点append一条空的entry记录的作用是啥?
网友评论