美文网首页
raft mit 6.824 实现领导者选举(LAB2A)

raft mit 6.824 实现领导者选举(LAB2A)

作者: 东京的雨不会淋湿首尔 | 来源:发表于2020-08-26 16:56 被阅读0次

    1.程序结构

    lab2的实验是要实现以下接口

    // create a new Raft server instance:
    rf := Make(peers, me, persister, applyCh)
    
    // start agreement on a new log entry:
    rf.Start(command interface{}) (index, term, isleader)
    
    // ask a Raft for its current term, and whether it thinks it is leader
    rf.GetState() (term, isLeader)
    
    // each time a new entry is committed to the log, each Raft peer
    // should send an ApplyMsg to the service (or tester).
    type ApplyMsg
    

    其中参数在注释中基本都有解释。
    Make 用来创建点对点server ,peers是所有的server,len(peers)也就是所有的server数量,me就是当前的server,也就是在peers中的下标。persister和applyCh 以及Start函数在2A实验中没有用上。

    做实验前强烈建议多读几遍raft论文。
    助教的student guide 也很重要:

    student guide

    FAQ

    locking and structure

    2.常见问题

    1. 什么时候需要重置超时选举时间?在 studen guide中写到有3种情况需要重置超时时间:
      a) 从当前的leader 中收到 AppendEntries ,收到过期的不需要重置
      b) 开始一轮选举的时候,当然本轮没有选举结果开始下一轮的时候也应该重置
      c) 当你给其他节点投票的时候

    2. 什么时候重置votefFor?
      当term改变的时候重置votedfor,例如投票的时候收到了term更高,那么不管此刻是否投票过,都应该转变为follower并且重置votedfor;当leader掉线一段时间重连回来,此刻群组有新leader,那么old leader应该转变为follower并且重置votedfor。

    3. 在做RPC请求的时候不应该加锁。例如:
      rf.mu.Lock()
      rf. sendRequestVote(...)
      rf.mu.UnLock()
      这种写法是不合逻辑的,容易造成死锁,我们仅需对共享资源加锁就足够了,比如这里对参数构造加锁。

    4. server 成为leader之后不需要进行 选举超时倒计时。

    3.实现 给出部分代码,仅供参考

    3.1 流程

    1. 首先 raft 启动,在Make 中做初始化,一开始所有的server都是follower,并且每个server会随机分配一个选举超时时间,同时所有的server都有相同的心跳间隔时间。那么由于涉及到角色转换,我们最好为每一种角色写一个转换函数,convertToFollower,convertToCandidate,convertToLeader。
    • 什么时候会转变到follower呢?不外乎在心跳,投票或者appendEntries的时候收到了更加新的server的信息,这时候需要转变到follower的同时更新自己的Term
    • 转变到candidate只有一种,当选举超时内没有收到心跳或者appendEntries,那么超时之后主动变成candidate
    • 转变到leader,只有在candidate收到大多数请求的时候,竞选成功,成为leader
    func (rf* Raft) convertToCandidate(){
        rf.raftStatus = Candidate
        rf.currentTerm++
        rf.votedFor = rf.me
        rf.nonLeaderCond.Signal() // awake electionTimeoutTick
    
    }
    
    func (rf* Raft) convertToFollower(newTerm int){
        rf.raftStatus = Follower
        rf.currentTerm = newTerm
        rf.votedFor = -1
        rf.nonLeaderCond.Signal() // awake electionTimeoutTick
    
    }
    
    func (rf* Raft) convertToLeader(){
        rf.raftStatus = Leader
    
    }
    

    初始化的时候,为了防止瓜分选票的情况发生,需要随机设置不同的超时时间,论文建议是150-300ms ,建议写个函数,用时间戳做随机种子,论文建议每次生成不同的超时时间。

    func (rf *Raft) getRandElecTime() int {
        rand.Seed(time.Now().UnixNano())
        electionTimeout := rand.Intn(150) + 150 // [150,300)
        return electionTimeout
    }
    func (rf* Raft) resetElectionTimer()  {
        rf.lastHeartbeatTime = time.Now().UnixNano()
        rf.electionTimeout = rf.getRandElecTime()
    }
    

    根据论文解析,我们需要长时间运行的gorutinue (可以简单理解为死循环)来处理事件和统计选举超时。

    func (rf *Raft) EventLoop() () {
        for{
            rf.mu.Lock()
            if rf.killed(){
                rf.mu.Unlock()
                return
            }
            rf.mu.Unlock()
            select {
            case <- rf.electionTimeoutChan:
                rf.mu.Lock()
                DPrintf("[EventLoop]: Id %d Term %d State %s \t || \t election timeout , start an eletion\n",
                    rf.me, rf.currentTerm, state2name(rf.raftStatus))
                rf.mu.Unlock()
                // start election, if election timeout
                go rf.startElection()
    
            case <-rf.heartbeatPeriodChan:
                DPrintf("[EventLoop]: Id %d Term %d State %s \t || \t election timeout , start to send heartbeat\n",
                    rf.me, rf.currentTerm, state2name(rf.raftStatus))
                go rf.broadcastHeartbeat()
            }
        }
    }
    
    func (rf *Raft) electionTimeoutTick()  {
        for {
            rf.mu.Lock()
            _, isLeader := rf.GetState()
            if rf.killed(){
                rf.mu.Unlock()
                return
            }
            rf.mu.Unlock()
            if isLeader {
                // if is leader , no need to check election timeout
                rf.nonLeaderCond.L.Lock()
                rf.nonLeaderCond.Wait()
                rf.nonLeaderCond.L.Unlock()
            }else { // follower and candidate
                rf.mu.Lock()
                elapseTime := time.Now().UnixNano() - rf.lastHeartbeatTime
                if elapseTime/int64(time.Millisecond) > int64(rf.electionTimeout){
                    DPrintf("[electionTimeoutTick]: Id %d Term %d State %s\t || \ttimeout,"+
                        "convert to candidate\n" , rf.me, rf.currentTerm ,state2name(rf.raftStatus))
                    DPrintf("[electionTimeoutTick] : %d %d\n",elapseTime/int64(time.Millisecond),int64(rf.electionTimeout))
                    rf.electionTimeoutChan <- true
                }
                rf.mu.Unlock()
                time.Sleep(time.Millisecond*10)
            }
        }
    }
    

    实验部分要求心跳不超过10次/s,也就是不小于100ms/次,而心跳时间间隔应该小于选举超时时间,因此设置为100到150ms之间。

    func (rf* Raft) broadcastHeartbeat()  {
        for {
            rf.mu.Lock()
            _,isLeader := rf.GetState()
            if rf.killed(){
                rf.mu.Unlock()
                return
            }
            rf.mu.Unlock()
    
            if !isLeader {
                // not leader , then return
                return
            }
    
            // send heart beat
            for i,_ := range rf.peers{
                if i == rf.me{
                    continue
                }
                rf.mu.Lock()
                //prevLogIndex := len(rf.log)-1
                prevLogIndex,PrevLogTerm :=rf.getLastLogInfo()
                args := AppendEntriesArgs{Term: rf.currentTerm,LeaderId: rf.me,PrevLogIndex: prevLogIndex,
                    PrevLogTerm: PrevLogTerm, LeaderCommit: rf.commitIndex}
                var reply AppendEntriesReply
                rf.mu.Unlock()
                go func(index int, args *AppendEntriesArgs, reply* AppendEntriesReply) {
                    ok := rf.sendAppendEntries(index,args,reply)
                    rf.mu.Lock()
                    defer rf.mu.Unlock()
                    if ok == false{
                        // send heartbeat failed
                        //DPrintf("[broadcastHeartbeat]: %d send to peer's id %d failed term %d\n",args.LeaderId,index,reply.Term)
    
                    }else {
                        //DPrintf("[broadcastHeartbeat]: Id %d Term %d State %s\t || \ttimeout,"+
                        //  "send heartbeat to %d success\n" , rf.me, rf.currentTerm ,state2name(rf.raftStatus),index)
    
                        if reply.Term > rf.currentTerm{
                            //
                            DPrintf("[broadcastHeartbeat]: Id %d Term %d State %s\t || \ttimeout,"+
                                "send heartbeat failed, reply term %d\n" , rf.me, rf.currentTerm ,state2name(rf.raftStatus),reply.Term)
                            rf.convertToFollower(reply.Term)
    
                        }else if reply.Term == rf.currentTerm && reply.Success == false{
                            // follower's log index and log term not match
    
                        }else {
                            //
                        }
    
                    }
                    // check reply
    
    
                }(i,&args,&reply)
            }
    
            // sleep
            time.Sleep(time.Duration(rf.heartbeatInterval)*time.Millisecond)
        }
    
    }
    
    1. 开始选举。当某一台server 的选举超时计时先倒数完,这台server按照规则,先给变成candidate,term加一,给自己投票,重置选举超时计时器,然后让其他server投票。只要超过半数,那么就当选leader,开始给其余节点发心跳。
    func (rf* Raft) startElection()  {
        rf.mu.Lock()
        rf.convertToCandidate()
        nVotes := 1 // has voted in the term
        // 3.reset election timeout
        rf.resetElectionTimer()
        DPrintf("[startElection]: Id %d Term %d state %s \n",rf.me, rf.currentTerm, state2name(rf.raftStatus))
        rf.mu.Unlock()
        // 4.send request vote to other server
        go func(nVotes* int ,rf* Raft) {
            var wg sync.WaitGroup
            winThreadHold := len(rf.peers)/2 + 1
    
            for i,_ := range rf.peers{
                if i == rf.me{
                    continue
                }
                rf.mu.Lock()
                lastLogIndex ,LastLogTerm := rf.getLastLogInfo()
    
                reqArgs := RequestVoteArgs{Term: rf.currentTerm,CandidateId: rf.me, LastLogIndex: lastLogIndex,LastLogTerm: LastLogTerm}
                rf.mu.Unlock()
    
                wg.Add(1)
    
                var reply RequestVoteReply
                go func(index int, rf* Raft, args* RequestVoteArgs, reply* RequestVoteReply) {
                    defer wg.Done()
                    DPrintf("[startElection]: Id %d Term %d state %s \t || \t" +
                        "start send request vote to %d \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),index)
    
                    ok := rf.sendRequestVote(index,args,reply)
                    if ok == false {
                        DPrintf("[startElection]: Id %d Term %d state %s \t || \t" +
                            "send request vote to %d failed \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),index)
                        return
                    }
    
                    rf.mu.Lock()
                    defer rf.mu.Unlock()
                    // reject vote
                    if reply.VoteGranted == false{
                        if reply.Term > rf.currentTerm{
                            DPrintf("[startElection]: Id %d Term %d state %s \t || \t" +
                                "peer term:%d \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),reply.Term)
                            rf.convertToFollower(reply.Term)
                        }
                    }else{
                        *nVotes += 1
                        // if it self has became leader , then no need to do
                        _,isLeader := rf.GetState()
                        if isLeader{
                            return
                        }
                        if rf.raftStatus == Candidate && *nVotes >= winThreadHold {
                            DPrintf("[startElection]: Id %d Term %d state %s \t || \t" +
                                "win election votes:%d \n",rf.me, rf.currentTerm, state2name(rf.raftStatus),*nVotes)
    
                            // win election
                            rf.convertToLeader()
    
                            // re init nextIndex
                            rf.reInitNextIndex()
    
                            // send heartbeat immediately to all server
                            DPrintf("start send heartbeat\n")
                            go rf.broadcastHeartbeat()
                        }
                    }
    
                }(i,rf,&reqArgs,&reply)
            }
    
            // wait all send finish
            wg.Wait()
    
        }(&nVotes,rf)
    
    }
    
    1. 处理投票
      那么这里主要是根据term和lastlogindex来判断是否投票。
      不投票?
      a) 比自己Term小
      b) log比自己旧
      c) 在本次Term中已经投票过了(args.Term == rf.currentTerm && rf.votedFor!= -1)

    2. 处理心跳
      论文说了,entries为空表示心跳信息。
      如果收到了term更高的发来的心跳,应该转变为follower并重置选举超时计时器
      收到当前term的,直接重置选举超时计时器
      同时我们应该在reply中附上自己的term,以便leader检测自己是否是过时的leader(leader断线重连,term比自己小)

    本篇文章由一文多发平台ArtiPub自动发布

    相关文章

      网友评论

          本文标题:raft mit 6.824 实现领导者选举(LAB2A)

          本文链接:https://www.haomeiwen.com/subject/uelfsktx.html