美文网首页
consul-rarf协议核心源码

consul-rarf协议核心源码

作者: leiwingqueen | 来源:发表于2018-11-28 00:15 被阅读0次

    一、概要

    这次将重点分析raft协议相关的核心源码

    二、分析

    上次看到api.go方法的最后会启动3个核心的线程。我们现在来分别看下这3个线程主要做了什么

    // Start the background work.
        r.goFunc(r.run)
        r.goFunc(r.runFSM)
        r.goFunc(r.runSnapshots)
        return r, nil
    
    1.run线程
    // run is a long running goroutine that runs the Raft FSM.
    func (r *Raft) run() {
        for {
            // Check if we are doing a shutdown
            select {
            case <-r.shutdownCh:
                // Clear the leader to prevent forwarding
                r.setLeader("")
                return
            default:
            }
    
            // Enter into a sub-FSM
            switch r.getState() {
            case Follower:
                r.runFollower()
            case Candidate:
                r.runCandidate()
            case Leader:
                r.runLeader()
            }
        }
    }
    

    根据不同的角色分别执行各自的逻辑

    // Restart the heartbeat timer
                heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
    
                // Check if we have had a successful contact
                lastContact := r.LastContact()
                if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {
                    continue
                }
    
                // Heartbeat failed! Transition to the candidate state
                lastLeader := r.Leader()
                r.setLeader("")
    

    心跳监测,根据HeartbeatTimeout的配置随机(默认1s),随机时间为1~2倍的HeartbeatTimeout。判断上次心跳距离当前时间是否大于这个时间。超过则认为leader结点已不存在,并且转换成候选人的角色。

    2.rpc请求处理
    // processRPC is called to handle an incoming RPC request. This must only be
    // called from the main thread.
    func (r *Raft) processRPC(rpc RPC) {
        if err := r.checkRPCHeader(rpc); err != nil {
            rpc.Respond(nil, err)
            return
        }
    
        switch cmd := rpc.Command.(type) {
        case *AppendEntriesRequest:
            r.appendEntries(rpc, cmd)
        case *RequestVoteRequest:
            r.requestVote(rpc, cmd)
        case *InstallSnapshotRequest:
            r.installSnapshot(rpc, cmd)
        default:
            r.logger.Printf("[ERR] raft: Got unexpected command: %#v", rpc.Command)
            rpc.Respond(nil, fmt.Errorf("unexpected command"))
        }
    }
    

    AppendEntriesRequest:

    • 写日志请求处理逻辑实现
    • 若请求的term>当前的term,则以对方的数据为准,会把自己的角色转换成follower。
    • 比较请求的数据和自己的数据,得到需要写入的新的log,并持久化到硬盘
      RequestVoteRequest:
    • 选举逻辑实现
    • 若选择的候选人和自己的leader不一致,则以自己的leader为准
    • 若请求的term比自己的term小,忽略
    • 查看自己是否已经在这个term有投票了,有的话直接返回上次的投票结果
    • 持久化自己的投票结果,并返回给对方

    相关文章

      网友评论

          本文标题:consul-rarf协议核心源码

          本文链接:https://www.haomeiwen.com/subject/qhhwqqtx.html