一、概要
这次将重点分析raft协议相关的核心源码
二、分析
上次看到api.go方法的最后会启动3个核心的线程。我们现在来分别看下这3个线程主要做了什么
// Start the background work.
r.goFunc(r.run)
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)
return r, nil
1.run线程
// run is a long running goroutine that runs the Raft FSM.
func (r *Raft) run() {
for {
// Check if we are doing a shutdown
select {
case <-r.shutdownCh:
// Clear the leader to prevent forwarding
r.setLeader("")
return
default:
}
// Enter into a sub-FSM
switch r.getState() {
case Follower:
r.runFollower()
case Candidate:
r.runCandidate()
case Leader:
r.runLeader()
}
}
}
根据不同的角色分别执行各自的逻辑
// Restart the heartbeat timer
heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
// Check if we have had a successful contact
lastContact := r.LastContact()
if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {
continue
}
// Heartbeat failed! Transition to the candidate state
lastLeader := r.Leader()
r.setLeader("")
心跳监测,根据HeartbeatTimeout的配置随机(默认1s),随机时间为1~2倍的HeartbeatTimeout。判断上次心跳距离当前时间是否大于这个时间。超过则认为leader结点已不存在,并且转换成候选人的角色。
2.rpc请求处理
// processRPC is called to handle an incoming RPC request. This must only be
// called from the main thread.
func (r *Raft) processRPC(rpc RPC) {
if err := r.checkRPCHeader(rpc); err != nil {
rpc.Respond(nil, err)
return
}
switch cmd := rpc.Command.(type) {
case *AppendEntriesRequest:
r.appendEntries(rpc, cmd)
case *RequestVoteRequest:
r.requestVote(rpc, cmd)
case *InstallSnapshotRequest:
r.installSnapshot(rpc, cmd)
default:
r.logger.Printf("[ERR] raft: Got unexpected command: %#v", rpc.Command)
rpc.Respond(nil, fmt.Errorf("unexpected command"))
}
}
AppendEntriesRequest:
- 写日志请求处理逻辑实现
- 若请求的term>当前的term,则以对方的数据为准,会把自己的角色转换成follower。
- 比较请求的数据和自己的数据,得到需要写入的新的log,并持久化到硬盘
RequestVoteRequest: - 选举逻辑实现
- 若选择的候选人和自己的leader不一致,则以自己的leader为准
- 若请求的term比自己的term小,忽略
- 查看自己是否已经在这个term有投票了,有的话直接返回上次的投票结果
- 持久化自己的投票结果,并返回给对方
网友评论