手撸golang etcd raft协议之3
缘起
最近阅读《云原生分布式存储基石:etcd深入解析》(杜军, 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
Raft算法把问题分解成了领袖选举(leader election)、
日志复制(log replication)、安全性(safety)
和成员关系变化(membership changes)这几个子问题。
Raft算法的基本操作只需2种RPC即可完成。
RequestVote RPC由候选人(Candidate)在选举过程中发起,用于向其他节点请求投票,
AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 3)
- 继续完善raft状态机之Follower状态的处理逻辑
- 继续完善raft状态机之Candidate状态的处理逻辑
设计
- tFollowerState:
- 监视Leader心跳是否超时
- 如果Leader心跳超时,则切换到Candidate状态,竞选新leader
- 添加RequestVote和AppendEntries两个RPC接口的响应
- tCandidateState:
- 进入此状态,立即向其他节点发起竞选请求
- 如竞选超时,则重新发起竞选
- 如收到新Leader心跳,则切换回Follower
- 如获得过半数选票(N个节点时至少⌊N/2⌋+1张),则切换到Leader,并广播之
tFollowerState.go
继续完善raft状态机之Follower状态的处理逻辑
package lsm
import (
"learning/gooop/etcd/raft/config"
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/timeout"
"sync"
"time"
)
// tFollowerState is the Follower state of the Raft state machine. It watches
// for leader heartbeats and answers RequestVote / AppendEntries RPCs.
type tFollowerState struct {
	tRaftStateBase

	// One-time guards for init and Start.
	mInitOnce  sync.Once
	mStartOnce sync.Once

	// mVotedLeaderID is the candidate this follower granted its vote to;
	// the empty string means no vote has been cast.
	mVotedLeaderID string
	// mLeaderHeartbeatClock is the time.Now().UnixNano() value recorded when
	// the last AppendEntries call was accepted.
	mLeaderHeartbeatClock int64

	// mStateChangedHandler receives the next state when this follower
	// transitions to another role.
	mStateChangedHandler StateChangedHandleFunc
	// mEventMap maps each follower event to its handlers, in call order.
	mEventMap map[tFollowerEvent][]tFollowerEventHandler
}
// JobFunc is a parameterless callback.
// NOTE(review): not referenced in the code shown here — confirm it is used
// elsewhere in the package before keeping it.
type JobFunc func()

// tFollowerEvent identifies an internal event raised by tFollowerState.
type tFollowerEvent int

const (
	// evFollowerStart is raised (once) when the follower is started.
	evFollowerStart tFollowerEvent = iota
	// evFollowerLeaderHeartbeatTimeout is raised when no leader heartbeat
	// has been seen within the heartbeat timeout.
	evFollowerLeaderHeartbeatTimeout
)

