美文网首页
etcd leader选举过程(草稿)

etcd leader选举过程(草稿)

作者: 酱油王0901 | 来源:发表于2020-08-05 00:21 被阅读0次

    To be continued...


    状态转换图

    Leader竞选

    Follower节点或者Candidate节点在选举超时时间(例如,leader离线)到了之后会发起新的选举。

    // tickElection is the tick function used while the node is a follower or a
    // (pre-)candidate. Every tick advances electionElapsed; once the node may
    // campaign and the election timeout has passed, the counter is cleared and a
    // local MsgHup is stepped into the state machine to start a new election.
    func (r *raft) tickElection() {
        r.electionElapsed++

        // Guard clause: nothing to do unless we may campaign and the timeout is up.
        if !r.promotable() || !r.pastElectionTimeout() {
            return
        }

        r.electionElapsed = 0
        hup := pb.Message{From: r.id, Type: pb.MsgHup}
        r.Step(hup)
    }
    

    发送的消息为:

    m := pb.Message{}
    m.From = r.id
    m.Type = pb.MsgHup
    

    raft状态机的Step方法在处理MsgHup消息时,会根据是否开启preVote,发起PreElection(预选举)或者Election(正式选举)。

    switch m.Type {
        case pb.MsgHup:
            if r.preVote {
                r.hup(campaignPreElection)
            } else {
                r.hup(campaignElection)
            }
            ....
    

    这时,raft状态机会判断当前是否为leader,是则直接忽略此message。如果不是leader,则判断是否能竞选leader。如果满足以下几个条件之一就不能竞选leader。

    • 此raft节点自身的Progress为空。
    • 此节点是Learner。
    • 其raftLog中有尚未应用的(pending)snapshot,且该snapshot不为空。
    • 已提交但尚未applied的log entries(即区间(applied, committed]内的entries)中包含pb.EntryConfChange或者pb.EntryConfChangeV2类型的配置变更。

    如果上述列举的都不满足则可以竞选leader。

    // hup handles a local MsgHup by starting a campaign of type t. It does
    // nothing (apart from logging) when the node is already leader, is not
    // promotable, or still has committed-but-unapplied configuration changes.
    func (r *raft) hup(t CampaignType) {
        switch {
        case r.state == StateLeader:
            r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
            return
        case !r.promotable():
            r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
            return
        }

        // Entries in (applied, committed] are committed but not yet applied.
        lo, hi := r.raftLog.applied+1, r.raftLog.committed+1
        unapplied, err := r.raftLog.slice(lo, hi, noLimit)
        if err != nil {
            r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
        }

        // Refuse to campaign while committed configuration changes are still
        // waiting to be applied.
        pending := numOfPendingConf(unapplied)
        if pending != 0 && r.raftLog.committed > r.raftLog.applied {
            r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, pending)
            return
        }

        r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
        r.campaign(t)
    }
    

    根据CampaignType(campaignPreElection或campaignElection),当前节点状态机调用becomePreCandidate或者becomeCandidate,分别进入PreCandidate或Candidate状态。

    1. 如果是进入Candidate状态:
    • 当前状态机的step函数指向stepCandidate
    • 将状态机重置,主要包括以下几个方面:
      • 将状态机term值加1,Vote和lead置为空。
      • 重置选举,心跳计时器,随机选举超时计时器。
      • 中止Leader转移过程。
      • 重置投票信息以及各节点的进度(Progress)信息,具体细节还不是太清楚(TODO)。
      • 将pendingConfIndex和uncommittedSize置为0(TODO)。
      • 重新设置readOnly(TODO)。
    • tick函数指向tickElection
    • 投票给自己,即r.Vote = r.id
    • 将状态置为StateCandidate,进入candidate状态。
    2. 如果是进入PreCandidate状态(与Candidate不同,PreCandidate不会增加状态机的term值):
    • 当前状态机的step函数指向stepCandidate
    • 重置投票信息
    • tick函数指向tickElection
    • lead置为空。
    • 将状态置为StatePreCandidate,进入PreCandidate状态。
    // campaign starts an election of type t. This must only be called after
    // verifying that this is a legitimate transition.
    //
    // For campaignPreElection the node becomes a pre-candidate and requests
    // MsgPreVote for term r.Term+1 without changing r.Term itself. For any
    // other type it becomes a candidate and requests MsgVote at r.Term as
    // updated by becomeCandidate. If the node's own vote already wins (a
    // single-node cluster), no requests are sent and it advances straight to
    // the next stage: a real election after a pre-election, leadership after
    // an election.
    func (r *raft) campaign(t CampaignType) {
        if !r.promotable() {
            // This path should not be hit (callers are supposed to check), but
            // better safe than sorry. We only warn here; the campaign proceeds.
            r.logger.Warningf("%x is unpromotable; campaign() should not have been called", r.id)
        }
        var term uint64
        var voteMsg pb.MessageType
        if t == campaignPreElection {
            r.becomePreCandidate()
            voteMsg = pb.MsgPreVote
            // PreVote RPCs are sent for the next term before we've incremented r.Term.
            // Only the term carried by the messages is bumped; r.Term stays as is.
            term = r.Term + 1
        } else {
            r.becomeCandidate()
            voteMsg = pb.MsgVote
            term = r.Term
        }
        // Vote for ourselves first; in a single-node cluster this alone is a quorum.
        if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
            // We won the election after voting for ourselves (which must mean that
            // this is a single-node cluster). Advance to the next state.
            if t == campaignPreElection {
                r.campaign(campaignElection)
            } else {
                r.becomeLeader()
            }
            return
        }
        // Collect the voter IDs in ascending order so requests go out deterministically.
        var ids []uint64
        {
            idMap := r.prs.Voters.IDs()
            ids = make([]uint64, 0, len(idMap))
            for id := range idMap {
                ids = append(ids, id)
            }
            sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
        }
        // Send a vote request of type voteMsg to every other voter.
        for _, id := range ids {
            if id == r.id {
                continue
            }
            r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
                r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

            // Only leader-transfer campaigns tag the request, so that voters
            // handle it even while they believe a leader is still in lease.
            var ctx []byte
            if t == campaignTransfer {
                ctx = []byte(t)
            }
            // Index/LogTerm describe our last log entry; voters use them to
            // check that our log is at least as up to date as theirs.
            r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
        }
    }
    

    向集群中的节点发送指定类型的消息。

    m := pb.Message{}
    m.Term= r.Term + 1 // term+1还没有应用到状态机
    m.To = id
    m.Type = pb.MsgPreVote
    m.Index  = r.raftLog.lastIndex()
    m.LogTerm = r.raftLog.lastTerm()
    m.Context = ctx
    // ----------------------
    m := pb.Message{}
    m.Term= r.Term // 需要注意的是,term值其实已经加1了,因为已经应用在状态机上了。
    m.To = id
    m.Type = pb.MsgVote
    m.Index  = r.raftLog.lastIndex()
    m.LogTerm = r.raftLog.lastTerm()
    m.Context = ctx
    

    接着调用raft.send()将消息追加到raft.msgs队列中,等待上层模块将其发送出去。
    对端的streamReader接收到消息之后进行反序列化(解码),然后根据消息类型将其写入streamReader的recvc通道或propc通道:MsgProp写入propc,其余消息写入recvc。这两个通道实际上就是peer的recvc通道和propc通道;对于这里的MsgPreVote/MsgVote,写入的是recvc通道。

    // file: etcdserver/api/rafthttp/stream.go
    for {
            m, err := dec.decode()
            if err != nil {
                cr.mu.Lock()
                cr.close()
                cr.mu.Unlock()
                return err
            }
    
            // gofail-go: var raftDropHeartbeat struct{}
            // continue labelRaftDropHeartbeat
            receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
    
            cr.mu.Lock()
            paused := cr.paused
            cr.mu.Unlock()
    
            if paused {
                continue
            }
    
            if isLinkHeartbeatMessage(&m) {
                // raft is not interested in link layer
                // heartbeat message, so we should ignore
                // it.
                continue
            }
    
            recvc := cr.recvc
            if m.Type == raftpb.MsgProp {
                recvc = cr.propc
            }
    
            select {
            case recvc <- m:
            default:
    

    peer接收到message之后,会调用raft.Process()方法进行处理。

    // file: etcdserver/api/rafthttp/peer.go
    go func() {
            for {
                select {
                case mm := <-p.recvc:
                    if err := r.Process(ctx, mm); err != nil {
                        if t.Logger != nil {
                            t.Logger.Warn("failed to process Raft message", zap.Error(err))
                        }
                    }
                case <-p.stopc:
                    return
                }
            }
        }()
    
        // r.Process might block for processing proposal when there is no leader.
        // Thus propc must be put into a separate routine with recvc to avoid blocking
        // processing other raft messages.
        go func() {
            for {
                select {
                case mm := <-p.propc:
                    if err := r.Process(ctx, mm); err != nil {
                        if t.Logger != nil {
                            t.Logger.Warn("failed to process Raft message", zap.Error(err))
                        }
                    }
                case <-p.stopc:
                    return
                }
            }
        }()
    

    raft.Process()方法首先检查发送消息的节点是否被移除,如果没有被移除则调用raft.Step()方法进行处理。

    // file: etcdserver/server.go
    // Process delivers an incoming raft message to this server's raft node,
    // passing ctx through so its deadline or cancellation is respected.
    // Messages sent by a member that has been removed from the cluster are not
    // stepped; an HTTP 403 error is returned instead. MsgApp messages are also
    // recorded in the server's append-request statistics before being stepped.
    func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
        lg := s.getLogger()
        sender := types.ID(m.From)

        switch {
        case s.cluster.IsIDRemoved(sender):
            lg.Warn(
                "rejected Raft message from removed member",
                zap.String("local-member-id", s.ID().String()),
                zap.String("removed-member-id", sender.String()),
            )
            return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
        case m.Type == raftpb.MsgApp:
            s.stats.RecvAppendReq(sender.String(), m.Size())
        }

        return s.r.Step(ctx, m)
    }
    

    接着调用node.step()方法。其会将Message写入node的recvc通道。

    // step hands m to the node via stepWithWaitOption, passing false for the
    // wait flag.
    func (n *node) step(ctx context.Context, m pb.Message) error {
        const wait = false
        return n.stepWithWaitOption(ctx, m, wait)
    }
    

    node从recvc通道读取到消息后,会调用raft.Step()将其交给raft状态机处理;如果是来自未知节点(Progress中不存在该节点)的响应消息,则直接丢弃。

    case m := <-n.recvc:
                // filter out response message from unknown From.
                if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
                    r.Step(m)
                }
    

    当收到消息的Term大于当前Term,且消息类型为MsgPreVote或MsgVote时,会先根据message的Context是否为campaignTransfer(leader转移场景),决定是否强制当前节点处理该投票请求。接着判断当前节点是否处于lease内:即开启了CheckQuorum模式、当前节点有已知的lead节点,并且选举计时器尚未超时。如果不是强制要求,同时节点在lease内,则忽略该消息并直接返回(既不更新term,也不投票)。
    同时需要注意的是,由于当前节点的term值小于消息的term值,因此会根据消息的类型做相应的处理:

    1. 如果消息类型为MsgPreVote(或者是同意投票的MsgPreVoteResp),则当前节点的状态机不做处理,term保持不变。
    2. 如果消息类型为MsgVote(以及其他不属于第3类的消息),则当前节点becomeFollower,并且用消息的term值重置状态机,但当前节点的lead为空。
    3. 如果是leader发送过来的MsgApp、MsgHeartbeat以及MsgSnap消息,则用消息的term值重置状态机,并将lead置为发送消息的节点。
    func (r *raft) Step(m pb.Message) error {
        // Handle the message term, which may result in our stepping down to a follower.
        switch {
        case m.Term == 0:
            // local message
        case m.Term > r.Term:
            if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
                force := bytes.Equal(m.Context, []byte(campaignTransfer))
                inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
                if !force && inLease {
                    // If a server receives a RequestVote request within the minimum election timeout
                    // of hearing from a current leader, it does not update its term or grant its vote
                    r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
                        r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
                    return nil
                }
            }
            switch {
            case m.Type == pb.MsgPreVote:
                // Never change our term in response to a PreVote
            case m.Type == pb.MsgPreVoteResp && !m.Reject:
                // We send pre-vote requests with a term in our future. If the
                // pre-vote is granted, we will increment our term when we get a
                // quorum. If it is not, the term comes from the node that
                // rejected our vote so we should become a follower at the new
                // term.
            default:
                r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
                    r.id, r.Term, m.Type, m.From, m.Term)
                if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                    r.becomeFollower(m.Term, m.From)
                } else {
                    r.becomeFollower(m.Term, None)
                }
            }
    

    当前节点在参与选举时,会考虑如下情况来决定是否参与投票。只有满足下面1,2,3中的一个,且同时满足条件4才能将票投给发送消息的节点。

    1. 当前节点是否已经投过票,而且投的正是发送消息的节点。
    2. 当前节点还没有投票,而且lead为空。
    3. 消息的type为PreVote,而且消息的Term值大于当前节点状态机的Term值。
    4. 发送消息的节点的日志至少和当前节点一样新(raftLog.isUpToDate(m.Index, m.LogTerm)):消息的LogTerm大于当前节点最后一条entry的term,或者两者term相等且消息的Index不小于当前节点的lastIndex。
    case pb.MsgVote, pb.MsgPreVote:
            // We can vote if this is a repeat of a vote we've already cast...
            canVote := r.Vote == m.From ||
                // ...we haven't voted and we don't think there's a leader yet in this term...
                (r.Vote == None && r.lead == None) ||
                // ...or this is a PreVote for a future term...
                (m.Type == pb.MsgPreVote && m.Term > r.Term)
            // ...and we believe the candidate is up to date.
            if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
                // Note: it turns out that that learners must be allowed to cast votes.
                // This seems counter- intuitive but is necessary in the situation in which
                // a learner has been promoted (i.e. is now a voter) but has not learned
                // about this yet.
                // For example, consider a group in which id=1 is a learner and id=2 and
                // id=3 are voters. A configuration change promoting 1 can be committed on
                // the quorum `{2,3}` without the config change being appended to the
                // learner's log. If the leader (say 2) fails, there are de facto two
                // voters remaining. Only 3 can win an election (due to its log containing
                // all committed entries), but to do so it will need 1 to vote. But 1
                // considers itself a learner and will continue to do so until 3 has
                // stepped up as leader, replicates the conf change to 1, and 1 applies it.
                // Ultimately, by receiving a request to vote, the learner realizes that
                // the candidate believes it to be a voter, and that it should act
                // accordingly. The candidate's config may be stale, too; but in that case
                // it won't win the election, at least in the absence of the bug discussed
                // in:
                // https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
                r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
                    r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
                // When responding to Msg{Pre,}Vote messages we include the term
                // from the message, not the local term. To see why, consider the
                // case where a single node was previously partitioned away and
                // it's local term is now out of date. If we include the local term
                // (recall that for pre-votes we don't update the local term), the
                // (pre-)campaigning node on the other end will proceed to ignore
                // the message (it ignores all out of date messages).
                // The term in the original message and current local term are the
                // same in the case of regular votes, but different for pre-votes.
                r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
                if m.Type == pb.MsgVote {
                    // Only record real votes.
                    r.electionElapsed = 0
                    r.Vote = m.From
                }
    

    此时,会发送一条同意投票的响应消息,其Term取自请求消息中的term(而不是本地term)。如果消息类型为pb.MsgVote,还会将选举计时器(electionElapsed)清零,并记录将票投给了消息发送节点(r.Vote = m.From);对于MsgPreVote则不记录投票。

    m := pb.Message{}
    m.Term= m.Term
    m.To = m.From
    m.Type = pb.MsgPreVoteResp/pb.MsgVoteResp
    

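    但是,如果是拒绝投票的话,响应消息如下。注意此时响应消息的Term取的是拒绝方节点自己的term,而不是请求消息中的term(参见Step中的注释:"the term comes from the node that rejected our vote"),因此下面的m.Term应理解为拒绝方的r.Term: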

    m := pb.Message{}
    m.Term= m.Term
    m.To = m.From
    m.Type = pb.MsgPreVoteResp/pb.MsgVoteResp
    m.Reject = true // 拒绝投票
    

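    发送消息的节点,即PreCandidate/Candidate节点,收到集群中其他节点返回的MsgPreVoteResp或者MsgVoteResp消息后,同样调用Step方法进行处理。对于MsgVoteResp,响应消息的term值与Candidate的Term相等(Candidate在becomeCandidate时已经将Term加1)。对于同意投票的MsgPreVoteResp,响应消息携带的是PreCandidate的未来term(r.Term+1),大于其当前Term,但Step对这种情况不做term处理(见上面的case m.Type == pb.MsgPreVoteResp && !m.Reject)。因此这两种情况都会进入stepCandidate进行处理。节点根据响应消息进行投票统计,然后根据投票结果以及自身状态机的状态进行相应的处理。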

    // stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
    // whether they respond to MsgVoteResp or MsgPreVoteResp.
    //
    // It drops proposals (there is no leader yet), steps down to follower of the
    // sender on MsgApp/MsgHeartbeat/MsgSnap, and tallies vote responses whose type
    // matches the current state. Message types not handled by the switch are
    // ignored and nil is returned.
    func stepCandidate(r *raft, m pb.Message) error {
        // Only handle vote responses corresponding to our candidacy (while in
        // StateCandidate, we may get stale MsgPreVoteResp messages in this term from
        // our pre-candidate state).
        var myVoteRespType pb.MessageType
        if r.state == StatePreCandidate {
            myVoteRespType = pb.MsgPreVoteResp
        } else {
            myVoteRespType = pb.MsgVoteResp
        }
        switch m.Type {
        case pb.MsgProp:
            r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
            return ErrProposalDropped
        case pb.MsgApp:
            r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
            r.handleAppendEntries(m)
        case pb.MsgHeartbeat:
            r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
            r.handleHeartbeat(m)
        case pb.MsgSnap:
            r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
            r.handleSnapshot(m)
        case myVoteRespType:
            // Record this response (granted unless m.Reject) and re-tally the votes.
            gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
            r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
            // Only a decisive result (won or lost) changes state; otherwise keep
            // waiting for more responses.
            switch res {
            // A quorum granted the (pre-)vote.
            case quorum.VoteWon:
                if r.state == StatePreCandidate {
                    // Pre-election won: start the real election.
                    r.campaign(campaignElection)
                } else {
                    // Election won: become leader and immediately broadcast MsgApp
                    // to the rest of the cluster.
                    r.becomeLeader()
                    r.bcastAppend()
                }
            case quorum.VoteLost:
                // pb.MsgPreVoteResp contains future term of pre-candidate
                // m.Term > r.Term; reuse r.Term
                r.becomeFollower(r.Term, None)
            }
        case pb.MsgTimeoutNow:
            r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
        }
        return nil
    }
    

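    如果赢得选举投票,则根据当前状态决定下一步如何处理:
    (1) 如果状态为PreCandidate,则开始正式竞选leader,调用campaign(campaignElection)。
    (2) 如果状态为其他,即Candidate,则竞选leader成功:调用becomeLeader(),然后通过bcastAppend()向集群中其他节点广播MsgApp消息。在becomeLeader()中:

    • leader自身的Progress进入Replicate状态(followers的Progress要在被成功探测之后才会进入Replicate状态)。
    • leader会append一条空的entry记录。
    (3) 如果选举失败(VoteLost),则调用becomeFollower(r.Term, None)退回follower状态。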
    // becomeLeader turns this node into the leader for its current term
    // (r.Term is passed to reset unchanged). It panics if called while the
    // node is still a follower. Statement order matters here: reset() must run
    // before the empty entry is appended, which is why appendEntry below is
    // expected never to drop it.
    func (r *raft) becomeLeader() {
        // TODO(xiangli) remove the panic when the raft implementation is stable
        if r.state == StateFollower {
            panic("invalid transition [follower -> leader]")
        }
        // From now on incoming messages are handled by stepLeader.
        r.step = stepLeader
        // Re-initialize per-term bookkeeping while keeping the current term.
        r.reset(r.Term)
        // Leaders tick via tickHeartbeat instead of tickElection.
        r.tick = r.tickHeartbeat
        r.lead = r.id
        r.state = StateLeader
        // Followers enter replicate mode when they've been successfully probed
        // (perhaps after having received a snapshot as a result). The leader is
        // trivially in this state. Note that r.reset() has initialized this
        // progress with the last index already.
        r.prs.Progress[r.id].BecomeReplicate()

        // Conservatively set the pendingConfIndex to the last index in the
        // log. There may or may not be a pending config change, but it's
        // safe to delay any future proposals until we commit all our
        // pending log entries, and scanning the entire tail of the log
        // could be expensive.
        r.pendingConfIndex = r.raftLog.lastIndex()

        // Append an empty entry at the new term.
        emptyEnt := pb.Entry{Data: nil}
        if !r.appendEntry(emptyEnt) {
            // This won't happen because we just called reset() above.
            r.logger.Panic("empty entry was dropped")
        }
        // As a special case, don't count the initial empty entry towards the
        // uncommitted log quota. This is because we want to preserve the
        // behavior of allowing one entry larger than quota if the current
        // usage is zero.
        r.reduceUncommittedSize([]pb.Entry{emptyEnt})
        r.logger.Infof("%x became leader at term %d", r.id, r.Term)
    }
    

    同步Log

    Leader节点会向其他节点发送同步log的消息。


    // sendAppend sends the peer `to` a MsgApp carrying any entries it is missing
    // together with the current commit index, or a MsgSnap when those entries
    // cannot be read from the log. Because sendIfEmpty is true, a MsgApp goes out
    // even when there are no new entries, unless the peer's progress is paused.
    func (r *raft) sendAppend(to uint64) {
        const sendIfEmpty = true
        r.maybeSendAppend(to, sendIfEmpty)
    }
    

    在向peers发送Append信息时,首先会判断Progress是否暂停,leader节点有所有peers的Progress信息,并根据进度的信息以及状态来判断是否暂停。被跟踪的follower节点有三种状态类型:

    // StateType enumerates the replication states in which the leader can hold a
    // tracked follower's Progress.
    type StateType uint64

    // Follower replication states, numbered explicitly:
    // Probe = 0, Replicate = 1, Snapshot = 2.
    const (
        // StateProbe marks a follower whose last index is unknown. The leader
        // probes it, periodically sending an append, to narrow that index down.
        // Usually one round is enough because the follower replies with a hint;
        // a follower that stays here for a long time is often offline.
        StateProbe StateType = 0
        // StateReplicate is the steady state, in which the follower eagerly
        // receives log entries to append to its log.
        StateReplicate StateType = 1
        // StateSnapshot marks a follower that needs entries no longer available
        // in the leader's Raft log; it must receive a full snapshot before it
        // can return to StateReplicate.
        StateSnapshot StateType = 2
    )
    

    如果没有被暂停,则尝试获取需要发送给peer的term(即pr.Next-1处entry的term)和entries。如果获取失败,说明对应的entries可能已经被压缩(compact),此时需要发送快照消息pb.MsgSnap(前提是该peer最近处于活跃状态,即RecentActive为true),同时Progress进入StateSnapshot状态。如果成功获取到term和entries,则发送pb.MsgApp消息。

    m := pb.Message{}
    m.To = to
    m.Type = pb.MsgSnap
    m.Snapshot = snapshot
    

    或者

    m := pb.Message{}
    m.To = to
    m.Type = pb.MsgApp
    m.Index = pr.Next - 1
    m.LogTerm = term
    m.Entries = ents
    m.Commit = r.raftLog.committed
    
    // maybeSendAppend queues a message for peer `to`. It sends either a MsgApp
    // carrying the entries from pr.Next onward plus the leader's commit index, or,
    // when the needed term or entries cannot be read from the log (presumably
    // because they were compacted — TODO confirm), a MsgSnap with the current
    // snapshot. It returns true if a message was passed to r.send, and false when:
    // the peer's progress is paused; there are no entries and sendIfEmpty is
    // false; a snapshot is needed but the peer is not RecentActive; or the
    // snapshot is temporarily unavailable.
    func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
        pr := r.prs.Progress[to]
        // Nothing is sent while the peer's progress is paused.
        if pr.IsPaused() {
            return false
        }
        m := pb.Message{}
        m.To = to

        // term: term of the entry at pr.Next-1 (sent as LogTerm with Index =
        // pr.Next-1); ents: entries from pr.Next onward, limited by r.maxMsgSize.
        term, errt := r.raftLog.term(pr.Next - 1)
        ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
        // NOTE(review): this check runs before errt/erre are inspected, so with
        // sendIfEmpty == false a failed lookup that yields no entries returns
        // here without attempting a snapshot — confirm this is intended.
        if len(ents) == 0 && !sendIfEmpty {
            return false
        }

        if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
            // Snapshots are only sent to peers marked RecentActive.
            if !pr.RecentActive {
                r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
                return false
            }

            m.Type = pb.MsgSnap
            snapshot, err := r.raftLog.snapshot()
            if err != nil {
                if err == ErrSnapshotTemporarilyUnavailable {
                    r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
                    return false
                }
                panic(err) // TODO(bdarnell)
            }
            if IsEmptySnap(snapshot) {
                panic("need non-empty snapshot")
            }
            m.Snapshot = snapshot
            sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
            r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
                r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
            // Move the peer into StateSnapshot at the snapshot's index; as the log
            // below says, replication messages to it are paused from here on.
            pr.BecomeSnapshot(sindex)
            r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
        } else {
            m.Type = pb.MsgApp
            m.Index = pr.Next - 1
            m.LogTerm = term
            m.Entries = ents
            m.Commit = r.raftLog.committed
            if n := len(m.Entries); n != 0 {
                switch pr.State {
                // optimistically increase the next when in StateReplicate
                case tracker.StateReplicate:
                    last := m.Entries[n-1].Index
                    // Advance Next past the last sent entry and track it as in flight.
                    pr.OptimisticUpdate(last)
                    pr.Inflights.Add(last)
                case tracker.StateProbe:
                    // Record that a probe is outstanding for this peer.
                    pr.ProbeSent = true
                default:
                    r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
                }
            }
        }
        r.send(m)
        return true
    }
    

    问题点

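    1. 在统计计票赢得leader之后会去调用r.becomeLeader和r.bcastAppend,如果超过quorum个节点回复响应消息,那是否会被调用多次?
       不会。becomeLeader中会将r.step置为stepLeader,之后再收到的MsgVoteResp不会再进入stepCandidate的计票逻辑。
    2. Leader节点append一条空的entry记录的作用是啥?
       Raft中leader不能仅凭副本数来提交之前term的日志条目。新leader在当前term追加一条空entry,一旦它被提交,之前term中所有的entry也随之被提交(参见Raft论文5.4.2节)。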

    References

    相关文章

      网友评论

          本文标题:etcd leader选举过程(草稿)

          本文链接:https://www.haomeiwen.com/subject/gxngrktx.html