美文网首页GO与微服务
手撸golang 学etcd 手写raft协议之6 事件驱动+读

手撸golang 学etcd 手写raft协议之6 事件驱动+读写分离

作者: 老罗话编程 | 来源:发表于2021-04-01 17:13 被阅读0次

    手撸golang 学etcd 手写raft协议之6 事件驱动+读写分离

    缘起

    最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
    本系列笔记拟采用golang练习之
    gitee: https://gitee.com/ioly/learning.gooop

    raft分布式一致性算法

    分布式存储系统通常会通过维护多个副本来进行容错,
    以提高系统的可用性。
    这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
    
    Raft算法把问题分解成了四个子问题:
    1. 领袖选举(leader election)
    2. 日志复制(log replication)
    3. 安全性(safety)
    4. 成员关系变化(membership changes)
    

    目标

    • 根据raft协议,实现高可用分布式强一致的kv存储

    子目标(Day 6)

    • 大幅重构,提升代码的可理解/可管理性:
      • 基于事件驱动的逻辑编排,重构Follower和Candidate状态下的实现
      • 将字段状态的管理,实行读写分离。没看错,代码也是可以"读写分离"的 ^_^

    设计

    • random:为各种超时时间添加随机性
    • tFollowerState:基于事件驱动重构Follower状态的逻辑编排,各字段实施读写分离管理
    • tCandidateState:基于事件驱动重构Candidate状态的逻辑编排,各字段实施读写分离管理

    random.go

    为各种超时时间添加随机性

    package lsm
    
    import (
        "math/rand"
        "sync"
        "time"
    )
    
    // fnRandomizeInt64 returns v increased by a random 0..29 percent of itself,
    // i.e. a value in [v, v*1.3) for non-negative v.
    //
    // The shared generator is locked for the duration of the draw because a
    // *rand.Rand created with rand.New is not safe for concurrent use, and this
    // helper is called from several goroutines (timeout watchers, rpc handlers).
    func fnRandomizeInt64(v int64) int64 {
        gRandMutex.Lock()
        percent := gRand.Int63n(30)
        gRandMutex.Unlock()

        return v + v*percent/100
    }

    // fnRandomizeDuration returns v stretched by a random 0..29 percent,
    // i.e. a duration in [v, v*1.3) for non-negative v.
    func fnRandomizeDuration(v time.Duration) time.Duration {
        return time.Duration(fnRandomizeInt64(int64(v)))
    }

    // gRand is the package-wide generator, seeded once from the wall clock.
    // Always access it while holding gRandMutex.
    var gRand = rand.New(rand.NewSource(time.Now().UnixNano()))

    // gRandMutex serializes every use of gRand.
    var gRandMutex sync.Mutex
    

    tFollowerState.go

    基于事件驱动重构Follower状态的逻辑编排,各字段实施读写分离管理

    package lsm
    
    import (
        "learning/gooop/etcd/raft/roles"
        "learning/gooop/etcd/raft/rpc"
        "learning/gooop/etcd/raft/timeout"
        "sync"
        "time"
    )
    
    // tFollowerState is the state object of a raft node acting as follower.
    // Every m-prefixed data field is written only from event handlers; the
    // events that write a field are listed in the comment above it.
    type tFollowerState struct {
        tEventDrivenModel

        context    IRaftLSM
        mInitOnce  sync.Once
        mStartOnce sync.Once

        // written on: feInit / feLeaderHeartbeat
        mTerm int64

        // written on: feInit / feLeaderHeartbeat / feLeaderHeartbeatTimeout
        mLeaderHeartbeatTimestamp int64

        // written on: feLeaderHeartbeat / feLeaderHeartbeatTimeout
        mLeaderID string

        // written on: feCandidateRequestVote (reset of a stale vote) / feVoteToCandidate
        mLastVotedTerm int64

        // written on: feCandidateRequestVote (reset of a stale vote) / feVoteToCandidate
        mLastVotedCandidateID string

        // written on: feCandidateRequestVote (reset of a stale vote) / feVoteToCandidate
        mLastVotedTimestamp int64

        // written on: feInit / feDisposing
        mDiseposedFlag bool
    }
    
    // Event names used by tFollowerState.
    const (
        // feInit is the initialization event.
        // expected trigger: init(); args: none
        feInit = "follower.init"

        // feStart is raised once by Start(); args: none
        feStart = "follower.Start"

        // feLeaderHeartbeat is raised by Heartbeat() for an accepted heartbeat.
        // args: *rpc.HeartbeatCmd
        feLeaderHeartbeat = "follower.LeaderHeartbeat"

        // feLeaderHeartbeatTimeout is raised by the watcher goroutine started in
        // whenStartThenBeginWatchLeaderTimeout(); args: none
        feLeaderHeartbeatTimeout = "follower.LeaderHeartbeatTimeout"

        // feCandidateRequestVote is raised at the beginning of RequestVote().
        // args: *rpc.RequestVoteCmd
        feCandidateRequestVote = "candidate.RequestVote"

        // feVoteToCandidate is raised by RequestVote() when the vote is granted.
        // args: *rpc.RequestVoteCmd
        feVoteToCandidate = "follower.CandidateRequestVote"

        // feDisposing is raised by
        // whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(); args: none
        feDisposing = "follower.Disposing"
    )
    
    // newFollowerState creates a follower state, initializes it with ctx and
    // returns it as an IRaftState.
    func newFollowerState(ctx IRaftLSM) IRaftState {
        follower := &tFollowerState{}
        follower.init(ctx)
        return follower
    }
    
    // init binds the follower to its owning state machine. Guarded by mInitOnce,
    // it runs at most once: it stores ctx, registers all event handlers and
    // then raises feInit so the feInit hooks can seed mTerm,
    // mLeaderHeartbeatTimestamp and the disposed flag.
    func (me *tFollowerState) init(ctx IRaftLSM) {
        me.mInitOnce.Do(func() {
            me.context = ctx
            me.initEventHandlers()

            // Without this event the feInit hooks never run: the heartbeat
            // timestamp stays 0 and the timeout watcher would report a leader
            // timeout on its very first tick.
            me.raise(feInit)
        })
    }
    
    // initEventHandlers registers every event handler of the follower state.
    // The first group registers the hooks that write fields (one hookEventsFor*
    // function per field); the second group registers the handlers that react
    // to events by starting work or switching state.
    // NOTE(review): handlers are presumably invoked in registration order, so
    // the field writers should stay ahead of the reacting handlers — confirm
    // against tEventDrivenModel.
    func (me *tFollowerState) initEventHandlers() {
        // field writers ("write only" logic)
        me.hookEventsForTerm()
        me.hookEventsForLeaderHeartbeatTimestamp()
        me.hookEventsForLeaderID()
        me.hookEventsForLastVotedTerm()
        me.hookEventsForLastVotedCandicateID()
        me.hookEventsForLastVotedTimestamp()
        me.hookEventsForDisposedFlag()

        // reacting handlers ("read only" logic)
        me.hook(feStart,
            me.whenStartThenBeginWatchLeaderTimeout)
        me.hook(feLeaderHeartbeatTimeout,
            me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)
    }
    
    // hookEventsForTerm registers the handlers that write field mTerm:
    //   - feInit:            take the last committed term from the store
    //   - feLeaderHeartbeat: adopt the term carried by the heartbeat command
    func (me *tFollowerState) hookEventsForTerm() {
        onInit := func(_ string, _ ...interface{}) {
            me.mTerm = me.context.store().LastCommittedTerm()
        }
        onHeartbeat := func(_ string, argv ...interface{}) {
            heartbeat := argv[0].(*rpc.HeartbeatCmd)
            me.mTerm = heartbeat.Term
        }

        me.hook(feInit, onInit)
        me.hook(feLeaderHeartbeat, onHeartbeat)
    }
    
    // hookEventsForLeaderHeartbeatTimestamp registers the handlers that write
    // field mLeaderHeartbeatTimestamp:
    //   - feInit / feLeaderHeartbeat: set to the current time (unix nanoseconds)
    //   - feLeaderHeartbeatTimeout:   reset to 0
    func (me *tFollowerState) hookEventsForLeaderHeartbeatTimestamp() {
        onInit := func(_ string, _ ...interface{}) {
            me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()
        }
        onHeartbeat := func(_ string, _ ...interface{}) {
            me.mLeaderHeartbeatTimestamp = time.Now().UnixNano()
        }
        onTimeout := func(_ string, _ ...interface{}) {
            me.mLeaderHeartbeatTimestamp = 0
        }

        me.hook(feInit, onInit)
        me.hook(feLeaderHeartbeat, onHeartbeat)
        me.hook(feLeaderHeartbeatTimeout, onTimeout)
    }
    
    // hookEventsForLeaderID registers the handlers that write field mLeaderID:
    //   - feLeaderHeartbeat:        remember the leader id from the heartbeat
    //   - feLeaderHeartbeatTimeout: clear the leader id
    func (me *tFollowerState) hookEventsForLeaderID() {
        onHeartbeat := func(_ string, argv ...interface{}) {
            heartbeat := argv[0].(*rpc.HeartbeatCmd)
            me.mLeaderID = heartbeat.LeaderID
        }
        onTimeout := func(_ string, _ ...interface{}) {
            me.mLeaderID = ""
        }

        me.hook(feLeaderHeartbeat, onHeartbeat)
        me.hook(feLeaderHeartbeatTimeout, onTimeout)
    }
    
    
    // hookEventsForLastVotedTerm registers the handlers that write field
    // mLastVotedTerm:
    //   - feCandidateRequestVote: if the previous vote is at least one
    //     (randomized) election timeout old, all three last-vote fields are cleared
    //   - feVoteToCandidate:      store the term of the granted vote
    func (me *tFollowerState) hookEventsForLastVotedTerm() {
        me.hook(feCandidateRequestVote, func(_ string, _ ...interface{}) {
            // age of the previous vote
            elapsed := time.Duration(time.Now().UnixNano() - me.mLastVotedTimestamp)
            if elapsed < fnRandomizeDuration(timeout.ElectionTimeout) {
                // previous vote is still valid
                return
            }

            // previous vote expired: forget it
            me.mLastVotedTerm = 0
            me.mLastVotedCandidateID = ""
            me.mLastVotedTimestamp = 0
        })

        me.hook(feVoteToCandidate, func(_ string, argv ...interface{}) {
            vote := argv[0].(*rpc.RequestVoteCmd)
            me.mLastVotedTerm = vote.Term
        })
    }
    
    // hookEventsForLastVotedCandicateID registers the handlers that write field
    // mLastVotedCandidateID:
    //   - feCandidateRequestVote: if the previous vote is at least one
    //     (randomized) election timeout old, all three last-vote fields are cleared
    //   - feVoteToCandidate:      store the id of the candidate voted for
    func (me *tFollowerState) hookEventsForLastVotedCandicateID() {
        expireStaleVote := func(_ string, _ ...interface{}) {
            sinceLastVote := time.Duration(time.Now().UnixNano() - me.mLastVotedTimestamp)
            if sinceLastVote >= fnRandomizeDuration(timeout.ElectionTimeout) {
                // expired: reset the whole vote record
                me.mLastVotedTerm, me.mLastVotedCandidateID, me.mLastVotedTimestamp = 0, "", 0
            }
        }
        rememberCandidate := func(_ string, argv ...interface{}) {
            vote := argv[0].(*rpc.RequestVoteCmd)
            me.mLastVotedCandidateID = vote.CandidateID
        }

        me.hook(feCandidateRequestVote, expireStaleVote)
        me.hook(feVoteToCandidate, rememberCandidate)
    }
    
    // hookEventsForLastVotedTimestamp registers the handlers that write field
    // mLastVotedTimestamp:
    //   - feCandidateRequestVote: if the previous vote is at least one
    //     (randomized) election timeout old, all three last-vote fields are cleared
    //   - feVoteToCandidate:      stamp the vote with the current time
    func (me *tFollowerState) hookEventsForLastVotedTimestamp() {
        me.hook(feCandidateRequestVote, func(_ string, _ ...interface{}) {
            voteAge := time.Duration(time.Now().UnixNano() - me.mLastVotedTimestamp)
            limit := fnRandomizeDuration(timeout.ElectionTimeout)
            if voteAge < limit {
                return
            }

            // the last vote is too old to matter any more
            me.mLastVotedTerm = 0
            me.mLastVotedCandidateID = ""
            me.mLastVotedTimestamp = 0
        })

        me.hook(feVoteToCandidate, func(_ string, _ ...interface{}) {
            me.mLastVotedTimestamp = time.Now().UnixNano()
        })
    }
    
    // hookEventsForDisposedFlag registers the handlers that write field
    // mDiseposedFlag:
    //   - feInit:      false
    //   - feDisposing: true
    func (me *tFollowerState) hookEventsForDisposedFlag() {
        markAlive := func(_ string, _ ...interface{}) {
            me.mDiseposedFlag = false
        }
        markDisposed := func(_ string, _ ...interface{}) {
            me.mDiseposedFlag = true
        }

        me.hook(feInit, markAlive)
        me.hook(feDisposing, markDisposed)
    }
    
    
    // Start raises feStart. Guarded by mStartOnce, so only the first call has
    // any effect.
    func (me *tFollowerState) Start() {
        raiseStart := func() {
            me.raise(feStart)
        }
        me.mStartOnce.Do(raiseStart)
    }
    
    // whenStartThenBeginWatchLeaderTimeout handles feStart by launching a
    // goroutine that periodically compares the time of the last leader heartbeat
    // with a randomized heartbeat timeout. The goroutine ends when the state has
    // been disposed, or after raising feLeaderHeartbeatTimeout once.
    func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {
        go func() {
            iCheckingTimeoutInterval := fnRandomizeDuration(timeout.HeartbeatTimeout / 3)
            if iCheckingTimeoutInterval <= 0 {
                // time.NewTicker panics on a non-positive interval; with such a
                // configuration there is nothing to watch.
                return
            }

            // An explicit ticker (instead of time.Tick) can be stopped when the
            // goroutine returns, so its resources are released.
            ticker := time.NewTicker(iCheckingTimeoutInterval)
            defer ticker.Stop()

            for range ticker.C {
                if me.mDiseposedFlag {
                    return
                }

                now := time.Now().UnixNano()
                // the threshold is re-randomized on every check
                iHeartbeatTimeoutNanos := fnRandomizeInt64(int64(timeout.HeartbeatTimeout / time.Nanosecond))

                if now-me.mLeaderHeartbeatTimestamp >= iHeartbeatTimeoutNanos {
                    me.raise(feLeaderHeartbeatTimeout)
                    return
                }
            }
        }()
    }
    
    // whenLeaderHeartbeatTimeoutThenSwitchToCandidateState handles
    // feLeaderHeartbeatTimeout: it raises feDisposing and then hands a new
    // candidate state for term mTerm+1 to the owning state machine.
    func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
        me.raise(feDisposing)

        nextTerm := me.mTerm + 1
        candidate := newCandidateState(me.context, nextTerm)
        me.context.handleStateChanged(candidate)
    }
    
    // Role reports the raft role of this state, which is always roles.Follower.
    func (me *tFollowerState) Role() roles.RaftRole {
        var role roles.RaftRole = roles.Follower
        return role
    }
    
    // Heartbeat handles a heartbeat sent by a leader to this follower.
    // A command whose term is lower than mTerm is answered with HBTermMismatch
    // and the follower's term. Any other command raises feLeaderHeartbeat and is
    // answered with HBOk. The returned error is always nil.
    func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
        if staleLeader := cmd.Term < me.mTerm; staleLeader {
            ret.Code, ret.Term = rpc.HBTermMismatch, me.mTerm
            return nil
        }

        // accepted: let the field hooks record term, leader id and timestamp
        me.raise(feLeaderHeartbeat, cmd)
        ret.Code = rpc.HBOk
        return nil
    }
    
    // AppendLog handles a log entry sent by a leader to this follower.
    // ret.Term always carries mTerm. Result codes:
    //   - ALTermMismatch:  the command's term is lower than mTerm
    //   - ALOk:            the entry follows the last committed log and was
    //                      appended, or an equivalent entry is already stored
    //   - ALIndexMismatch: the entry fits neither case; ret.PrevLogIndex and
    //                      ret.PrevLogTerm report the last committed position
    //   - ALInternalError: the store failed
    // The returned error is always nil.
    func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
        ret.Term = me.mTerm
        if cmd.Term < me.mTerm {
            // sender is not a valid leader
            ret.Code = rpc.ALTermMismatch
            return nil
        }

        store := me.context.store()
        entry := cmd.Entry

        // expected case: the new entry directly follows the last committed one
        followsLastCommit := entry.PrevIndex == store.LastCommittedIndex() &&
            entry.PrevTerm == store.LastCommittedTerm()
        if followsLastCommit {
            if appendErr := store.Append(entry); appendErr != nil {
                ret.Code = rpc.ALInternalError
                return nil
            }
            ret.Code = rpc.ALOk
            return nil
        }

        // otherwise look up whatever is stored at the entry's index
        lookupErr, stored := store.GetLog(entry.Index)
        switch {
        case lookupErr != nil:
            ret.Code = rpc.ALInternalError
        case stored == nil || stored.PrevIndex != entry.PrevIndex || stored.PrevTerm != entry.PrevTerm:
            // unknown or conflicting entry: tell the leader where we are
            ret.Code = rpc.ALIndexMismatch
            ret.PrevLogIndex = store.LastCommittedIndex()
            ret.PrevLogTerm = store.LastCommittedTerm()
        default:
            // same entry received again: nothing to do
            ret.Code = rpc.ALOk
        }
        return nil
    }
    
    // CommitLog handles a commit request sent by a leader to this follower.
    // Result codes:
    //   - CLLogNotFound:   index/term differ from the last appended log
    //   - CLInternalError: the store failed to commit
    //   - CLOk:            the log at cmd.Index was committed
    // The returned error is always nil.
    func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
        store := me.context.store()

        matchesLastAppended := cmd.Index == store.LastAppendedIndex() &&
            cmd.Term == store.LastAppendedTerm()

        switch {
        case !matchesLastAppended:
            ret.Code = rpc.CLLogNotFound
        case store.Commit(cmd.Index) != nil:
            ret.Code = rpc.CLInternalError
        default:
            ret.Code = rpc.CLOk
        }
        return nil
    }
    
    // RequestVote handles a vote request sent by a candidate to this follower.
    // feCandidateRequestVote is raised first so that a stale previous vote can
    // be cleared. Result codes:
    //   - RVTermMismatch: the candidate's term is not higher than mTerm
    //                     (ret.Term reports mTerm)
    //   - RVVotedAnother: a vote for a different candidate was already given in
    //                     the same or a higher term
    //   - RVLogMismatch:  the candidate's last log index is behind the last
    //                     committed index of this node
    //   - RVOk:           vote granted; feVoteToCandidate is raised and ret.Term
    //                     reports the candidate's term
    // The returned error is always nil.
    func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
        me.raise(feCandidateRequestVote, cmd)

        switch {
        case cmd.Term <= me.mTerm:
            ret.Term = me.mTerm
            ret.Code = rpc.RVTermMismatch

        case me.mLastVotedTerm >= cmd.Term &&
            me.mLastVotedCandidateID != "" &&
            me.mLastVotedCandidateID != cmd.CandidateID:
            ret.Code = rpc.RVVotedAnother

        case cmd.LastLogIndex < me.context.store().LastCommittedIndex():
            ret.Code = rpc.RVLogMismatch

        default:
            me.raise(feVoteToCandidate, cmd)
            ret.Term = cmd.Term
            ret.Code = rpc.RVOk
        }

        return nil
    }
    
    

    tCandidateState.go

    基于事件驱动重构Candidate状态的逻辑编排,各字段实施读写分离管理

    package lsm
    
    import (
        "learning/gooop/etcd/raft/roles"
        "learning/gooop/etcd/raft/rpc"
        "sync"
        "time"
    )
    
    // tCandidateState is the state object of a raft node acting as candidate.
    // The events that write each data field are listed in the comment above it.
    type tCandidateState struct {
        tEventDrivenModel

        context    IRaftLSM
        mInitOnce  sync.Once
        mStartOnce sync.Once

        // set by init(), incremented on ceElectionTimeout
        mTerm int64

        // written on: ceInit / ceElectionTimeout / ceVoteToCandidate
        mVotedTerm int64

        // written on: ceInit / ceElectionTimeout / ceVoteToCandidate
        mVotedCandidateID string

        // written on: ceInit / ceElectionTimeout / ceVoteToCandidate
        mVotedTimestamp int64
    }
    
    // Event names used by tCandidateState.
    const (
        // ceInit is raised by init(); args: none
        ceInit = "candidate.init"

        // ceStart is the start event; intended trigger: Start(); args: none
        ceStart = "candidate.Start"

        // ceElectionTimeout signals that the current election timed out.
        // trigger (per the original note): whenStartThenWatchElectionTimeout();
        // args: none
        ceElectionTimeout = "candidate.ElectionTimeout"

        // ceLeaderAnnounced is raised by Heartbeat() / AppendLog() when a leader
        // is recognized; args: none
        ceLeaderAnnounced = "candidate.LeaderAnnounced"

        // ceVoteToCandidate signals a vote given to another candidate.
        // intended trigger: RequestVote(); args: *rpc.RequestVoteCmd
        ceVoteToCandidate = "candidate.VoteToCandidate"

        // ceDisposing is raised by whenLeaderAnnouncedThenSwitchToFollower();
        // args: none
        ceDisposing = "candidate.Disposing"
    )
    
    // newCandidateState creates a candidate state for the given term,
    // initializes it with ctx and returns it as an IRaftState.
    func newCandidateState(ctx IRaftLSM, term int64) IRaftState {
        candidate := &tCandidateState{}
        candidate.init(ctx, term)
        return candidate
    }
    
    // init binds the candidate to its owning state machine and its starting
    // term. Guarded by mInitOnce, it runs at most once: it stores term and ctx,
    // registers all event handlers and finally raises ceInit.
    func (me *tCandidateState) init(ctx IRaftLSM, term int64) {
        setup := func() {
            me.mTerm = term
            me.context = ctx

            me.initEventHandlers()
            me.raise(ceInit)
        }
        me.mInitOnce.Do(setup)
    }
    
    
    // initEventHandlers registers every event handler of the candidate state.
    // The first group registers the hooks that write fields (one hookEventsFor*
    // function per field); the second group registers the handlers that react
    // to events.
    // NOTE(review): handlers are presumably invoked in registration order; if
    // so, hookEventsForTerm must stay first so that mTerm is incremented on
    // ceElectionTimeout before the voted-term hook copies it — confirm against
    // tEventDrivenModel.
    func (me *tCandidateState) initEventHandlers() {
        // field writers ("write only" logic)
        me.hookEventsForTerm()
        me.hookEventsForVotedTerm()
        me.hookEventsForVotedCandidateID()
        me.hookEventsForVotedTimestamp()

        // reacting handlers ("read only" logic)
        me.hook(ceLeaderAnnounced,
            me.whenLeaderAnnouncedThenSwitchToFollower)
        me.hook(ceElectionTimeout,
            me.whenElectionTimeoutThenRequestVoteAgain)
    }
    
    // hookEventsForTerm registers the handler that writes field mTerm:
    //   - ceElectionTimeout: increment the term by one
    func (me *tCandidateState) hookEventsForTerm() {
        bumpTerm := func(_ string, _ ...interface{}) {
            // a timed-out election is retried in the next term
            me.mTerm++
        }
        me.hook(ceElectionTimeout, bumpTerm)
    }
    
    // hookEventsForVotedTerm registers the handlers that write field mVotedTerm:
    //   - ceInit / ceElectionTimeout: copy the candidate's own mTerm
    //   - ceVoteToCandidate:          store the term of the request voted for
    func (me *tCandidateState) hookEventsForVotedTerm() {
        onInit := func(_ string, _ ...interface{}) {
            // a fresh candidate votes for itself
            me.mVotedTerm = me.mTerm
        }
        onElectionTimeout := func(_ string, _ ...interface{}) {
            // after a timeout the vote goes back to itself
            me.mVotedTerm = me.mTerm
        }
        onVote := func(_ string, argv ...interface{}) {
            request := argv[0].(*rpc.RequestVoteCmd)
            me.mVotedTerm = request.Term
        }

        me.hook(ceInit, onInit)
        me.hook(ceElectionTimeout, onElectionTimeout)
        me.hook(ceVoteToCandidate, onVote)
    }
    
    // hookEventsForVotedCandidateID registers the handlers that write field
    // mVotedCandidateID:
    //   - ceInit / ceElectionTimeout: the id of this node, from the config
    //   - ceVoteToCandidate:          the id of the candidate voted for
    func (me *tCandidateState) hookEventsForVotedCandidateID() {
        onInit := func(_ string, _ ...interface{}) {
            // a fresh candidate votes for itself
            me.mVotedCandidateID = me.context.config().ID()
        }
        onElectionTimeout := func(_ string, _ ...interface{}) {
            // after a timeout the vote goes back to itself
            me.mVotedCandidateID = me.context.config().ID()
        }
        onVote := func(_ string, argv ...interface{}) {
            request := argv[0].(*rpc.RequestVoteCmd)
            me.mVotedCandidateID = request.CandidateID
        }

        me.hook(ceInit, onInit)
        me.hook(ceElectionTimeout, onElectionTimeout)
        me.hook(ceVoteToCandidate, onVote)
    }
    
    // hookEventsForVotedTimestamp registers the handlers that write field
    // mVotedTimestamp. On each of ceInit, ceElectionTimeout and
    // ceVoteToCandidate the field is set to the current time (unix nanoseconds).
    func (me *tCandidateState) hookEventsForVotedTimestamp() {
        onInit := func(_ string, _ ...interface{}) {
            // initial vote for itself
            me.mVotedTimestamp = time.Now().UnixNano()
        }
        onElectionTimeout := func(_ string, _ ...interface{}) {
            // vote falls back to itself
            me.mVotedTimestamp = time.Now().UnixNano()
        }
        onVote := func(_ string, _ ...interface{}) {
            // vote given to another candidate
            me.mVotedTimestamp = time.Now().UnixNano()
        }

        me.hook(ceInit, onInit)
        me.hook(ceElectionTimeout, onElectionTimeout)
        me.hook(ceVoteToCandidate, onVote)
    }
    
    // Heartbeat handles a leader heartbeat received while campaigning.
    // A heartbeat whose term is lower than the candidate's term comes from a
    // stale leader: it is answered with HBTermMismatch and the candidate's term.
    // A heartbeat whose term is at least the candidate's term proves that a
    // legitimate leader exists (in raft, a leader of the same term has already
    // won the election), so ceLeaderAnnounced is raised and HBOk is returned.
    // The returned error is always nil.
    func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
        // check term
        if cmd.Term < me.mTerm {
            // stale leader; report our term like the follower state does
            ret.Code = rpc.HBTermMismatch
            ret.Term = me.mTerm
            return nil
        }

        // legitimate leader
        me.raise(ceLeaderAnnounced)

        // return ok
        ret.Code = rpc.HBOk
        return nil
    }
    
    // AppendLog handles a log entry received while campaigning.
    // ret.Term always carries the candidate's term. A command whose term is
    // lower than the candidate's term comes from a stale leader and is answered
    // with ALTermMismatch. A command whose term is at least the candidate's term
    // proves that a legitimate leader exists, so ceLeaderAnnounced is raised;
    // the entry itself is not stored by the candidate, hence ALInternalError is
    // returned for it. The returned error is always nil.
    func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
        ret.Term = me.mTerm

        // check term
        if cmd.Term < me.mTerm {
            // stale leader
            ret.Code = rpc.ALTermMismatch
            return nil
        }

        // legitimate leader
        me.raise(ceLeaderAnnounced)

        // this entry is ignored by the candidate
        ret.Code = rpc.ALInternalError
        return nil
    }
    
    // CommitLog handles a commit request received while campaigning. The
    // candidate never commits: every request is answered with CLInternalError.
    // The returned error is always nil.
    func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
        // commit requests are ignored in candidate state
        ret.Code = rpc.CLInternalError

        return nil
    }
    
    // RequestVote is the entry point for vote requests received while
    // campaigning. It is not implemented yet and panics unconditionally.
    func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
        // TODO: implement voting in candidate state
        panic("implements me")
    }
    
    // Role reports the raft role of this state, which is always roles.Candidate.
    func (me *tCandidateState) Role() roles.RaftRole {
        var role roles.RaftRole = roles.Candidate
        return role
    }
    
    // Start raises the candidate's own start event, ceStart. Guarded by
    // mStartOnce, so only the first call has any effect.
    func (me *tCandidateState) Start() {
        me.mStartOnce.Do(func() {
            // ceStart, not the follower's feStart: handlers of this state are
            // registered under the candidate event names.
            me.raise(ceStart)
        })
    }
    
    
    // whenLeaderAnnouncedThenSwitchToFollower handles ceLeaderAnnounced: it
    // raises ceDisposing and then hands a new follower state to the owning
    // state machine.
    func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {
        me.raise(ceDisposing)

        follower := newFollowerState(me.context)
        me.context.handleStateChanged(follower)
    }
    
    // whenElectionTimeoutThenRequestVoteAgain is registered as a handler for
    // ceElectionTimeout. It is not implemented yet and panics unconditionally.
    func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ string, _ ...interface{}) {
        // TODO: implement requesting votes again after an election timeout
        panic("implements me")
    }
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang 学etcd 手写raft协议之6 事件驱动+读

        本文链接:https://www.haomeiwen.com/subject/hantkltx.html