// tFollowerEventHandler reacts to a tFollowerEvent; args carries any
// event-specific payload passed to raise.
type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})
// newFollowerState creates a Follower state for the given term and config,
// with handler as its state-change callback. The state is initialised but not
// started here.
func newFollowerState(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) IRaftState {
	follower := &tFollowerState{}
	follower.init(term, cfg, handler)
	return follower
}
// init performs the follower's one-time setup; calls after the first are
// no-ops (guarded by mInitOnce). It rebuilds the shared base state from term
// and cfg, marks the role as Follower, stores the state-change handler and
// wires up the event handlers.
func (me *tFollowerState) init(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) {
	setup := func() {
		// The base is overwritten wholesale, so role must be set afterwards.
		base := newRaftStateBase(term, cfg)
		me.tRaftStateBase = *base
		me.role = roles.Follower
		me.mStateChangedHandler = handler

		me.mEventMap = map[tFollowerEvent][]tFollowerEventHandler{}
		me.registerEventHandlers()
	}
	me.mInitOnce.Do(setup)
}
// raise dispatches event e (with args) synchronously to every handler
// registered for it, in registration order. Unregistered events are ignored.
func (me *tFollowerState) raise(e tFollowerEvent, args ...interface{}) {
	// A missing key yields a nil slice, which ranges zero times.
	for _, handle := range me.mEventMap[e] {
		handle(e, args...)
	}
}
// registerEventHandlers binds each follower event to the handlers reacting to
// it. mEventMap must already be allocated.
func (me *tFollowerState) registerEventHandlers() {
	bindings := map[tFollowerEvent][]tFollowerEventHandler{
		evFollowerStart: {
			me.whenStartThenBeginWatchLeaderTimeout,
		},
		evFollowerLeaderHeartbeatTimeout: {
			me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState,
		},
	}
	for ev, handlers := range bindings {
		me.mEventMap[ev] = handlers
	}
}
// Start raises evFollowerStart exactly once; repeated calls do nothing.
func (me *tFollowerState) Start() {
	fireStart := func() { me.raise(evFollowerStart) }
	me.mStartOnce.Do(fireStart)
}
// whenStartThenBeginWatchLeaderTimeout handles evFollowerStart. It launches a
// goroutine that, every timeout.HeartbeatTimeout/3, checks how long ago the
// last accepted AppendEntries (heartbeat) was recorded. Once that gap reaches
// timeout.HeartbeatTimeout it raises evFollowerLeaderHeartbeatTimeout and the
// goroutine exits.
//
// NOTE(review): mLeaderHeartbeatClock is written by AppendEntries (presumably
// on RPC-serving goroutines) and read by this goroutine without any
// synchronization — a data race under `go test -race`. Consider an
// atomic.Int64 field or a mutex. The goroutine also has no stop signal: while
// heartbeats keep arriving it runs for as long as the process does.
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e tFollowerEvent, args ...interface{}) {
	// Start the clock now if no heartbeat has been recorded yet. Left at its
	// zero value, the elapsed time would be measured from the Unix epoch and
	// the very first check would fire the timeout before any leader had a
	// chance to send a heartbeat.
	if me.mLeaderHeartbeatClock == 0 {
		me.mLeaderHeartbeatClock = time.Now().UnixNano()
	}

	go func() {
		checkInterval := timeout.HeartbeatTimeout / 3
		timeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)

		// A Ticker we can Stop; time.Tick's underlying ticker can never be
		// released (and leaks before Go 1.23) once this loop returns.
		ticker := time.NewTicker(checkInterval)
		defer ticker.Stop()

		for range ticker.C {
			if time.Now().UnixNano()-me.mLeaderHeartbeatClock >= timeoutNanos {
				me.raise(evFollowerLeaderHeartbeatTimeout)
				return
			}
		}
	}()
}
// whenLeaderHeartbeatTimeoutThenSwitchToCandidateState handles
// evFollowerLeaderHeartbeatTimeout. It builds a Candidate state with this
// follower's config and term and passes it to the state-change handler. It
// does nothing when no handler is registered.
//
// NOTE(review): the term is passed unchanged; Raft increments currentTerm when
// a node becomes a candidate — confirm the candidate state does so itself.
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ tFollowerEvent, _ ...interface{}) {
	notify := me.mStateChangedHandler
	if notify != nil {
		candidate := newCandidateState(me.cfg, me.term, me.mStateChangedHandler)
		notify(candidate)
	}
}
// Role always reports roles.Follower for this state.
func (me *tFollowerState) Role() roles.RaftRole {
	var role roles.RaftRole = roles.Follower
	return role
}
// RequestVote answers a candidate's vote request following the Raft rules.
//   - A request from an older term is rejected.
//   - A request from a newer term moves this follower to that term and
//     forgets the vote cast in the previous term.
//   - Within one term at most one candidate gets the vote. A repeated request
//     from the same candidate is granted again.
//
// Granting a vote also resets the heartbeat clock, so the follower does not
// start its own election right after backing another candidate.
//
// ret.Term always carries this follower's (possibly updated) term. The
// returned error is always nil.
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	if cmd.Term < me.term {
		ret.Term = me.term
		ret.VoteGranted = false
		return nil
	}

	if cmd.Term > me.term {
		// New term: a vote cast in an older term no longer binds us.
		me.term = cmd.Term
		me.mVotedLeaderID = ""
	}

	if me.mVotedLeaderID != "" && me.mVotedLeaderID != cmd.CandidateID {
		ret.Term = me.term
		ret.VoteGranted = false
		return nil
	}

	// TODO: once logs are replicated, also require the candidate's log to be at
	// least as up-to-date as ours (Raft §5.4.1) before granting.
	me.mVotedLeaderID = cmd.CandidateID
	me.mLeaderHeartbeatClock = time.Now().UnixNano()
	ret.Term = me.term
	ret.VoteGranted = true
	return nil
}
// AppendEntries handles a leader's AppendEntries call.
//
// A call from an older term is rejected with this follower's term. Otherwise
// the caller is accepted as leader. The follower adopts its term (forgetting
// any vote from an older term), records the leader ID and refreshes the
// heartbeat clock. An empty Entries list is a pure heartbeat and succeeds.
// Appending actual entries is not implemented yet, so such calls report
// Success=false with the current term instead of leaving ret unset.
//
// The returned error is always nil.
func (me *tFollowerState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error {
	if cmd.Term < me.term {
		ret.Term = me.term
		ret.Success = false
		return nil
	}

	if cmd.Term > me.term {
		// New term: a vote cast in an older term no longer applies.
		me.term = cmd.Term
		me.mVotedLeaderID = ""
	}
	me.leaderID = cmd.LeaderID
	me.mLeaderHeartbeatClock = time.Now().UnixNano()
	ret.Term = me.term

	if len(cmd.Entries) == 0 {
		// Just a heartbeat.
		ret.Success = true
		return nil
	}

	// TODO: append log entries (with Raft's prevLogIndex/prevLogTerm
	// consistency check). Until then, report failure explicitly.
	ret.Success = false
	return nil
}
// StateChangedHandler replaces the callback that receives the next state when
// this follower transitions to another role.
func (me *tFollowerState) StateChangedHandler(handler StateChangedHandleFunc) {
	me.mStateChangedHandler = handler
}
tCandidateState.go
继续完善raft状态机之Candidate状态的处理逻辑
package lsm
import (
"errors"
"learning/gooop/etcd/raft/config"
"learning/gooop/etcd/raft/rpc"
"sync"
)
// tCandidateState is the Candidate state of the Raft state machine. Its event
// handlers (request votes, watch the election timeout, switch to leader) are
// still stubs.
type tCandidateState struct {
	tRaftStateBase

	// One-time guards for init and Start.
	mInitOnce  sync.Once
	mStartOnce sync.Once

	// mStateChangedHandler receives the next state on a role transition.
	mStateChangedHandler StateChangedHandleFunc
	// mEventMap maps each candidate event to its handlers, in call order.
	mEventMap map[tCandidateEvent][]tCandidateEventHandler
}
// RequestVote is not served by a candidate. It always fails with
// gErrorCandidateWontReplyRequestVote and leaves ret untouched.
//
// NOTE(review): in Raft a candidate that sees a higher term steps down to
// Follower, and otherwise replies VoteGranted=false with its term.
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	err := gErrorCandidateWontReplyRequestVote
	return err
}
// AppendEntries is not served by a candidate. It always fails with
// gErrorCandidateWontReplyAppendEntries and leaves ret untouched.
//
// NOTE(review): in Raft a candidate receiving AppendEntries from a leader whose
// term is >= its own must return to Follower; not implemented yet.
func (me *tCandidateState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error {
	err := gErrorCandidateWontReplyAppendEntries
	return err
}
// StateChangedHandler replaces the callback that receives the next state when
// this candidate transitions to another role.
func (me *tCandidateState) StateChangedHandler(handler StateChangedHandleFunc) {
	me.mStateChangedHandler = handler
}
// tCandidateEvent identifies an internal event raised by tCandidateState.
type tCandidateEvent int

const (
	// evCandidateStart is raised (once) when the candidate is started.
	evCandidateStart tCandidateEvent = iota
	// evCandidateElectionTimeout is raised when an election round times out.
	evCandidateElectionTimeout
	// evCandidateGotEnoughVotes is raised when enough votes were collected.
	evCandidateGotEnoughVotes
)

// tCandidateEventHandler reacts to a tCandidateEvent; args carries any
// event-specific payload passed to raise.
type tCandidateEventHandler func(e tCandidateEvent, args ...interface{})
// newCandidateState creates a Candidate state for the given config and term,
// with handler as its state-change callback. The state is initialised but not
// started here. Note that the argument order (cfg, term) differs from
// newFollowerState (term, cfg).
func newCandidateState(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) IRaftState {
	candidate := &tCandidateState{}
	candidate.init(cfg, term, handler)
	return candidate
}
// init performs the candidate's one-time setup; calls after the first are
// no-ops (guarded by mInitOnce). It builds the shared base state from term and
// cfg, stores the state-change handler and wires up the event handlers.
//
// NOTE(review): unlike the follower's init, this never assigns me.role, and
// tCandidateState declares no Role() method of its own here. If Role() is
// provided by tRaftStateBase, set me.role to the candidate role (this needs
// the roles package imported in this file).
func (me *tCandidateState) init(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) {
	me.mInitOnce.Do(func() {
		// Build the base through the same constructor the follower uses, so any
		// initialisation newRaftStateBase performs is not skipped for
		// candidates. Previously only cfg and term were assigned.
		me.tRaftStateBase = *newRaftStateBase(term, cfg)
		me.mStateChangedHandler = handler

		me.mEventMap = make(map[tCandidateEvent][]tCandidateEventHandler)
		me.registerEventHandlers()
	})
}
// registerEventHandlers binds each candidate event to the handlers reacting to
// it, in call order. mEventMap must already be allocated.
func (me *tCandidateState) registerEventHandlers() {
	bindings := map[tCandidateEvent][]tCandidateEventHandler{
		evCandidateStart: {
			me.whenStartThenRequestVote,
			me.whenStartThenWatchElectionTimeout,
		},
		evCandidateElectionTimeout: {
			me.whenElectionTimeoutThenRequestVoteAgain,
		},
		evCandidateGotEnoughVotes: {
			me.whenGotEnoughVotesThenSwitchToLeader,
		},
	}
	for ev, handlers := range bindings {
		me.mEventMap[ev] = handlers
	}
}
// raise dispatches event e (with args) synchronously to every handler
// registered for it, in registration order. Unregistered events are ignored.
func (me *tCandidateState) raise(e tCandidateEvent, args ...interface{}) {
	// A missing key yields a nil slice, which ranges zero times.
	for _, handle := range me.mEventMap[e] {
		handle(e, args...)
	}
}
// Start raises evCandidateStart exactly once; repeated calls do nothing.
//
// NOTE(review): every handler bound to evCandidateStart is currently a stub
// that panics, so starting a candidate panics until they are implemented.
func (me *tCandidateState) Start() {
	fireStart := func() { me.raise(evCandidateStart) }
	me.mStartOnce.Do(fireStart)
}
// whenStartThenRequestVote handles evCandidateStart by requesting votes.
// Not implemented yet: it panics.
func (me *tCandidateState) whenStartThenRequestVote(_ tCandidateEvent, _ ...interface{}) {
	// TODO: implement.
	panic("implements me")
}

// whenStartThenWatchElectionTimeout handles evCandidateStart by watching the
// election timeout. Not implemented yet: it panics.
func (me *tCandidateState) whenStartThenWatchElectionTimeout(_ tCandidateEvent, _ ...interface{}) {
	// TODO: implement.
	panic("implements me")
}

// whenElectionTimeoutThenRequestVoteAgain handles evCandidateElectionTimeout by
// starting a new round of vote requests. Not implemented yet: it panics.
func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ tCandidateEvent, _ ...interface{}) {
	// TODO: implement.
	panic("implements me")
}

// whenGotEnoughVotesThenSwitchToLeader handles evCandidateGotEnoughVotes by
// switching to the Leader state. Not implemented yet: it panics.
func (me *tCandidateState) whenGotEnoughVotesThenSwitchToLeader(_ tCandidateEvent, _ ...interface{}) {
	// TODO: implement.
	panic("implements me")
}
var (
	// gErrorCandidateWontReplyRequestVote is returned by
	// tCandidateState.RequestVote.
	gErrorCandidateWontReplyRequestVote = errors.New("candidate won't reply RequestVote RPC")

	// gErrorCandidateWontReplyAppendEntries is returned by
	// tCandidateState.AppendEntries.
	gErrorCandidateWontReplyAppendEntries = errors.New("candidate won't reply AppendEntries RPC")
)
(未完待续)
网友评论