美文网首页GO与微服务
手撸golang etcd raft协议之8

手撸golang etcd raft协议之8

作者: 老罗话编程 | 来源:发表于2021-04-03 23:00 被阅读0次

    手撸golang etcd raft协议之8

    缘起

    最近阅读《云原生分布式存储基石:etcd深入解析》(杜军,2019.1)
    本系列笔记拟采用golang练习之
    gitee: https://gitee.com/ioly/learning.gooop

    raft分布式一致性算法

    分布式存储系统通常会通过维护多个副本来进行容错,
    以提高系统的可用性。
    这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
    
    Raft算法把这个问题分解成了四个子问题:
    1. 领袖选举(leader election)
    2. 日志复制(log replication)
    3. 安全性(safety)
    4. 成员关系变化(membership changes)
    

    目标

    • 根据raft协议,实现高可用分布式强一致的kv存储

    子目标(Day 8)

    • 简化rpc连接管理器tRaftClientService的实现
    • 剥离IRaftLSM的内部支持接口到iRaftStateContext接口
    • 完成Candidate状态的处理逻辑

    设计

    • IRaftLSM:raft有限状态机接口
    • iRaftStateContext:提供状态模式下的上下文支持
    • tCandidateState:Candidate(候选人)状态的实现。基于事件驱动的逻辑编排,基于读写分离的字段管理。
    • tRaftClient:管理到指定raft节点的rpc连接
    • tRaftClientService:管理当前节点到其他raft节点的rpc连接

    IRaftLSM.go

    raft有限状态机接口

    package lsm
    
    import (
        "learning/gooop/etcd/raft/rpc"
    )
    
    // IRaftLSM is the raft finite state machine interface.
    // It combines the raft RPC surface (rpc.IRaftRPC) with the context
    // services offered to state objects (iRaftStateContext).
    type IRaftLSM interface {
        rpc.IRaftRPC
        iRaftStateContext
    
        // State returns an IRaftState; presumably the state the machine
        // is currently in (the implementation is not part of this file).
        State() IRaftState
    }
    

    iRaftStateContext.go

    提供状态模式下的上下文支持

    package lsm
    
    import (
        "learning/gooop/etcd/raft/config"
        "learning/gooop/etcd/raft/rpc/client"
        "learning/gooop/etcd/raft/store"
    )
    
    // iRaftStateContext is the context handed to state objects in the state
    // pattern: it gives a state access to configuration, log storage, the
    // rpc client service, and a way to announce a state switch.
    type iRaftStateContext interface {
        // Config returns the raft configuration.
        Config() config.IRaftConfig
        // Store returns the log store.
        Store() store.ILogStore
        // HandleStateChanged is called by a state to hand over to the given next state.
        HandleStateChanged(state IRaftState)
        // RaftClientService returns the service used to call other raft nodes.
        RaftClientService() client.IRaftClientService
    }
    
    

    tCandidateState.go

    Candidate(候选人)状态的实现。基于事件驱动的逻辑编排,基于读写分离的字段管理。

    package lsm
    
    import (
        "learning/gooop/etcd/raft/roles"
        "learning/gooop/etcd/raft/rpc"
        "learning/gooop/etcd/raft/timeout"
        "sync"
        "time"
    )
    
    // tCandidateState represents a node in the Candidate role.
    // Fields are maintained by dedicated event hooks (see the hookEventsForXxx
    // methods); the comment on each field lists where it gets written.
    type tCandidateState struct {
        // embedded event model; presumably the provider of hook() / raise()
        tEventDrivenModel
    
        // context gives access to config, store, rpc clients and state switching
        context    iRaftStateContext
        // mInitOnce guards init() so it only runs once
        mInitOnce  sync.Once
        // mStartOnce guards Start() so it only runs once
        mStartOnce sync.Once
    
        // current term of this candidate
        // update: init() / ceAskingForVote
        mTerm int64
    
        // term in which this node last granted a vote
        // update: ceInit / ceAskingForVote / ceVoteToCandidate
        mVotedTerm int64
    
        // id of the candidate this node last voted for (initially itself)
        // update: ceInit / ceAskingForVote / ceVoteToCandidate
        mVotedCandidateID string
    
        // time of the last vote, as time.Now().UnixNano()
        // update: ceInit / ceAskingForVote / ceVoteToCandidate
        mVotedTimestamp int64
    
        // set of node ids that granted a ticket in the current round (includes itself),
        // guarded by mTicketMutex
        // update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
        mTicketCount map[string]bool
        mTicketMutex *sync.Mutex
    
        // true once this state has been disposed
        // NOTE(review): read from background goroutines without synchronization
        // update: ceInit / ceDisposing
        mDisposedFlag bool
    }
    
    // Event names used by tCandidateState. Each entry notes who raises the
    // event and which arguments travel with it.
    const (
        // ceInit is raised by init(); no args.
        ceInit = "candidate.init"

        // ceStart is the start event hooked to whenStartThenAskForVote
        // (meant to be raised by Start()); no args.
        ceStart = "candidate.Start"

        // ceElectionTimeout is raised by the timer goroutine launched in
        // whenAskingForVoteThenWatchElectionTimeout(); no args.
        ceElectionTimeout = "candidate.ElectionTimeout"

        // ceLeaderAnnounced is raised by Heartbeat() / AppendLog() when a
        // leader is accepted; no args.
        ceLeaderAnnounced = "candidate.LeaderAnnounced"

        // ceVoteToCandidate is raised by RequestVote(); args: *rpc.RequestVoteCmd.
        ceVoteToCandidate = "candidate.VoteToCandidate"

        // ceDisposing is raised by whenLeaderAnnouncedThenSwitchToFollower()
        // and whenWinningTheVoteThenSwitchToLeader(); no args.
        ceDisposing = "candidate.Disposing"

        // ceAskingForVote is raised by beginAskForVote(); no args.
        ceAskingForVote = "candidate.AskingForVote"

        // ceReceiveTicket is raised by handleRequestVoteOK(); args: peerID string.
        ceReceiveTicket = "candidate.ReceiveTicket"

        // ceWinningTheVote is raised by whenReceiveTicketThenCheckTicketCount(); no args.
        ceWinningTheVote = "candidate.ceWinningTheVote"
    )
    
    // newCandidateState builds a candidate state bound to ctx and seeded with
    // term, and returns it as an IRaftState.
    func newCandidateState(ctx iRaftStateContext, term int64) IRaftState {
        candidate := &tCandidateState{}
        candidate.init(ctx, term)
        return candidate
    }
    
    // init wires the candidate to its context and term, registers all event
    // handlers and finally raises ceInit. Guarded by mInitOnce, so repeated
    // calls are no-ops.
    func (me *tCandidateState) init(ctx iRaftStateContext, term int64) {
        setup := func() {
            me.context, me.mTerm = ctx, term
            me.initEventHandlers()

            // handlers are in place, let them initialize the fields
            me.raise(ceInit)
        }
        me.mInitOnce.Do(setup)
    }
    
    // initEventHandlers registers every event handler of the candidate state.
    // Field-maintaining ("write only") hooks are registered first, then the
    // reacting ("read only") handlers, in the same order as listed below.
    func (me *tCandidateState) initEventHandlers() {
        // write only logic: one registrar per maintained field
        fieldHooks := []func(){
            me.hookEventsForTerm,
            me.hookEventsForVotedTerm,
            me.hookEventsForVotedCandidateID,
            me.hookEventsForVotedTimestamp,
            me.hookEventsForTicketCount,
            me.hookEventsForDisposedFlag,
        }
        for _, register := range fieldHooks {
            register()
        }

        // read only logic: event -> reaction
        reactions := []struct {
            event   string
            handler func(string, ...interface{})
        }{
            {ceStart, me.whenStartThenAskForVote},
            {ceAskingForVote, me.whenAskingForVoteThenWatchElectionTimeout},
            {ceReceiveTicket, me.whenReceiveTicketThenCheckTicketCount},
            {ceElectionTimeout, me.whenElectionTimeoutThenAskForVoteAgain},
            {ceWinningTheVote, me.whenWinningTheVoteThenSwitchToLeader},
            {ceLeaderAnnounced, me.whenLeaderAnnouncedThenSwitchToFollower},
        }
        for _, r := range reactions {
            me.hook(r.event, r.handler)
        }
    }
    
    // hookEventsForTerm maintains field: mTerm
    // update: ceAskingForVote (every round of asking for votes moves to the next term)
    func (me *tCandidateState) hookEventsForTerm() {
        nextTerm := func(_ string, _ ...interface{}) {
            me.mTerm += 1
        }
        me.hook(ceAskingForVote, nextTerm)
    }
    
    // hookEventsForVotedTerm maintains field: mVotedTerm
    // update: ceInit / ceAskingForVote / ceVoteToCandidate
    func (me *tCandidateState) hookEventsForVotedTerm() {
        // on init and on every new round the candidate votes for itself,
        // so the voted term follows its own term
        for _, event := range []string{ceInit, ceAskingForVote} {
            me.hook(event, func(_ string, _ ...interface{}) {
                me.mVotedTerm = me.mTerm
            })
        }

        // after granting a vote to another candidate, remember the requested term
        me.hook(ceVoteToCandidate, func(_ string, args ...interface{}) {
            me.mVotedTerm = args[0].(*rpc.RequestVoteCmd).Term
        })
    }
    
    // hookEventsForVotedCandidateID maintains field: mVotedCandidateID
    // update: ceInit / ceAskingForVote / ceVoteToCandidate
    func (me *tCandidateState) hookEventsForVotedCandidateID() {
        // on init and on every new round, the vote goes to this node itself
        for _, event := range []string{ceInit, ceAskingForVote} {
            me.hook(event, func(_ string, _ ...interface{}) {
                me.mVotedCandidateID = me.context.Config().ID()
            })
        }

        // after granting a vote to another candidate, remember who got it
        me.hook(ceVoteToCandidate, func(_ string, args ...interface{}) {
            me.mVotedCandidateID = args[0].(*rpc.RequestVoteCmd).CandidateID
        })
    }
    
    // hookEventsForVotedTimestamp maintains field: mVotedTimestamp
    // update: ceInit / ceAskingForVote / ceVoteToCandidate
    // Every one of these events is a vote (for itself or for another
    // candidate), so each stamps the current time in nanoseconds.
    func (me *tCandidateState) hookEventsForVotedTimestamp() {
        for _, event := range []string{ceInit, ceAskingForVote, ceVoteToCandidate} {
            me.hook(event, func(_ string, _ ...interface{}) {
                me.mVotedTimestamp = time.Now().UnixNano()
            })
        }
    }
    
    // hookEventsForTicketCount maintains fields: mTicketCount / mTicketMutex
    // update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
    func (me *tCandidateState) hookEventsForTicketCount() {
        // ticketsWithSelf builds a fresh ticket set holding only this node's own ticket
        ticketsWithSelf := func() map[string]bool {
            tickets := map[string]bool{}
            tickets[me.context.Config().ID()] = true
            return tickets
        }

        // init: create the mutex, then start with its own ticket
        me.hook(ceInit, func(_ string, _ ...interface{}) {
            me.mTicketMutex = &sync.Mutex{}
            me.mTicketCount = ticketsWithSelf()
        })

        // new round: forget previous tickets, keep only its own
        me.hook(ceAskingForVote, func(_ string, _ ...interface{}) {
            me.mTicketMutex.Lock()
            defer me.mTicketMutex.Unlock()

            me.mTicketCount = ticketsWithSelf()
        })

        // a peer granted its vote: record the peer id
        me.hook(ceReceiveTicket, func(_ string, args ...interface{}) {
            voter := args[0].(string)

            me.mTicketMutex.Lock()
            defer me.mTicketMutex.Unlock()
            me.mTicketCount[voter] = true
        })

        // disposing: drop all tickets
        me.hook(ceDisposing, func(_ string, _ ...interface{}) {
            me.mTicketMutex.Lock()
            defer me.mTicketMutex.Unlock()
            me.mTicketCount = map[string]bool{}
        })
    }
    
    
    // hookEventsForDisposedFlag maintains field: mDisposedFlag
    // update: ceInit (false) / ceDisposing (true)
    func (me *tCandidateState) hookEventsForDisposedFlag() {
        // markDisposed builds a handler that stores the given flag value
        markDisposed := func(disposed bool) func(string, ...interface{}) {
            return func(_ string, _ ...interface{}) {
                me.mDisposedFlag = disposed
            }
        }

        me.hook(ceInit, markDisposed(false))
        me.hook(ceDisposing, markDisposed(true))
    }
    
    // Heartbeat handles a leader heartbeat received while being a candidate.
    //
    // A heartbeat whose term is older than the candidate's term comes from a
    // stale leader and is answered with rpc.HBTermMismatch. A heartbeat whose
    // term is at least the candidate's term comes from a legitimate leader
    // (possibly another candidate that won the election in the very same
    // term): ceLeaderAnnounced is raised, which switches this node back to
    // follower, and rpc.HBOk is returned.
    //
    // The returned error is always nil; the outcome is reported in ret.Code.
    func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
        // Only a strictly older term is rejected. Rejecting an equal term would
        // make this candidate ignore the winner of its own election round and
        // keep starting new elections.
        if cmd.Term < me.mTerm {
            // stale leader
            ret.Code = rpc.HBTermMismatch
            return nil
        }

        // legitimate leader
        me.raise(ceLeaderAnnounced)

        // return ok
        ret.Code = rpc.HBOk
        return nil
    }
    
    // AppendLog handles a leader's append-log request received while being a
    // candidate.
    //
    // A request whose term is older than the candidate's term comes from a
    // stale leader and is answered with rpc.ALTermMismatch. Otherwise the
    // sender is a legitimate leader (possibly elected in the very same term):
    // ceLeaderAnnounced is raised so this node switches back to follower. The
    // entry itself is not stored by the candidate, so rpc.ALInternalError is
    // returned for this request.
    //
    // The returned error is always nil; the outcome is reported in ret.Code.
    func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
        // Only a strictly older term is rejected; an equal term belongs to the
        // winner of the current election round and must be accepted.
        if cmd.Term < me.mTerm {
            // stale leader
            ret.Code = rpc.ALTermMismatch
            return nil
        }

        // legitimate leader
        me.raise(ceLeaderAnnounced)

        // the candidate does not append the entry itself: ignore and return
        ret.Code = rpc.ALInternalError
        return nil
    }
    
    // CommitLog handles a commit-log request received while being a candidate.
    // The request is not processed: ret.Code is set to rpc.CLInternalError and
    // nil is returned. cmd is not inspected.
    func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
        // ignore and return
        ret.Code = rpc.CLInternalError
        return nil
    }
    
    // RequestVote handles a vote request from another candidate.
    //
    // Outcome, reported in ret.Code (the returned error is always nil):
    //   - cmd.Term older than the term this node last voted in: rpc.RVTermMismatch
    //   - cmd.Term equal to it: rpc.RVOk if the earlier vote went to the same
    //     candidate (or to nobody), otherwise rpc.RVVotedAnother
    //   - cmd.Term newer: the requester's log must be at least as up-to-date as
    //     the local one; if so ceVoteToCandidate is raised with cmd and
    //     rpc.RVOk is returned, otherwise rpc.RVLogMismatch
    func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
        switch {
        case cmd.Term < me.mVotedTerm:
            // request from an older term
            ret.Code = rpc.RVTermMismatch

        case cmd.Term == me.mVotedTerm:
            if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID {
                // already voted for another candidate in this term
                ret.Code = rpc.RVVotedAnother
            } else {
                // already voted for this very candidate
                ret.Code = rpc.RVOk
            }

        default:
            // new term, check log: compare the term of the last entry first and
            // fall back to the index only when the terms are equal. Comparing
            // the index alone would let a candidate with a longer but older log
            // win the vote.
            logStore := me.context.Store()
            localTerm, localIndex := logStore.LastCommittedTerm(), logStore.LastCommittedIndex()
            upToDate := cmd.LastLogTerm > localTerm ||
                (cmd.LastLogTerm == localTerm && cmd.LastLogIndex >= localIndex)

            if upToDate {
                // good log
                me.raise(ceVoteToCandidate, cmd)
                ret.Code = rpc.RVOk
            } else {
                // bad log
                ret.Code = rpc.RVLogMismatch
            }
        }

        return nil
    }
    
    // Role reports the raft role of this state, which is always roles.Candidate.
    func (me *tCandidateState) Role() roles.RaftRole {
        return roles.Candidate
    }
    
    // Start kicks off the candidate: it raises ceStart, whose handler
    // (whenStartThenAskForVote) begins asking the other nodes for votes.
    // Guarded by mStartOnce, so only the first call has an effect.
    func (me *tCandidateState) Start() {
        me.mStartOnce.Do(func() {
            // must be the candidate's own start event: the handlers of this
            // state are hooked on ceStart, no other start event reaches them
            me.raise(ceStart)
        })
    }
    
    // whenLeaderAnnouncedThenSwitchToFollower is the handler of ceLeaderAnnounced.
    // It raises ceDisposing to retire this candidate state, then hands a new
    // follower state to the context via HandleStateChanged.
    // The event name and args are not used.
    func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {
        me.raise(ceDisposing)
        me.context.HandleStateChanged(newFollowerState(me.context))
    }
    
    // whenElectionTimeoutThenAskForVoteAgain is the handler of ceElectionTimeout.
    // It starts another round of asking for votes via beginAskForVote.
    func (me *tCandidateState) whenElectionTimeoutThenAskForVoteAgain(_ string, _ ...interface{}) {
        me.beginAskForVote()
    }
    
    // whenStartThenAskForVote is the handler of ceStart.
    // It starts the first round of asking for votes via beginAskForVote.
    func (me *tCandidateState) whenStartThenAskForVote(_ string, _ ...interface{}) {
        me.beginAskForVote()
    }
    
    // beginAskForVote starts one round of voting: it raises ceAskingForVote,
    // builds a single vote request from this node's id, term and last committed
    // log position, and sends it to every other configured node, one goroutine
    // per peer. A peer answering rpc.RVOk is reported to handleRequestVoteOK
    // together with the term the round was started in; failed calls and other
    // answers are dropped.
    func (me *tCandidateState) beginAskForVote() {
        // announce the new round first
        me.raise(ceAskingForVote)

        // one request, shared read-only by all goroutines below
        request := new(rpc.RequestVoteCmd)
        request.CandidateID = me.context.Config().ID()
        request.Term = me.mTerm

        logStore := me.context.Store()
        request.LastLogIndex = logStore.LastCommittedIndex()
        request.LastLogTerm = logStore.LastCommittedTerm()

        // remember the term of this round, late answers are matched against it
        roundTerm := me.mTerm
        for _, peer := range me.context.Config().Nodes() {
            // do not ask itself
            if peer.ID() == me.context.Config().ID() {
                continue
            }

            go func(peerID string) {
                reply := &rpc.RequestVoteRet{}
                ask := func(client rpc.IRaftRPC) error {
                    return client.RequestVote(request, reply)
                }

                if err := me.context.RaftClientService().Using(peerID, ask); err != nil {
                    return
                }
                if reply.Code == rpc.RVOk {
                    me.handleRequestVoteOK(peerID, roundTerm)
                }
            }(peer.ID())
        }
    }
    
    
    // whenAskingForVoteThenWatchElectionTimeout is the handler of ceAskingForVote.
    // It starts a goroutine that sleeps for a random election timeout and then,
    // if this state is still alive and still in the same term, raises
    // ceElectionTimeout when fewer than a majority (n/2 + 1) of tickets arrived.
    func (me *tCandidateState) whenAskingForVoteThenWatchElectionTimeout(_ string, _ ...interface{}) {
        watchedTerm := me.mTerm

        watch := func() {
            time.Sleep(timeout.RandElectionTimeout())

            // the round is over if the state was disposed or a newer round began
            stillWatching := !me.mDisposedFlag && me.mTerm == watchedTerm
            if !stillWatching {
                return
            }

            tickets := me.getTicketCount()
            majority := len(me.context.Config().Nodes())/2 + 1
            if tickets >= majority {
                return
            }
            me.raise(ceElectionTimeout)
        }
        go watch()
    }
    
    
    // handleRequestVoteOK is called when peerID granted its vote for the round
    // started in term. The ticket is counted (ceReceiveTicket with peerID) only
    // while this state is not disposed and still in that term; otherwise the
    // answer is dropped.
    func (me *tCandidateState) handleRequestVoteOK(peerID string, term int64) {
        ticketStillValid := !me.mDisposedFlag && me.mTerm == term
        if ticketStillValid {
            me.raise(ceReceiveTicket, peerID)
        }
    }
    
    
    // whenReceiveTicketThenCheckTicketCount is the handler of ceReceiveTicket.
    // Once the collected tickets reach a majority (n/2 + 1 of the configured
    // nodes) it raises ceWinningTheVote.
    func (me *tCandidateState) whenReceiveTicketThenCheckTicketCount(_ string, _ ...interface{}) {
        tickets := me.getTicketCount()
        majority := len(me.context.Config().Nodes())/2 + 1
        if tickets < majority {
            // not enough yet
            return
        }

        // win the vote
        me.raise(ceWinningTheVote)
    }
    
    // getTicketCount returns the number of tickets collected so far, read
    // under mTicketMutex.
    func (me *tCandidateState) getTicketCount() int {
        me.mTicketMutex.Lock()
        collected := len(me.mTicketCount)
        me.mTicketMutex.Unlock()

        return collected
    }
    
    // whenWinningTheVoteThenSwitchToLeader is the handler of ceWinningTheVote.
    // It raises ceDisposing to retire this candidate state, then hands a new
    // leader state, created with the current term, to the context via
    // HandleStateChanged.
    func (me *tCandidateState) whenWinningTheVoteThenSwitchToLeader(_ string, _ ...interface{}) {
        me.raise(ceDisposing)
        me.context.HandleStateChanged(newLeaderState(me.context, me.mTerm))
    }
    

    tRaftClient.go

    管理到指定raft节点的rpc连接

    package client
    
    import (
        "learning/gooop/etcd/raft/config"
        rrpc "learning/gooop/etcd/raft/rpc"
        "net/rpc"
    )
    
    // tRaftClient manages the rpc connection to one raft node.
    // All rpc calls are delegated to the current state object.
    type tRaftClient struct {
        // cfg is the configuration of the target node
        cfg   config.IRaftNodeConfig
        // conn is the net/rpc connection; nil when no connection was supplied
        conn  *rpc.Client
        // state is the current client state (see init / HandleStateChanged)
        state iClientState
    }
    
    // newRaftClient creates the client for the node described by cfg.
    // conn may be nil, in which case the client starts in the broken state.
    func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
        client := &tRaftClient{}
        client.init(cfg, conn)
        return client
    }
    
    // init stores cfg and conn, picks the initial state (connected when a
    // connection is given, broken when conn is nil) and starts that state.
    func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
        me.cfg, me.conn = cfg, conn

        if conn != nil {
            me.state = newConnectedState(me)
        } else {
            me.state = newBrokenState(me)
        }

        me.state.Start()
    }
    
    // Config returns the configuration of the node this client talks to.
    func (me *tRaftClient) Config() config.IRaftNodeConfig {
        return me.cfg
    }
    
    // GetConn returns the stored rpc connection; it may be nil.
    func (me *tRaftClient) GetConn() *rpc.Client {
        return me.conn
    }
    
    // SetConn replaces the stored rpc connection with conn.
    // The previous connection is not closed here.
    func (me *tRaftClient) SetConn(conn *rpc.Client) {
        me.conn = conn
    }
    
    // HandleStateChanged makes state the current client state and starts it.
    func (me *tRaftClient) HandleStateChanged(state iClientState) {
        me.state = state
        state.Start()
    }
    
    // Heartbeat forwards the heartbeat call to the current client state.
    func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
        return me.state.Heartbeat(cmd, ret)
    }
    
    // AppendLog forwards the append-log call to the current client state.
    func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
        return me.state.AppendLog(cmd, ret)
    }
    
    // CommitLog forwards the commit-log call to the current client state.
    func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
        return me.state.CommitLog(cmd, ret)
    }
    
    // RequestVote forwards the request-vote call to the current client state.
    func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
        return me.state.RequestVote(cmd, ret)
    }
    
    // Ping forwards the ping call to the current client state.
    func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
        return me.state.Ping(cmd, ret)
    }
    

    tRaftClientService.go

    管理当前节点到其他raft节点的rpc连接

    package client
    
    import (
        "errors"
        "learning/gooop/etcd/raft/config"
        "learning/gooop/etcd/raft/rpc"
        netrpc "net/rpc"
    )
    
    // tRaftClientService manages the rpc clients from the current node to the
    // configured raft nodes.
    type tRaftClientService struct {
        // cfg is the raft configuration the clients are created from
        cfg     config.IRaftConfig
        // clients maps a node id to the client of that node; filled once in init
        clients map[string]IRaftClient
    }
    
    // NewRaftClientService creates the client service for the nodes listed in
    // cfg. Clients for all nodes are created right away (see init).
    func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
        service := &tRaftClientService{}
        service.init(cfg)
        return service
    }
    
    // init stores cfg and creates one client per node returned by cfg.Nodes(),
    // keyed by node id.
    func (me *tRaftClientService) init(cfg config.IRaftConfig) {
        me.cfg, me.clients = cfg, make(map[string]IRaftClient)

        for _, nodeCfg := range cfg.Nodes() {
            id := nodeCfg.ID()
            me.clients[id] = me.createRaftClient(nodeCfg)
        }
    }
    
    
    // createRaftClient dials the node's endpoint over tcp and wraps the result
    // in a raft client. A failed dial is not reported: the client is then
    // created without a connection (nil).
    func (me *tRaftClientService) createRaftClient(nodeCfg config.IRaftNodeConfig) IRaftClient {
        // dial to peer
        conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
        if err != nil {
            // no usable connection, hand over nil explicitly
            conn = nil
        }
        return newRaftClient(nodeCfg, conn)
    }
    
    // Using runs action with the client of the peer identified by peerID and
    // returns whatever action returns. An id without a client yields
    // gErrorUnknownRaftPeer and action is not called.
    func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
        peer, known := me.clients[peerID]
        if !known {
            return gErrorUnknownRaftPeer
        }
        return action(peer)
    }
    
    
    // gErrorUnknownRaftPeer is returned by Using when no client exists for the given peer id.
    var gErrorUnknownRaftPeer = errors.New("unknown raft peer")
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang etcd raft协议之8

        本文链接:https://www.haomeiwen.com/subject/htrkkltx.html