手撸golang 学etcd 手写raft协议之6 事件驱动+读写分离
缘起
最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
Raft算法把问题分解成了四个子问题:
1. 领袖选举(leader election)、
2. 日志复制(log replication)、
3. 安全性(safety)
4. 成员关系变化(membership changes)
这几个子问题。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 6)
- 大幅重构,提升代码的可理解/可管理性:
- 基于事件驱动的逻辑编排,重构Follower和Candidate状态下的实现
- 将字段状态的管理,实行读写分离。没看错,代码也是可以"读写分离"的 _
设计
- random:为各种超时时间添加随机性
- tFollowerState:基于事件驱动重构Follower状态的逻辑编排,各字段实施读写分离管理
- tCandidateState:基于事件驱动重构Candidate状态的逻辑编排,各字段实施读写分离管理
random.go
为各种超时时间添加随机性
package lsm
import (
"math/rand"
"time"
)
// fnRandomizeInt64 returns int64 value from v to v*1.3
func fnRandomizeInt64(v int64) int64 {
return v + v * gRand.Int63n(30) / 100
}
// fnRandomizeDuration returns duration value from v to v*1.3
func fnRandomizeDuration(v time.Duration) time.Duration {
i := int64(v)
return time.Duration(fnRandomizeInt64(i))
}
var gRand = rand.New(rand.NewSource(time.Now().UnixNano()))
tFollowerState.go
基于事件驱动重构Follower状态的逻辑编排,各字段实施读写分离管理
package lsm
import (
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/timeout"
"sync"
"time"
)
// tFollowerState presents a follower node
type tFollowerState struct {
tEventDrivenModel
context IRaftLSM
mInitOnce sync.Once
mStartOnce sync.Once
// update: feInit / feLeaderHeartbeat
mTerm int64
// update: feInit / feLeaderHeartbeat
mLeaderHeartbeatTimestamp int64
// update: feLeaderHeartbeat
mLeaderID string
// update: feCandidateRequestVote / feVoteToCandidate
mLastVotedTerm int64
// update: feCandidateRequestVote / feVoteToCandidate
mLastVotedCandidateID string
// update: feCandidateRequestVote / feVoteToCandidate
mLastVotedTimestamp int64
// update: feInit / feDisposing
mDiseposedFlag bool
}
// trigger: init()
// args: empty
const feInit = "follower.init"
// trigger: Start()
// args: empty
const feStart = "follower.Start"
// trigger: Heartbeat()
// args: rpc.HeartbeatCmd
const feLeaderHeartbeat = "follower.LeaderHeartbeat"
// trigger: whenStartThenBeginWatchLeaderTimeout()
// args: empty
const feLeaderHeartbeatTimeout = "follower.LeaderHeartbeatTimeout"
// trigger: RequestVote()
// args: rpc.RequestVoteCmd
const feCandidateRequestVote = "candidate.RequestVote"
// trigger: RequestVote()
// args: rpc.RequestVoteCmd
const feVoteToCandidate = "follower.CandidateRequestVote"
// trigger: whenLeaderHeartbeatTimeoutThenSwitchToCandidateState
const feDisposing = "follower.Disposing"
func newFollowerState(ctx IRaftLSM) IRaftState {
it := new(tFollowerState)
it.init(ctx)
return it
}
func (me *tFollowerState) init(ctx IRaftLSM) {
me.mInitOnce.Do(func() {
me.context = ctx
me.initEventHandlers()
})
}
func (me *tFollowerState) initEventHandlers() {
// write only logic
me.hookEventsForTerm()
me.hookEventsForLeaderHeartbeatTimestamp()
me.hookEventsForLeaderID()
me.hookEventsForLastVotedTerm()
me.hookEventsForLastVotedCandicateID()
me.hookEventsForLastVotedTimestamp()
me.hookEventsForDisposedFlag()
// read only logic
me.hook(feStart,
me.whenStartThenBeginWatchLeaderTimeout)
me.hook(feLeaderHeartbeatTimeout,
me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)
}
// hookEventsForTerm maintains field: mTerm
// update : feInit / feLeaderHeartbeat
func (me *tFollowerState) hookEventsForTerm() {
me.hook(feInit, func(e string, args ...interface{}) {
me.mTerm = me.context.store().LastCommittedTerm()
})
me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {
cmd := args[0].(*rpc.HeartbeatCmd)
me.mTerm = cmd.Term
})
}
// hookEventsForLeaderHeartbeatClock maintains field: mLeaderHeartbeatClock
// update : feLeaderHeartbeat / feLeaderHeartbeatTimeout
func (me *tFollowerState) hookEventsForLeaderHeartbeatTimestamp() {
me.hook(feInit, func(e string, args ...interface{}) {
me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()
})
me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {
me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()
})
me.hook(feLeaderHeartbeatTimeout, func(e string, args ...interface{}) {
me.mLeaderHeartbeatTimestamp = 0
})
}
// hookEventsForLeaderID maintains field: mLeaderID
// update : feLeaderHeartbeat / feLeaderHeartbeatTimeout
func (me *tFollowerState) hookEventsForLeaderID() {
me.hook(feLeaderHeartbeat, func(e string, args ...interface{}) {
cmd := args[0].(*rpc.HeartbeatCmd)
me.mLeaderID = cmd.LeaderID
})
me.hook(feLeaderHeartbeatTimeout, func(e string, args ...interface{}) {
me.mLeaderID = ""
})
}
// hookEventsForLastVotedTerm maintains field: mLastVotedTerm
// update : feCandidateRequestVote / feVoteToCandidate
func (me *tFollowerState) hookEventsForLastVotedTerm() {
me.hook(feCandidateRequestVote, func(e string, args ...interface{}) {
// before voting, check whether last vote timeout
now := time.Now().UnixNano()
if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) {
// timeout, reset to empty
me.mLastVotedTerm = 0
me.mLastVotedCandidateID = ""
me.mLastVotedTimestamp = 0
}
})
me.hook(feVoteToCandidate, func(e string, args ...interface{}) {
cmd := args[0].(*rpc.RequestVoteCmd)
me.mLastVotedTerm = cmd.Term
})
}
// hookEventsForLastVotedCandicateID maintains field: mLastVotedCandidateID
// update : feCandidateRequestVote / feVoteToCandidate
func (me *tFollowerState) hookEventsForLastVotedCandicateID() {
me.hook(feCandidateRequestVote, func(e string, args ...interface{}) {
// before voting, check whether last vote timeout
now := time.Now().UnixNano()
if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) {
// timeout, reset to empty
me.mLastVotedTerm = 0
me.mLastVotedCandidateID = ""
me.mLastVotedTimestamp = 0
}
})
me.hook(feVoteToCandidate, func(e string, args ...interface{}) {
cmd := args[0].(*rpc.RequestVoteCmd)
me.mLastVotedCandidateID = cmd.CandidateID
})
}
// hookEventsForLastVotedTimestamp maintains field: mLastVotedTimestamp
// update : feCandidateRequestVote / feVoteToCandidate
func (me *tFollowerState) hookEventsForLastVotedTimestamp() {
me.hook(feCandidateRequestVote, func(e string, args ...interface{}) {
// before voting, check whether last vote timeout
now := time.Now().UnixNano()
if time.Duration(now - me.mLastVotedTimestamp) * time.Nanosecond >= fnRandomizeDuration(timeout.ElectionTimeout) {
// timeout, reset to empty
me.mLastVotedTerm = 0
me.mLastVotedCandidateID = ""
me.mLastVotedTimestamp = 0
}
})
me.hook(feVoteToCandidate, func(e string, args ...interface{}) {
me.mLastVotedTimestamp = time.Now().UnixNano()
})
}
// hookEventsForDisposedFlag maintains field: mDisposedFlag
// update: feInit / feDisposing
func (me *tFollowerState) hookEventsForDisposedFlag() {
me.hook(feInit, func(e string, args ...interface{}) {
me.mDiseposedFlag = false
})
me.hook(feDisposing, func(e string, args ...interface{}) {
me.mDiseposedFlag = true
})
}
func (me *tFollowerState) Start() {
me.mStartOnce.Do(func() {
me.raise(feStart)
})
}
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {
go func() {
iCheckingTimeoutInterval := fnRandomizeDuration(timeout.HeartbeatTimeout / 3)
for range time.Tick(iCheckingTimeoutInterval) {
if me.mDiseposedFlag {
return
}
now := time.Now().UnixNano()
iHeartbeatTimeoutNanos := fnRandomizeInt64(int64(timeout.HeartbeatTimeout / time.Nanosecond))
if now - me.mLeaderHeartbeatTimestamp >= iHeartbeatTimeoutNanos {
me.raise(feLeaderHeartbeatTimeout)
return
}
}
}()
}
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
me.raise(feDisposing)
me.context.handleStateChanged(newCandidateState(me.context, me.mTerm + 1))
}
func (me *tFollowerState) Role() roles.RaftRole {
return roles.Follower
}
// Heartbeat leader to follower
func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
// check term
if cmd.Term < me.mTerm {
// invalid leader
ret.Code = rpc.HBTermMismatch
ret.Term = me.mTerm
return nil
}
// raise LeaderHeartbeat
me.raise(feLeaderHeartbeat, cmd)
// return
ret.Code = rpc.HBOk
return nil
}
// AppendLog leader to follower
func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
ret.Term = me.mTerm
if cmd.Term < me.mTerm {
// invalid leader
ret.Code = rpc.ALTermMismatch
return nil
}
store := me.context.store()
entry := cmd.Entry
// check log: expecting appending action follows previous committing action
if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() {
// check log
e, log := store.GetLog(entry.Index)
if e != nil {
ret.Code = rpc.ALInternalError
return nil
}
if log == nil || log.PrevIndex != entry.PrevIndex || log.PrevTerm != entry.PrevTerm {
// bad log
ret.Code = rpc.ALIndexMismatch
ret.PrevLogIndex = store.LastCommittedIndex()
ret.PrevLogTerm = store.LastCommittedTerm()
return nil
}
// good log, but old, just ignore it
ret.Code = rpc.ALOk
return nil
}
// good log
e := store.Append(entry)
if e != nil {
ret.Code = rpc.ALInternalError
return nil
} else {
ret.Code = rpc.ALOk
return nil
}
}
// CommitLog leader to follower
func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
store := me.context.store()
if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() {
// bad index
ret.Code = rpc.CLLogNotFound
return nil
}
e := store.Commit(cmd.Index)
if e != nil {
ret.Code = rpc.CLInternalError
return nil
}
ret.Code = rpc.CLOk
return nil
}
// RequestVote candidate to follower
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
// before voting
me.raise(feCandidateRequestVote, cmd)
// check term
if cmd.Term <= me.mTerm {
ret.Term = me.mTerm
ret.Code = rpc.RVTermMismatch
return nil
}
// check if already voted another
if me.mLastVotedTerm >= cmd.Term && me.mLastVotedCandidateID != "" && me.mLastVotedCandidateID != cmd.CandidateID {
ret.Code = rpc.RVVotedAnother
return nil
}
// check log index
if cmd.LastLogIndex < me.context.store().LastCommittedIndex() {
ret.Code = rpc.RVLogMismatch
return nil
}
// vote ok
me.raise(feVoteToCandidate, cmd)
ret.Term = cmd.Term
ret.Code = rpc.RVOk
return nil
}
tCandidateState.go
基于事件驱动重构Candidate状态的逻辑编排,各字段实施读写分离管理
package lsm
import (
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
"sync"
"time"
)
// tCandidateState presents a candidate node
type tCandidateState struct {
tEventDrivenModel
context IRaftLSM
mInitOnce sync.Once
mStartOnce sync.Once
// update: init / ceElectionTimeout
mTerm int64
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
mVotedTerm int64
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
mVotedCandidateID string
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
mVotedTimestamp int64
}
// trigger: init()
// args: empty
const ceInit = "candidate.init"
// trigger: Start()
// args: empty
const ceStart = "candidate.Start"
// trigger: whenStartThenWatchElectionTimeout()
// args: empty
const ceElectionTimeout = "candidate.ElectionTimeout"
// trigger: Heartbeat() / AppendLog() / CommitLog()
// args: empty
const ceLeaderAnnounced = "candidate.LeaderAnnounced"
// trigger: RequestVote()
// args: *rpc.RequestVoteCmd
const ceVoteToCandidate = "candidate.VoteToCandidate"
// trigger: whenLeaderHeartbeatThenSwitchToFollower()
// args: empty
const ceDisposing = "candidate.Disposing"
func newCandidateState(ctx IRaftLSM, term int64) IRaftState {
it := new(tCandidateState)
it.init(ctx, term)
return it
}
func (me *tCandidateState) init(ctx IRaftLSM, term int64) {
me.mInitOnce.Do(func() {
me.context = ctx
me.mTerm = term
me.initEventHandlers()
me.raise(ceInit)
})
}
func (me *tCandidateState) initEventHandlers() {
// write only logic
me.hookEventsForTerm()
me.hookEventsForVotedTerm()
me.hookEventsForVotedCandidateID()
me.hookEventsForVotedTimestamp()
// read only logic
me.hook(ceLeaderAnnounced,
me.whenLeaderAnnouncedThenSwitchToFollower)
me.hook(ceElectionTimeout,
me.whenElectionTimeoutThenRequestVoteAgain)
}
// hookEventsForTerm maintains field: mTerm
// update: ceElectionTimeout
func (me *tCandidateState) hookEventsForTerm() {
me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
// when election timeout, term++ and request vote again
me.mTerm++
})
}
// hookEventsForVotedTerm maintains field: mVotedTerm
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTerm() {
me.hook(ceInit, func(e string, args ...interface{}) {
// initially, vote to itself
me.mVotedTerm = me.mTerm
})
me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
// when timeout, reset to itself
me.mVotedTerm = me.mTerm
})
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
// after vote to candidate
cmd := args[0].(*rpc.RequestVoteCmd)
me.mVotedTerm = cmd.Term
})
}
// hookEventsForVotedCandidateID maintains field: mVotedCandidateID
// update: ceInit / ceElectionTimeout / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedCandidateID() {
me.hook(ceInit, func(e string, args ...interface{}) {
// initially, vote to itself
me.mVotedCandidateID = me.context.config().ID()
})
me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
// when timeout, reset to itself
me.mVotedCandidateID = me.context.config().ID()
})
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
// after vote to candidate
cmd := args[0].(*rpc.RequestVoteCmd)
me.mVotedCandidateID = cmd.CandidateID
})
}
func (me *tCandidateState) hookEventsForVotedTimestamp() {
me.hook(ceInit, func(e string, args ...interface{}) {
// initially, vote to itself
me.mVotedTimestamp = time.Now().UnixNano()
})
me.hook(ceElectionTimeout, func(e string, args ...interface{}) {
// when timeout, reset to itself
me.mVotedTimestamp = time.Now().UnixNano()
})
me.hook(ceVoteToCandidate, func(e string, args ...interface{}) {
// after vote to candidate
me.mVotedTimestamp = time.Now().UnixNano()
})
}
func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
// check term
if cmd.Term <= me.mTerm {
// bad leader
ret.Code = rpc.HBTermMismatch
return nil
}
// new leader
me.raise(ceLeaderAnnounced)
// return ok
ret.Code = rpc.HBOk
return nil
}
func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
// check term
if cmd.Term <= me.mTerm {
// bad leader
ret.Code = rpc.ALTermMismatch
return nil
}
// new leader
me.raise(ceLeaderAnnounced)
// ignore and return
ret.Code = rpc.ALInternalError
return nil
}
func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
// ignore and return
ret.Code = rpc.CLInternalError
return nil
}
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
// todo: fixme
panic("implements me")
}
func (me *tCandidateState) Role() roles.RaftRole {
return roles.Candidate
}
func (me *tCandidateState) Start() {
me.mStartOnce.Do(func() {
me.raise(feStart)
})
}
func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {
me.raise(ceDisposing)
me.context.handleStateChanged(newFollowerState(me.context))
}
func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ string, _ ...interface{}) {
// todo: fixme
panic("implements me")
}
(未完待续)
网友评论