美文网首页GO与微服务
手撸golang etcd raft协议之3

手撸golang etcd raft协议之3

作者: 老罗话编程 | 来源:发表于2021-03-29 23:55 被阅读0次

    手撸golang etcd raft协议之3

    缘起

    最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
    本系列笔记拟采用golang练习之
    gitee: https://gitee.com/ioly/learning.gooop

    raft分布式一致性算法

    分布式存储系统通常会通过维护多个副本来进行容错,
    以提高系统的可用性。
    这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
    
    Raft算法把问题分解成了领袖选举(leader election)、
    日志复制(log replication)、安全性(safety)
    和成员关系变化(membership changes)这几个子问题。
    
    Raft算法的基本操作只需2种RPC即可完成。
    RequestVote RPC是在选举过程中通过旧的Leader触发的,
    AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
    

    目标

    • 根据raft协议,实现高可用分布式强一致的kv存储

    子目标(Day 3)

    • 继续完善raft状态机之Follower状态的处理逻辑
    • 继续完善raft状态机之Candidate状态的处理逻辑

    设计

    • tFollowerState:
      • 监视Leader心跳是否超时
      • 如果Leader心跳超时,则切换到Candidate状态,竞选新leader
      • 添加RequestVote和AppendEntries两个RPC接口的响应
    • tCandidateState:
      • 进入此状态,立即向其他节点发起竞选请求
      • 如竞选超时,则重新发起竞选
      • 如收到新Leader心跳,则切换回Follower
      • 如收到N/2+1张票,则切换到Leader,并广播之

    tFollowerState.go

    继续完善raft状态机之Follower状态的处理逻辑

    package lsm
    
    import (
        "learning/gooop/etcd/raft/config"
        "learning/gooop/etcd/raft/roles"
        "learning/gooop/etcd/raft/rpc"
        "learning/gooop/etcd/raft/timeout"
        "sync"
        "time"
    )
    
    type tFollowerState struct {
        tRaftStateBase
    
        mInitOnce  sync.Once
        mStartOnce sync.Once
    
        mVotedLeaderID string
        mLeaderHeartbeatClock int64
        mStateChangedHandler StateChangedHandleFunc
        mEventMap  map[tFollowerEvent][]tFollowerEventHandler
    }
    
    
    type JobFunc func()
    
    type tFollowerEvent int
    const (
        evFollowerStart tFollowerEvent = iota
        evFollowerLeaderHeartbeatTimeout tFollowerEvent = iota
    )
    
    type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})
    
    func newFollowerState(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) IRaftState {
        it := new(tFollowerState)
        it.init(term, cfg, handler)
    
        return it
    }
    
    func (me *tFollowerState) init(term int, cfg config.IRaftConfig, handler StateChangedHandleFunc) {
        me.mInitOnce.Do(func() {
            me.tRaftStateBase = *newRaftStateBase(term, cfg)
            me.role = roles.Follower
            me.mStateChangedHandler = handler
    
            // init event map
            me.mEventMap = make(map[tFollowerEvent][]tFollowerEventHandler)
            me.registerEventHandlers()
        })
    }
    
    func (me *tFollowerState) raise(e tFollowerEvent, args ...interface{}) {
        if handlers, ok := me.mEventMap[e]; ok {
            for _, it := range handlers {
                it(e, args...)
            }
        }
    }
    
    func (me *tFollowerState) registerEventHandlers() {
        me.mEventMap[evFollowerStart] = []tFollowerEventHandler{
            me.whenStartThenBeginWatchLeaderTimeout,
        }
        me.mEventMap[evFollowerLeaderHeartbeatTimeout] = []tFollowerEventHandler {
            me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState,
        }
    }
    
    
    func (me *tFollowerState) Start() {
        me.mStartOnce.Do(func() {
            me.raise(evFollowerStart)
        })
    }
    
    func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e tFollowerEvent, args... interface{}) {
        go func() {
            iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3
            iHeartbeatTimeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)
            for range time.Tick(iCheckingTimeoutInterval) {
                now := time.Now().UnixNano()
                if now - me.mLeaderHeartbeatClock >= iHeartbeatTimeoutNanos {
                    me.raise(evFollowerLeaderHeartbeatTimeout)
                    return
                }
            }
        }()
    }
    
    
    func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ tFollowerEvent, args... interface{}) {
        fn := me.mStateChangedHandler
        if fn == nil {
            return
        }
    
        state := newCandidateState(me.cfg, me.term, me.mStateChangedHandler)
        fn(state)
    }
    
    func (me *tFollowerState) Role() roles.RaftRole {
        return roles.Follower
    }
    
    func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
        if cmd.Term <= me.term {
            ret.Term = me.term
            ret.VoteGranted = false
            return nil
        }
    
        if me.mVotedLeaderID != "" && me.mVotedLeaderID != cmd.CandidateID {
            ret.Term = me.term
            ret.VoteGranted = false
            return nil
        }
    
        me.mVotedLeaderID = cmd.CandidateID
        ret.Term = cmd.Term
        ret.VoteGranted = true
        return nil
    }
    
    func (me *tFollowerState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error {
        if cmd.Term < me.term {
            ret.Term = me.term
            ret.Success = false
            return nil
        }
    
        me.term = cmd.Term
        me.leaderID = cmd.LeaderID
        me.mLeaderHeartbeatClock = time.Now().UnixNano()
    
        if len(cmd.Entries) <= 0 {
            // just heartbeat package
            ret.Term = cmd.Term
            ret.Success = true
            return nil
        }
    
        // todo: append logs
        return nil
    }
    
    
    func (me *tFollowerState) StateChangedHandler(handler StateChangedHandleFunc) {
        me.mStateChangedHandler = handler
    }
    
    

    tCandidateState.go

    继续完善raft状态机之Candidate状态的处理逻辑

    package lsm
    
    import (
        "errors"
        "learning/gooop/etcd/raft/config"
        "learning/gooop/etcd/raft/rpc"
        "sync"
    )
    
    type tCandidateState struct {
        tRaftStateBase
    
        mInitOnce  sync.Once
        mStartOnce sync.Once
    
        mStateChangedHandler StateChangedHandleFunc
        mEventMap  map[tCandidateEvent][]tCandidateEventHandler
    }
    
    
    func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
        return gErrorCandidateWontReplyRequestVote
    }
    
    func (me *tCandidateState) AppendEntries(cmd *rpc.AppendEntriesCmd, ret *rpc.AppendEntriesRet) error {
        return gErrorCandidateWontReplyAppendEntries
    }
    
    func (me *tCandidateState) StateChangedHandler(handler StateChangedHandleFunc) {
        me.mStateChangedHandler = handler
    }
    
    type tCandidateEvent int
    const (
        evCandidateStart tCandidateEvent = iota
        evCandidateElectionTimeout tCandidateEvent = iota
        evCandidateGotEnoughVotes tCandidateEvent = iota
    )
    
    type tCandidateEventHandler func(e tCandidateEvent, args ...interface{})
    
    func newCandidateState(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) IRaftState {
        it := new(tCandidateState)
        it.init(cfg, term, handler)
        return it
    }
    
    
    func (me *tCandidateState) init(cfg config.IRaftConfig, term int, handler StateChangedHandleFunc) {
        me.mInitOnce.Do(func() {
            me.cfg = cfg
            me.term = term
            me.mStateChangedHandler = handler
    
            // init event map
            me.mEventMap = make(map[tCandidateEvent][]tCandidateEventHandler)
            me.registerEventHandlers()
        })
    }
    
    
    func (me *tCandidateState) registerEventHandlers() {
        me.mEventMap[evCandidateStart] = []tCandidateEventHandler{
            me.whenStartThenRequestVote,
            me.whenStartThenWatchElectionTimeout,
        }
    
        me.mEventMap[evCandidateElectionTimeout] = []tCandidateEventHandler{
            me.whenElectionTimeoutThenRequestVoteAgain,
        }
    
        me.mEventMap[evCandidateGotEnoughVotes] = []tCandidateEventHandler{
            me.whenGotEnoughVotesThenSwitchToLeader,
        }
    }
    
    func (me *tCandidateState) raise(e tCandidateEvent, args ...interface{}) {
        if handlers, ok := me.mEventMap[e]; ok {
            for _, it := range handlers {
                it(e, args...)
            }
        }
    }
    
    func (me *tCandidateState) Start() {
        me.mStartOnce.Do(func() {
            me.raise(evCandidateStart)
        })
    }
    
    func (me *tCandidateState) whenStartThenRequestVote(_ tCandidateEvent, _... interface{}) {
        // todo: fixme
        panic("implements me")
    }
    
    func (me *tCandidateState) whenStartThenWatchElectionTimeout(_ tCandidateEvent, _... interface{}) {
        // todo: fixme
        panic("implements me")
    }
    
    func (me *tCandidateState) whenElectionTimeoutThenRequestVoteAgain(_ tCandidateEvent, _... interface{}) {
        // todo: fixme
        panic("implements me")
    }
    
    func (me *tCandidateState) whenGotEnoughVotesThenSwitchToLeader(_ tCandidateEvent, _... interface{}) {
        // todo: fixme
        panic("implements me")
    }
    
    var gErrorCandidateWontReplyRequestVote = errors.New("candidate won't reply RequestVote RPC")
    var gErrorCandidateWontReplyAppendEntries = errors.New("candidate won't reply AppendEntries RPC")
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang etcd raft协议之3

        本文链接:https://www.haomeiwen.com/subject/rotwhltx.html