美文网首页
2.2 etcd源码笔记 - raft library - 流程

2.2 etcd源码笔记 - raft library - 流程

作者: 兰CC | 来源:发表于2019-08-07 15:17 被阅读0次

    一、基础元素

    1. 提案记录

    raft 状态机 最核心的功能就是协商出一致的提案(或者说是将提案同步到整个集群),

    每一条提案由 etcd/raft/raftpb/raft.pb.go:Entry 表示

    type Entry struct {
          //当前的选举轮次
        Term             uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
        //每一条提案都有一个Index,递增,可以理解为ID
        Index            uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
        //普通提案 或 配置变更提案
        Type             EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
        //具体的数据
        Data             []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
        //这个不用关心,用于存储PB后的二进制数据
        XXX_unrecognized []byte    `json:"-"`
    }
    

    2. 快照

    同步数据过程中, raft library 采用 “快照 + 提案” 的机制。

    服务会不定期地将 已协商一致的提案 打包压缩成一个 数据快照

    比如 put(a,1) put(a,2) put(b,3) 会打包成 数据快照 a:2;b:3

    正常情况下,节点启动会从本地读取 数据快照,然后在此基础上继续同步提案

    同步提案的过程中,如果有异常,则会将先同步 Leader 的最新 数据快照Follower,然后再此 数据快照 的基础上,再继续同步提案

    type Snapshot struct {
        //被打包压缩的数据,数据格式由具体的应用层提供
        Data             []byte           `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
        //该份数据对应的 raft 状态机的状态,包括 Nodes、Index、Item
        Metadata         SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
        //不用关心,表示序列成PB的二进制数据
        XXX_unrecognized []byte           `json:"-"`
    }
    

    3. offset

    哪些提案已协商好、应用层已处理好哪些协商好的提案,并不是记录具体的提案,

    而是按顺序处理每个提案,然后标记一个 offset,表示协商、处理到哪个提案了。

    比如 committed_indexapplied_index

    4. raftLog

    在整个过程中,需要记录以下元素

    • 已协商一致的提案
    • 正在协商中的提案
    • 应用层已处理完的已协商一致提案

    以上内容都记录在 etcd/raft/log.go:raftLog

    type raftLog struct {
        //之前以为是 storage 表示“协商一致的提案”,unstable表示“协商中的提案”
        //然后后者协商一致后,挪到前者
        //然而我错了,
        //storage 如字面意思,就是存储的,已经落地了,但是目前只有一个实现 MemoryStorage,也是搞笑
        //unstable表示不稳定的,在内存里的数据
        storage Storage
    
        unstable unstable
    
        //提案Index,表示已协商到哪个提案
        committed uint64
        
        //提案Index,表示应用层已处理到哪个提案
        applied uint64
    
        logger Logger
    }
    
    type MemoryStorage struct {
        sync.Mutex
        
        //当前raft状态机的状态
        //TODO: 介绍每个字段的含义
        hardState pb.HardState
        
        //协商好的数据快照
        snapshot  pb.Snapshot
        
        //协商好的提案
        ents []pb.Entry
    }
    
    type unstable struct {
        //同步数据有异常时,需要从 Leader 同步到 Follower 的 数据快照
        //这个字段有值,就会被塞到 Ready.Snapshot,进而被传输层丢给 Follower
        snapshot *pb.Snapshot
        
        //正在协商中的提案
        entries []pb.Entry
        
        //这个跟上面的offset不是同一个意思,取值为 snapshot.lastIndex
        //这是为了方便从 entries 中获取数据,entries[entry.index - offset]=entry 就等于 
        offset  uint64
    
        logger Logger
    }
    

    5. progress

    raft library 在 Leader 与 Follower 同步数据的过程中,定义了 Progress 来表示 Folllower 的状态信息。

    包括 当前是否存活、已发送成功Index、待发送Index、状态等。

    其中状态有三种

    • probe 探测状态,每次只同步一条提案;当接收成功,就表示该节点是正常状态,会切换至 replicate
    • replicate 副本状态,每次都同步尽量多的提案;当发生异常,就会切换至 snapshot
    • snapshot 快照状态,需要同步 数据快照,同步完切换至 probe

    源码附带的文档 etcd/raft/design.mdProgress 详细的设计说明介绍可以参考之。

    type Progress struct {
        //已发送成功的提案Index
        Match uint64
        //下一条要发送的提案Index
        Next uint64
        
        //当前状态
        State ProgressStateType
    
        //probe状态使用,探测出节点有问题,会标记为true,就会停止同步数据
        Paused bool
        
        //表示要同步的 snapshot 的 index
        PendingSnapshot uint64
    
        //是否存活,收到任何来自 Follower 的消息,都认为其状态是存活的
        //Leader 会在每次发心跳的时候,顺便检查一下该状态
        RecentActive bool
    
        //发送数据的一个滑动窗口,机制类似TCP那样子的滑动窗口
        ins *inflights
    
        //是否Learner 是的话,只是复本,不能参与选举
        IsLearner bool
    }
    

    二、流程 - 启动

      1. 入口 etcd/raft/node.go

    应用层会调用该 StartNode 启动一个 raft-node

    func StartNode(c *Config, peers []Peer) Node {
        //raft相关的操作封装到 etcd/raft/raft.go:raft 
        r := newRaft(c)
        
        //初始启动时,状态是 Follower,并且 term:1,Leader:None
        //要嘛就是收到 Leader 的心跳作为 Follower 加入整个集群,
        // 要嘛就是到点触发 tickElection,进行选举。
        r.becomeFollower(1, None)
        
        //把 提案-“增加集群中的节点” 加到 “已协商提案” 中
        //这样是为了把 提案-“增加集群中的节点” 通过 ready chan 吐出去给外系统
        //比如“传输层”需要拿到这些节点,然后创建对应的连接
        //比如 node本身 拿到这些节点,创建节点对应的progress
        //(这边其实是为了复用创建progress的流程,不然是可以直接调用代码创建progress)
        for _, peer := range peers {
            cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
            d, err := cc.Marshal()
            if err != nil {
                panic("unexpected marshal error")
            }
            e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
            r.raftLog.append(e)
        }
        
        //从本地加载出来的都是已经协商一致的
        r.raftLog.committed = r.raftLog.lastIndex()
        
        //创建集群中各个节点对应的 progress
        //上面把节点新增消息塞到 ready chan,也是为了创建 progress
        //这边直接代码调用,是为了test case 可以立即在 StartNode 执行完之后,马上进行选举
        //不然走 ready chan 会有一定时间的延迟
        for _, peer := range peers {
            r.addNode(peer.ID)
        }
    
        n := newNode()
        n.logger = c.Logger
        
        //最终启动的姿态就是,开启一个 for 循环不断读取 raft 状态机 丢出来的数据并进行处理
        go n.run(r)
        return &n
    }
    
      1. newRaft(c *Config)

    源代码 etcd/raft/raft.go 比较简单,这边就介绍一下 raft

    type raft struct {
        //外界会传入节点ID
        //生成方式  sha1(cluster name, peer URLs, time)
        id uint64
        //当前选举轮次
        Term uint64 
        //这个好像是投票给哪个 node?
        Vote uint64 
        //跟读取数据相关,后续介绍
        readStates []ReadState
        //这个就是基础数据里所描述的raftLog
        raftLog *raftLog
        //各种progress
        prs         map[uint64]*Progress
        learnerPrs  map[uint64]*Progress
        //如果是Leaner 就只是作为副本,不能选举
        isLearner bool
        //投票结果
        votes map[uint64]bool
        //这个就是要吐给 ready chan 的数据,处理完又会被清空
        msgs []pb.Message
        // the leader id
        lead uint64
        //即将要更换的lead
        leadTransferee uint64
        //周期的任务
        tick func()
        //接收到消息的处理 stepFollower、stepCandidate、stepLeader
        step stepFunc
        //每次心跳是否要校验节点是否足够
        checkQuorum bool
        //选举前,是否要判断一下存活的节点是否足够
        preVote     bool
        //选举和心跳相关
        electionElapsed int
        heartbeatElapsed int
        heartbeatTimeout int
        electionTimeout  int
        randomizedElectionTimeout int
        disableProposalForwarding bool
        ...
    }
    
      1. raft.becomeFollower(term uint64, lead uint64)

    状态切为 Follower后,

    要嘛就是收到 Leader 的心跳作为 Follower 加入整个集群,

    要嘛就是到点触发 tickElection,进行选举。

    func (r *raft) becomeFollower(term uint64, lead uint64) {
        //当接收到消息时,如果共用的Step方法处理不了,就会调用  stepFollower
        r.step = stepFollower
        r.reset(term)
        //到点会触发选举
        r.tick = r.tickElection
        //设置 Leader,如果初启时,Leader 为 None
        r.lead = lead
        r.state = StateFollower
        r.logger.Infof("%x became follower at term %d", r.id, r.Term)
    }
    
      1. node.run(r *raft)

    for 循环监听 raft 状态机吐出的数据,并进行对应的处理

    func (n *node) run(r *raft) {
        //准备各种变量
        var propc chan pb.Message
        var readyc chan Ready
        var advancec chan struct{}
        var prevLastUnstablei, prevLastUnstablet uint64
        var havePrevLastUnstablei bool
        var prevSnapi uint64
        var rd Ready
    
        lead := None
        //TODO: softState到底是个啥?
        prevSoftSt := r.softState()
        prevHardSt := emptyState
        
        //开始无限循环,无限监听
        for {
            if advancec != nil {
                readyc = nil
            } else {
                //构建 raft 状态机 已经协商好的数据
                rd = newReady(r, prevSoftSt, prevHardSt)
                if rd.containsUpdates() {
                    //如果有更新,就将readyc 赋值为 node.readyc,后面写数据要用
                    readyc = n.readyc
                } else {
                    //如果有更新,就将readyc置为nil,这样后面就不会写数据到 node.readyc,就不会有数据输出
                    //不过这样通过把 readyc 置为nil 来控制不输出数据的写法,有点怪怪的
                    readyc = nil
                }
            }
    
            //r.lead 比 lead 更新,如果不相等,说明 raft 状态机的 lead 有变动
            if lead != r.lead {
                if r.hasLeader() {
                    if lead == None {
                        r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
                    } else {
                        r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
                    }
                    //把 propc 赋值为 node.proc,读取出需要处理的消息
                    propc = n.propc
                } else {
                    r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
                    //丢失lead,则把proc置为nil,不再处理消息
                    propc = nil
                }
                //更新lead
                lead = r.lead
            }
    
            select {
            // TODO: maybe buffer the config propose if there exists one (the way
            // described in raft dissertation)
            // Currently it is dropped in Step silently.
            case m := <-propc:
                //有状态机需要处理的消息,直接调用 raft.Step(m pb.Message)
                m.From = r.id
                r.Step(m)
            case m := <-n.recvc:
                // filter out response message from unknown From.
                if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
                    r.Step(m) // raft never returns an error
                }
            case cc := <-n.confc:
                //节点配置发生变化,需要变更对应的progress
                if cc.NodeID == None {
                    r.resetPendingConf()
                    select {
                    case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
                    case <-n.done:
                    }
                    break
                }
                switch cc.Type {
                case pb.ConfChangeAddNode:
                    r.addNode(cc.NodeID)
                case pb.ConfChangeAddLearnerNode:
                    r.addLearner(cc.NodeID)
                case pb.ConfChangeRemoveNode:
                    // block incoming proposal when local node is
                    // removed
                    if cc.NodeID == r.id {
                        propc = nil
                    }
                    r.removeNode(cc.NodeID)
                case pb.ConfChangeUpdateNode:
                    r.resetPendingConf()
                default:
                    panic("unexpected conf type")
                }
                select {
                //把最新的状态吐出去
                case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
                case <-n.done:
                }
            case <-n.tickc:
                //到点执行周期任务 Leader-tickHeartbeat; Candidate && follower-tickElecion
                r.tick()
            case readyc <- rd:
                //这就是上面提到的 有更新时 readyc不为nil, 更新数据rd会写入 readyc
                //这样系统外就能收到这个更新信息
                if rd.SoftState != nil {
                    prevSoftSt = rd.SoftState
                }
                if len(rd.Entries) > 0 {
                    prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
                    prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
                    havePrevLastUnstablei = true
                }
                if !IsEmptyHardState(rd.HardState) {
                    prevHardSt = rd.HardState
                }
                if !IsEmptySnap(rd.Snapshot) {
                    prevSnapi = rd.Snapshot.Metadata.Index
                }
    
                //r.msgs表示同步给其他节点的信息,这边传到系统外之后就要置为nil
                r.msgs = nil
                //同上,读取数据的时候使用,置为nil
                r.readStates = nil
                //advancec不为nil,下一个for循环就会执行 <-advancec
                advancec = n.advancec
            case <-advancec:
                //更新raftlog,将“协商中”的数据,更新为 “协商一致”
                if prevHardSt.Commit != 0 {
                    r.raftLog.appliedTo(prevHardSt.Commit)
                }
                if havePrevLastUnstablei {
                    //压缩空间,移除已协商一致的数据
                    r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
                    havePrevLastUnstablei = false
                }
                r.raftLog.stableSnapTo(prevSnapi)
                //advancec为nil,下一个for循环就不会执行 <-advancec
                advancec = nil
            case c := <-n.status:
                //丢状态信息到系统外
                c <- getStatus(r)
            case <-n.stop:
                //下掉节点
                close(n.done)
                return
            }
        }
    }
    

    三、流程 - 选举

    当 Follower 和 Candidates 到点之后,会开始触发选举,具体步骤如下

      1. Follower && Candidate 调用 raft.tickElection()
    // tickElection is run by followers and candidates after r.electionTimeout.
    func (r *raft) tickElection() {
        r.electionElapsed++
    
        if r.promotable() && r.pastElectionTimeout() {
            r.electionElapsed = 0
            //发一条 pb.MsgHub 的消息到状态机
            //Hub 译义是 “行进时为使步伐一致的吆喝声”
            //在整个同步集群中,这个词就显得很有意思,因为很贴切,
            //发出这么一个类型的消息,就是为了让集群内的节点在某个关系上达到一致,或数据一致(步伐一致)
            r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
        }
    }
    
      1. Follower && Candidate 调用 raft.Step(pb.MsgHup)
    func (r *raft) Step(m pb.Message) error {
        ...
        switch m.Type {
        case pb.MsgHup:
            if r.state != StateLeader {
                ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
                if err != nil {
                    r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
                }
                if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
                    r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
                    return nil
                }
    
                r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
                if r.preVote {
                    //如果开启preVote,会先进行选举前的校验,即判断节点个数是否足够,具体就是 
                    //1.将状态置为 StatePreCandidate,
                    //2.然后给其他节点发 pb.MsgPreVote,其他节点如果有反馈,则计数+1
                    //3.一直到 发起选举的节点,收到了 节点数/2 + 1  的反馈,该节点会进入  StateCandidate, 就可以进行选举了
                    //这样做是为了防止脑裂时,节点数不够还进行无效选举, 导致 Item 不断增大
                    r.campaign(campaignPreElection)
                } else {
                    //直接发起选举
                    r.campaign(campaignElection)
                }
            } else {
                r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
            }
        ...
        }
        ...
        return nil
    }
    
      1. Follower && Candidate 调用 raft.campaign(campaignElection)

    这边直接选举,不再介绍 campaignPreElection 的流程

    func (r *raft) campaign(t CampaignType) {
        var term uint64
        var voteMsg pb.MessageType
        if t == campaignPreElection {
            //预选举的流程
            r.becomePreCandidate()
            voteMsg = pb.MsgPreVote
            term = r.Term + 1
        } else {
            //选举的流程
            r.becomeCandidate()
            voteMsg = pb.MsgVote
            term = r.Term
        }
        
        //如果投票结果超过法定节点数(总节点数/2 + 1),那就直接进入下步,预选举 --> 选举 or 选举 --> Leader
        if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
            if t == campaignPreElection {
                r.campaign(campaignElection)
            } else {
                r.becomeLeader()
            }
            return
        }
        
        //挨个节点发 MsgPreVote 或 MsgVote
        for id := range r.prs {
            if id == r.id {
                continue
            }
            r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
                r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
    
            var ctx []byte
            if t == campaignTransfer {
                ctx = []byte(t)
            }
            r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
        }
    }
    
      1. Candidate 调用 raft.send(voteMsg)
    func (r *raft) send(m pb.Message) {
        ...
        //把消息写到 raft.msgs,再写到 ready chan,最后由 传输层 丢给其他节点
        //参考 “一、4. node.run(r *raft)”
        r.msgs = append(r.msgs, m)
    }
    
      1. 其他节点 调用 node.step(context.Context, voteMsg)
    func (n *node) step(ctx context.Context, m pb.Message) error {
        //把消息写到 node.recvc,然后node的 run 会再次消费到该消息,然后调用 raft.step
        //参考 “一、4. node.run(r *raft)”
        ch := n.recvc
        if m.Type == pb.MsgProp {
            ch = n.propc
        }
    
        select {
        case ch <- m:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        case <-n.done:
            return ErrStopped
        }
    }
    
      1. 其他节点 调用 raft.step(voteMsg)
    func (r *raft) Step(m pb.Message) error {
        ...
        switch m.Type {
        ...
        case pb.MsgVote, pb.MsgPreVote:
            //如果是Learner,不参与投票
            if r.isLearner {
                r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
                    r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
                return nil
            }
    
            if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
                r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
                    r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
                //回给发送方一条 MsgPreVoteResp 或 MsgVoteResp
                //发送操作逻辑同上面的 “4. Follower && Candidate 调用 raft.send(preVoteMsg / voteMsg)”
                r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
                if m.Type == pb.MsgVote {
                    r.electionElapsed = 0
                    r.Vote = m.From
                }
            } else {
                r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
                    r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
                r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
            }
        ...
        }
        ...
        return nil
    }
    
      1. Candidate 调用 stepCandidate(*raft, voteMsgResp)
    func stepCandidate(r *raft, m pb.Message) {
        var myVoteRespType pb.MessageType
        if r.state == StatePreCandidate {
            myVoteRespType = pb.MsgPreVoteResp
        } else {
            myVoteRespType = pb.MsgVoteResp
        }
        switch m.Type {
        ... 
        case myVoteRespType:
            gr := r.poll(m.From, m.Type, !m.Reject)
            r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
            //如果投票结果超过法定节点数(总节点数/2 + 1),那就直接进入下步,预选举 --> 选举 or 选举 --> Leader
            switch r.quorum() {
            case gr:
                if r.state == StatePreCandidate {
                    r.campaign(campaignElection)
                } else {
                    //节点本身切为 Leader
                    r.becomeLeader()
                    //开始向其他节点分发提案
                    r.bcastAppend()
                }
            case len(r.votes) - gr:
                r.becomeFollower(r.Term, None)
            }
        ...
        }
    }
    
      1. 其他节点 调用 Step(任意pb.Message)
    func (r *raft) Step(m pb.Message) error {
        // Handle the message term, which may result in our stepping down to a follower.
        switch {
        case m.Term == 0:
            // local message
        case m.Term > r.Term:
            ...
            switch {
            ...
            default:
                r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
                    r.id, r.Term, m.Type, m.From, m.Term)
                if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                    //更新Leader,节点本身变成 Follower
                    r.becomeFollower(m.Term, m.From)
                } else {
                    r.becomeFollower(m.Term, None)
                }
            }
        ...
        }
    }
    

    四、流程 - put(k, v)

    代码路径如下

      1. Leader/Follower 调用 Propose(ctx context.Context, data []byte)
    func (n *node) Propose(ctx context.Context, data []byte) error {
        //一样是写 MspProp 消息到 raft 状态机,然后写到 node.propc,再由node.run 的 for循环处理
        //最终调用 raft.Step
        return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
    }
    
    • 2.1 Follower 调用 stepFollower(r *raft, pb.MsgProp)
    func stepFollower(r *raft, m pb.Message) {
        switch m.Type {
        case pb.MsgProp:
            if r.lead == None {
                r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
                return
            } else if r.disableProposalForwarding {
                r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
                return
            }
            //会转发给 Leader
            m.To = r.lead
            r.send(m)
        ...
        }
    }
    
    • 2.2 Leader 调用 stepLeader(r *raft, pb.MsgProp)
    func stepLeader(r *raft, m pb.Message) {
        switch m.Type {
        ...
        case pb.MsgProp:
            ...
            //把提案追加到“协商中的提案”中
            r.appendEntry(m.Entries...)
            //把上述的操作广播出去
            //因为需要Follower表示OK,Leader才能commit
            r.bcastAppend()
            return
        ...
        }
    }
    
    func (r *raft) appendEntry(es ...pb.Entry) {
        //取最新的Index
        //先取 “协商中的提案” 的最新Index,raftLog.unstable.entries[最后一个] --> raftlog.unstable.snapshot.LastIndex()
        //如果没有,则取 “已协商一致的提案”的最新Index,raftLog.storage.LastIndex()
        li := r.raftLog.lastIndex()
        //挨个递增+1赋值Index
        for i := range es {
            es[i].Term = r.Term
            es[i].Index = li + 1 + uint64(i)
        }
        //追加到“协商中的提案”中
        r.raftLog.append(es...)
        //1.“代表Leader的Progress”中的match 更新为 最新的Index,
        //match表示已发送,由于Leader本身不用发送,所以就直接更新了
        //2.方法名为嘛多了maybe前缀呢,因为有可能更新不成功,
        //当 lastIndex <= match 时,就更新不成功,
        //因为raft library 的机制只允许offset往前走
        r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex())
        //提交,即 raftLog.committed 住前进
        //1.如果投票结果少于 (节点数/2 +1 ),就提交不了,所以叫 maybeCommit
        //2.理论上只有单节点的时候才会提交成功,
        //多节点,会在响应Follower反馈的地方,再次调用该方法进行提交
        //所以这边不用关心返回值
        r.maybeCommit()
    }
    
    func (r *raft) maybeCommit() bool {
        //把所有节点已收到的提案Index放在一个数组里
        mis := make(uint64Slice, 0, len(r.prs))
        for _, p := range r.prs {
            mis = append(mis, p.Match)
        }
        //降序
        sort.Sort(sort.Reverse(mis))
        //取中间法定数-1的match
        //比如节点是5,法定数就是3
        //mci - matchIndex = 降序Match数组[2],即取法定数中match最小的那一个
        //如果 mci > raftLog.committed,说明降序Match数组至少有 3个已经满足 mci > raftLog.committed
        //这样就可以提交了
        mci := mis[r.quorum()-1]
        //尝试提交
        return r.raftLog.maybeCommit(mci, r.Term)
    }
    
    func (r *raft) bcastAppend() {
        //挨个progress进行 sendAppend
        r.forEachProgress(func(id uint64, _ *Progress) {
            if id == r.id {
                return
            }
    
            r.sendAppend(id)
        })
    }
    
    func (r *raft) sendAppend(to uint64) {
        pr := r.getProgress(to)
        if pr.IsPaused() {
            return
        }
        m := pb.Message{}
        m.To = to
        
        term, errt := r.raftLog.term(pr.Next - 1)
        ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
        
        if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
            //就是上面所说的,同步过程中出现异常,就会重新同步  数据快照
            if !pr.RecentActive {
                r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
                return
            }
            m.Type = pb.MsgSnap
            snapshot, err := r.raftLog.snapshot()
            if err != nil {
                if err == ErrSnapshotTemporarilyUnavailable {
                    r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
                    return
                }
                panic(err) // TODO(bdarnell)
            }
            if IsEmptySnap(snapshot) {
                panic("need non-empty snapshot")
            }
            m.Snapshot = snapshot
            sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
            r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
                r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
            //progress 切到 snapshot 状态
            pr.becomeSnapshot(sindex)
            r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
        } else {
            //正常情况是会给 Follower 发 MsgApp 消息
            m.Type = pb.MsgApp
            m.Index = pr.Next - 1
            m.LogTerm = term
            m.Entries = ents
            m.Commit = r.raftLog.committed
            if n := len(m.Entries); n != 0 {
                switch pr.State {
                // optimistically increase the next when in ProgressStateReplicate
                case ProgressStateReplicate:
                    last := m.Entries[n-1].Index
                    pr.optimisticUpdate(last)
                    pr.ins.add(last)
                case ProgressStateProbe:
                    //如果是探测状态,发完这一条,就不会再发
                    //然后收到反馈时,会调用  pr.resume()
                    pr.pause()
                default:
                    r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
                }
            }
        }
        // 这边就是要发提案消息了
        r.send(m)
    }
    
    func (r *raft) send(m pb.Message) {
        m.From = r.id
        ...
        //追加到 raft.msgs,写到 ready chan,再由 node.run() 消费到,丢给传输层,再丢给其他节点
        r.msgs = append(r.msgs, m)
    }
    
      1. Follwer 调用 stepFollower(*raft, pb.MsgApp)
    func stepFollower(r *raft, m pb.Message) {
        switch m.Type {
        ...
        case pb.MsgApp:
            r.electionElapsed = 0
            r.lead = m.From
            r.handleAppendEntries(m)
        ...
        }
    }
    
    func (r *raft) handleAppendEntries(m pb.Message) {
        if m.Index < r.raftLog.committed {
            r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
            return
        }
    
        if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
            //如果 term 是对的,并且能够提交(提交细节很装箱单 的,不再介绍,自己看一下maybeAppend)
            //就给 Leader 反馈一条 pb.MsgAppResp,并且 reject : false
            r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
        } else {
            //不能提交
            //就给 Leader 反馈一条 pb.MsgAppResp,并且 reject : true,带上节点最新的 Index
            //Leader 会尝试同步更旧的数据
            r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
                r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
            r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
        }
    }
    
      1. Leader 调用 stepLeader(*raft, pb.MsgAppResp)
    func stepLeader(r *raft, m pb.Message) {
        ...
        pr := r.getProgress(m.From)
        if pr == nil {
            r.logger.Debugf("%x no progress available for %x", r.id, m.From)
            return
        }
        switch m.Type {
        case pb.MsgAppResp:
            //收到反馈信息,说明该节点是存活的
            pr.RecentActive = true
    
            if m.Reject {
                r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
                    r.id, m.RejectHint, m.From, m.Index)
                //尝试从更旧的数据开始同步
                if pr.maybeDecrTo(m.Index, m.RejectHint) {
                    r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
                    if pr.State == ProgressStateReplicate {
                        pr.becomeProbe()
                    }
                    r.sendAppend(m.From)
                }
            } else {
                oldc := pr.IsPaused()
                //更新节点对应progress的Match及Next
                if pr.maybeUpdate(m.Index) {
                    switch {
                    case pr.State == ProgressStateProbe:
                        //节点能正常接收消息,则从探测状态切为副本状态
                        pr.becomeReplicate()
                    case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
                        r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
                        pr.becomeProbe()
                    case pr.State == ProgressStateReplicate:
                        //副本状态下,需要调整发送的滑动窗口,以备下次发送
                        pr.ins.freeTo(m.Index)
                    }
                    
                    //尝试提交,即将 “协商中的提案” 变更为 “协商一致的提案”
                    if r.maybeCommit() {
                        //如果该提案超过法定人数反馈,则表示协商一致,
                        //所有节点可以进入下一轮数据同步
                        r.bcastAppend()
                    } else if oldPaused {
                        //如果不一致,则没有必要全部进入下一轮数据同步
                        //但是节点之前如果是Paused, 那之前是可能有很多消息滞留,未发送,那就继续同步,
                        r.sendAppend(m.From)
                    }
                    // Transfer leadership is in progress.
                    if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
                        r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
                        r.sendTimeoutNow(m.From)
                    }
                }
            }
        ...
        }
    }
    

    这边有一个问题,就是 raft.maybeCommit 只是会更新 offset,具体的 []entriy 的落地, 是不会更新的

    raftLog.Storage 不在这边操作,而是在应用层

    这个操作为何是在应用层做,理论上应该也封装在 raft librar 里才对。

    这边抓了 raftexample 的示例代码 etcd/contrib/raftexample/raft.go

    func (rc *raftNode) serveChannels() {
        ...
        for {
            select {
            ...
            //这边是读取ready chan,消费数据 
            case rd := <-rc.node.Ready():
                //WAL操作,先不用管,后续再介绍 WAL
                rc.wal.Save(rd.HardState, rd.Entries)
    
                //如果 ready 里有 snapshopt,就要重新设置 snapshot
                if !raft.IsEmptySnap(rd.Snapshot) {
                    rc.saveSnap(rd.Snapshot)
                    rc.raftStorage.ApplySnapshot(rd.Snapshot)
                    rc.publishSnapshot(rd.Snapshot)
                }
    
                //rc.raftStorage 与 raftLog.Storage 是同一个对象
                //就是在这位置Entries给落地了
                rc.raftStorage.Append(rd.Entries)
                rc.transport.Send(rd.Messages)
                if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
                    rc.stop()
                    return
                }
                rc.maybeTriggerSnapshot()
                rc.node.Advance()
            ...
            }
        }
    }
    
      1. Leader 吐数据给应用层

    参考 *“一、基础数据 - 4. node.run(r raft)”,通过 ready chan 把数据吐出去

    func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
        rd := Ready{
            Entries:          r.raftLog.unstableEntries(),
            //就是这个位置,r.maybeCommit之后,r.raftLog.nextEnts() 就会有数据
            CommittedEntries: r.raftLog.nextEnts(),
            Messages:         r.msgs,
        }
        ...
        return rd
    }
    
      1. Leader 把 committedIndex 以心跳的方式通知 Follower

    代码路径是

    • (r *raft) tickHeartbeat()
    • (r *raft) Step(pb.Message : pb.MsgBeat)
    • stepLeader(*raft, pb.Message:pb.MsgBeat)
    • (r *raft) bcastHeartbeat()
    • (r *raft) sendHeartbeat(to uint64, ctx []byte)
    func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
        commit := min(r.getProgress(to).Match, r.raftLog.committed)
        m := pb.Message{
            To:      to,
            Type:    pb.MsgHeartbeat,
            //会带上Leader已经 commit 的 Index
            Commit:  commit,
            Context: ctx,
        }
    
        r.send(m)
    }
    
      1. Follower 接收心跳,更新 commitIndex
    func stepFollower(r *raft, m pb.Message) {
        switch m.Type {
        ...
        case pb.MsgHeartbeat:
            r.electionElapsed = 0
            r.lead = m.From
            //Follower 在这边处理 Leader 发过来的心跳包
            r.handleHeartbeat(m)
        ...
        }
    }
    
    func (r *raft) handleHeartbeat(m pb.Message) {
        //这边更新 commitIndex
        r.raftLog.commitTo(m.Commit)
        //给 Leader 发心跳Feeback
        r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
    }
    

    相关文章

      网友评论

          本文标题:2.2 etcd源码笔记 - raft library - 流程

          本文链接:https://www.haomeiwen.com/subject/ksgodctx.html