美文网首页GO与微服务
手撸golang etcd raft协议之10

手撸golang etcd raft协议之10

作者: 老罗话编程 | 来源:发表于2021-04-05 20:45 被阅读0次

    手撸golang etcd raft协议之9,10

    缘起

    最近阅读《云原生分布式存储基石:etcd深入解析》(杜军, 2019.1)
    本系列笔记拟采用golang练习之

    raft分布式一致性算法

    分布式存储系统通常会通过维护多个副本来进行容错,
    以提高系统的可用性。
    这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
    
    Raft算法把这个问题分解成了四个子问题:
    1. 领袖选举(leader election);
    2. 日志复制(log replication);
    3. 安全性(safety);
    4. 成员关系变化(membership changes)。
    
    源码gitee地址:
    https://gitee.com/ioly/learning.gooop
    

    目标

    • 根据raft协议,实现高可用分布式强一致的kv存储

    子目标(Day 10)

    • 添加put/get/del kv键值对的rpc接口
    • 继续完善Leader状态的raft协议响应

    设计

    • rpc/IKVStoreRPC: kv操作的rpc接口
    • store/IKVStore: kv操作的持久化接口
    • store/ILogStore: 嵌入IKVStore接口,以支持kv持久化
    • lsm/IRaftState: 嵌入rpc.IKVStoreRPC接口,以支持kv操作
    • lsm/tLeaderState: 初步实现Leader状态的raft协议处理,事件驱动的逻辑编排,读写分离的字段管理。

    rpc/IKVStoreRPC.go

    kv操作的rpc接口

    package rpc
    
    // IKVStoreRPC is the client-facing RPC surface for key/value operations.
    type IKVStoreRPC interface {
        // ExecuteKVCmd runs cmd and writes its outcome into ret.
        ExecuteKVCmd(cmd *KVCmd, ret *KVRet) error
    }

    // KVCmd is a single key/value request sent by a client.
    type KVCmd struct {
        OPCode  KVOPCode // operation to perform
        Key     []byte
        Content []byte // value to write; presumably unused for get/del
    }

    // KVOPCode selects the operation carried by a KVCmd.
    type KVOPCode int

    // Supported key/value operations.
    const (
        KVGet KVOPCode = iota
        KVPut
        KVDel
    )

    // KVRet is the reply to a KVCmd.
    type KVRet struct {
        Code    KVRetCode
        Key     []byte
        Content []byte // value read by a get
    }

    // KVRetCode reports the outcome of a KVCmd.
    type KVRetCode int

    // Result codes. The zero value of KVRetCode is KVOk.
    const (
        KVOk KVRetCode = iota
        KVKeyNotFound
        KVInternalError
    )
    

    store/IKVStore.go

    kv操作的持久化接口

    package store
    
    // IKVStore is the key/value persistence interface.
    //
    // Note the result order of Get: the error comes first and the value second,
    // the reverse of the usual Go (value, error) convention. Implementations and
    // callers must follow this order.
    type IKVStore interface {
        // Get looks up key, returning (error, value).
        Get(key []byte) (error, []byte)

        // Put writes content for key.
        Put(key []byte, content []byte) error

        // Del deletes key.
        Del(key []byte) error
    }
    

    store/ILogStore.go

    嵌入IKVStore接口,以支持kv持久化

    package store
    
    import (
        "learning/gooop/etcd/raft/model"
    )
    
    // ILogStore is the raft log storage. It embeds IKVStore, so the same store
    // also serves key/value reads and writes, and adds raft log bookkeeping.
    type ILogStore interface {
        IKVStore

        // Append adds entry to the log.
        Append(entry *model.LogEntry) error
        // Commit marks the log entry at index as committed.
        Commit(index int64) error
        // GetLog fetches the entry at index; like IKVStore.Get, the error is
        // returned first.
        GetLog(index int64) (error, *model.LogEntry)

        // Term and index of the most recently appended entry.
        LastAppendedTerm() int64
        LastAppendedIndex() int64

        // Term and index of the most recently committed entry.
        LastCommittedTerm() int64
        LastCommittedIndex() int64
    }
    

    lsm/IRaftState.go

    嵌入rpc.IKVStoreRPC接口,以支持kv操作

    package lsm
    
    import (
        "learning/gooop/etcd/raft/roles"
        "learning/gooop/etcd/raft/rpc"
    )
    
    // IRaftState is one role-specific state of a raft node (tLeaderState is the
    // leader's). It answers both the peer raft RPCs and the client kv RPC.
    type IRaftState interface {
        rpc.IRaftRPC     // peer-to-peer raft protocol
        rpc.IKVStoreRPC  // client key/value commands

        // Role reports which raft role this state implements.
        Role() roles.RaftRole

        // Start begins the state's background activity (for the leader:
        // the periodic heartbeat loop).
        Start()
    }
    

    lsm/tLeaderState.go

    初步实现Leader状态的raft协议处理,事件驱动的逻辑编排,读写分离的字段管理。

    package lsm
    
    import (
        "errors"
        "learning/gooop/etcd/raft/config"
        "learning/gooop/etcd/raft/model"
        "learning/gooop/etcd/raft/roles"
        "learning/gooop/etcd/raft/rpc"
        "learning/gooop/etcd/raft/store"
        "learning/gooop/etcd/raft/timeout"
        "sync"
        "time"
    )
    
    // tLeaderState presents a leader node
    // tLeaderState is the IRaftState of a node that currently leads the cluster.
    //
    // mTerm is assigned once in init(). mDisposedFlag and the mVoted* fields
    // are written only by the event handlers registered in
    // hookEventsForDisposedFlag / hookEventsForVotedTerm; all other logic
    // only reads them.
    type tLeaderState struct {
        tEventDrivenModel

        context    iRaftStateContext
        mInitOnce  sync.Once // guards init()
        mStartOnce sync.Once // guards Start()

        // term this node leads; set in init()
        mTerm int64

        // false after leInit, true after leDiposing
        mDisposedFlag bool

        // the last vote cast (written on leVoteToCandidate); reset on
        // leBeforeRequestVote once older than timeout.ElectionTimeout
        mVotedTerm        int64
        mVotedCandidateID string
        mVotedTimestamp   int64 // time.Now().UnixNano() at the moment of voting
    }
    
    // Event names raised and hooked by tLeaderState. The string values are
    // passed to hook/raise and are kept exactly as originally defined.
    const (
        // leInit is raised by init(). No args.
        leInit = "leader.init"

        // leStart is raised by Start(). No args.
        leStart = "leader.Start"

        // leDiposing is raised by whenNewLeaderAnnouncedThenSwitchToFollower and
        // whenHeartbeatRejectedThenSwitchToFollower before handing over to a
        // follower state. No args.
        leDiposing = "leader.Disposing"

        // leNewLeaderAnnounced is raised by Heartbeat() / AppendLog() when a
        // request carries a newer term. Args: term int64.
        leNewLeaderAnnounced = "leader.NewLeaderAnnounced"

        // leBeforeRequestVote is raised at the start of RequestVote().
        // Args: *rpc.RequestVoteCmd.
        leBeforeRequestVote = "leader.BeforeRequestVote"

        // leVoteToCandidate is raised by RequestVote() when a vote is granted.
        // Args: *rpc.RequestVoteCmd.
        leVoteToCandidate = "leader.VoteToCandidate"

        // leHeartbeatRejected is raised by handleHeartbeat() on an
        // HBTermMismatch reply. Args: term int64.
        leHeartbeatRejected = "leader.HeartbeatRejected"
    )
    
    
    // newLeaderState creates the leader state for term, runs its one-time
    // initialisation (handler wiring plus leInit) and returns it.
    func newLeaderState(ctx iRaftStateContext, term int64) IRaftState {
        state := &tLeaderState{}
        state.init(ctx, term)
        return state
    }
    
    // init records the context and term, registers the event handlers and then
    // raises leInit. mInitOnce makes every call after the first a no-op.
    func (me *tLeaderState) init(ctx iRaftStateContext, term int64) {
        me.mInitOnce.Do(func() {
            me.context, me.mTerm = ctx, term

            me.initEventHandlers()
            me.raise(leInit)
        })
    }
    
    // initEventHandlers wires every event this state reacts to. Handlers that
    // mutate fields are registered first; handlers that only read fields and
    // drive transitions come after them.
    func (me *tLeaderState) initEventHandlers() {
        // field writers
        me.hookEventsForDisposedFlag()
        me.hookEventsForVotedTerm()

        // field readers / orchestration
        me.hook(leStart, me.whenStartThenBeginHeartbeatToOthers)
        me.hook(leNewLeaderAnnounced, me.whenNewLeaderAnnouncedThenSwitchToFollower)
        me.hook(leHeartbeatRejected, me.whenHeartbeatRejectedThenSwitchToFollower)
    }
    
    
    // hookEventsForDisposedFlag is the only writer of mDisposedFlag: leInit
    // clears it and leDiposing sets it.
    func (me *tLeaderState) hookEventsForDisposedFlag() {
        markAlive := func(_ string, _ ...interface{}) { me.mDisposedFlag = false }
        markDisposed := func(_ string, _ ...interface{}) { me.mDisposedFlag = true }

        me.hook(leInit, markAlive)
        me.hook(leDiposing, markDisposed)
    }
    
    // hookEventsForVotedTerm is the only writer of the mVoted* fields.
    //   - leBeforeRequestVote: if a vote is on record and at least
    //     timeout.ElectionTimeout has elapsed since it was cast, forget it.
    //   - leVoteToCandidate: record the term, candidate ID and time of the
    //     vote described by args[0] (*rpc.RequestVoteCmd).
    func (me *tLeaderState) hookEventsForVotedTerm() {
        me.hook(leBeforeRequestVote, func(_ string, _ ...interface{}) {
            if me.mVotedTerm == 0 {
                // no vote recorded
                return
            }

            elapsed := time.Duration(time.Now().UnixNano() - me.mVotedTimestamp)
            if elapsed < timeout.ElectionTimeout {
                // the recorded vote is still fresh
                return
            }

            me.mVotedTerm, me.mVotedTimestamp, me.mVotedCandidateID = 0, 0, ""
        })

        me.hook(leVoteToCandidate, func(_ string, args ...interface{}) {
            granted := args[0].(*rpc.RequestVoteCmd)
            me.mVotedTerm = granted.Term
            me.mVotedCandidateID = granted.CandidateID
            me.mVotedTimestamp = time.Now().UnixNano()
        })
    }
    
    
    
    // Heartbeat handles a heartbeat from another node claiming leadership.
    //
    // If cmd.Term is not newer than this leader's term, the heartbeat is
    // rejected with HBTermMismatch and this leader's term is reported in
    // ret.Term. The sending leader's handleHeartbeat forwards ret.Term to
    // leHeartbeatRejected and becomes a follower of that term. Without it, the
    // sender would fall back to term 0.
    //
    // A newer term means another leader exists: leNewLeaderAnnounced is raised
    // (this state hands over to a follower) and HBOk is returned.
    func (me *tLeaderState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
        if cmd.Term <= me.mTerm {
            ret.Code = rpc.HBTermMismatch
            ret.Term = me.mTerm
            return nil
        }

        // a newer leader has been elected
        me.raise(leNewLeaderAnnounced, cmd.Term)

        ret.Code = rpc.HBOk
        return nil
    }
    
    // AppendLog handles a log-append request from another node.
    //
    // If cmd.Term is not newer than this leader's term, the request is rejected
    // with ALTermMismatch. A newer term means another leader exists:
    // leNewLeaderAnnounced is raised so this state hands over to a follower,
    // and the reply is ALInternalError because this stepping-down state does
    // not append the entry.
    // NOTE(review): confirm that ALInternalError (rather than ALOk) is what the
    // sender expects here.
    func (me *tLeaderState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
        if cmd.Term > me.mTerm {
            me.raise(leNewLeaderAnnounced, cmd.Term)
            ret.Code = rpc.ALInternalError
            return nil
        }

        ret.Code = rpc.ALTermMismatch
        return nil
    }
    
    // CommitLog is ignored by the leader state: every request is answered
    // with CLInternalError and no error.
    func (me *tLeaderState) CommitLog(_ *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
        ret.Code = rpc.CLInternalError
        return nil
    }
    
    // RequestVote handles a vote request from a candidate.
    //
    // leBeforeRequestVote is raised first, so an expired recorded vote is
    // forgotten. The request is then judged as follows:
    //  1. cmd.Term <= this leader's own term: RVTermMismatch. The leader has
    //     implicitly voted for itself in its term, and older terms are stale.
    //  2. cmd.Term older than the last recorded vote: RVTermMismatch.
    //  3. cmd.Term equal to the last recorded vote's term: RVVotedAnother if
    //     that vote went to a different candidate, otherwise RVOk.
    //  4. Newer term: the vote is granted (leVoteToCandidate records it) when
    //     cmd.LastLogIndex >= the store's last committed index; otherwise
    //     RVLogMismatch is returned.
    //
    // NOTE(review): raft also expects a leader that sees a newer term to step
    // down; this state does not do so here.
    func (me *tLeaderState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
        me.raise(leBeforeRequestVote, cmd)

        // Without this check a leader that never voted (mVotedTerm == 0)
        // would grant its vote to a candidate of any stale term.
        if cmd.Term <= me.mTerm {
            ret.Code = rpc.RVTermMismatch
            return nil
        }

        switch {
        case cmd.Term < me.mVotedTerm:
            ret.Code = rpc.RVTermMismatch

        case cmd.Term == me.mVotedTerm:
            if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID {
                // already voted for another candidate in this term
                ret.Code = rpc.RVVotedAnother
            } else {
                // already voted for this candidate
                ret.Code = rpc.RVOk
            }

        default:
            // new term: grant only if the candidate's log is recent enough
            if cmd.LastLogIndex >= me.context.Store().LastCommittedIndex() {
                me.raise(leVoteToCandidate, cmd)
                ret.Code = rpc.RVOk
            } else {
                ret.Code = rpc.RVLogMismatch
            }
        }

        return nil
    }
    
    // Role identifies this state as roles.Leader.
    func (me *tLeaderState) Role() roles.RaftRole { return roles.Leader }
    
    // Start raises leStart exactly once (mStartOnce guards repeated calls). The
    // leStart handler launches the heartbeat loop.
    func (me *tLeaderState) Start() {
        fireStart := func() { me.raise(leStart) }
        me.mStartOnce.Do(fireStart)
    }
    
    // whenStartThenBeginHeartbeatToOthers handles leStart. It starts a goroutine
    // that, until mDisposedFlag becomes true, sends one heartbeat to every other
    // node and then sleeps for timeout.HeartbeatInterval. Broadcast errors are
    // discarded; the next round simply tries again.
    //
    // NOTE(review): mDisposedFlag is written from event handlers and read here
    // on another goroutine with no visible synchronization. This looks like a
    // data race; confirm with -race.
    func (me *tLeaderState) whenStartThenBeginHeartbeatToOthers(_ string, _ ...interface{}) {
        sendHeartbeat := func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error {
            return me.handleHeartbeat(client)
        }

        go func() {
            for {
                if me.mDisposedFlag {
                    return
                }
                _ = me.boardcast(sendHeartbeat)
                time.Sleep(timeout.HeartbeatInterval)
            }
        }()
    }
    
    
    // boardcast runs action against every configured node except this one,
    // through the RaftClientService connection for that node. (The name keeps
    // its original spelling so existing callers still compile.)
    //
    // Every peer is visited even when an earlier one fails. Aborting on the
    // first error would let one unreachable node starve all later peers of
    // heartbeats and log replication. The first error encountered, if any, is
    // returned after the loop.
    func (me *tLeaderState) boardcast(action func(config.IRaftNodeConfig, rpc.IRaftRPC) error) error {
        selfID := me.context.Config().ID()

        var firstErr error
        for _, node := range me.context.Config().Nodes() {
            if node.ID() == selfID {
                continue
            }

            // per-iteration copy: safe even if Using runs the callback later
            peer := node
            e := me.context.RaftClientService().Using(peer.ID(), func(client rpc.IRaftRPC) error {
                return action(peer, client)
            })
            if e != nil && firstErr == nil {
                firstErr = e
            }
        }

        return firstErr
    }
    
    
    // handleHeartbeat sends one heartbeat, carrying this leader's term and node
    // ID, to client.
    // - A transport error is returned unchanged.
    // - An HBTermMismatch reply raises leHeartbeatRejected with the term the
    //   peer put in its reply.
    // - Other reply codes are ignored.
    func (me *tLeaderState) handleHeartbeat(client rpc.IRaftRPC) error {
        hb := &rpc.HeartbeatCmd{}
        hb.Term = me.mTerm
        hb.LeaderID = me.context.Config().ID()

        reply := &rpc.HeartbeatRet{}
        if err := client.Heartbeat(hb, reply); err != nil {
            return err
        }

        if reply.Code == rpc.HBTermMismatch {
            me.raise(leHeartbeatRejected, reply.Term)
        }
        return nil
    }
    
    // whenNewLeaderAnnouncedThenSwitchToFollower handles leNewLeaderAnnounced.
    // args[0] is the newer term (int64) seen in a Heartbeat/AppendLog request.
    func (me *tLeaderState) whenNewLeaderAnnouncedThenSwitchToFollower(_ string, args ...interface{}) {
        me.disposeAndBecomeFollower(args)
    }

    // whenHeartbeatRejectedThenSwitchToFollower handles leHeartbeatRejected.
    // args[0] is the term (int64) a peer returned when rejecting our heartbeat.
    func (me *tLeaderState) whenHeartbeatRejectedThenSwitchToFollower(_ string, args ...interface{}) {
        me.disposeAndBecomeFollower(args)
    }

    // disposeAndBecomeFollower raises leDiposing, which sets mDisposedFlag and so
    // ends the heartbeat loop. It then gives the context a new follower state
    // for the term args[0].(int64).
    func (me *tLeaderState) disposeAndBecomeFollower(args []interface{}) {
        me.raise(leDiposing)

        term := args[0].(int64)
        me.context.HandleStateChanged(newFollowerState(me.context, term))
    }
    
    
    // ExecuteKVCmd dispatches a client kv command to the handler for its OPCode.
    //
    // An unrecognised OPCode is answered with KVInternalError and
    // gErrorUnknownKVOPCode. Returning nil with ret.Code untouched would
    // report KVOk, because KVOk is the zero value of rpc.KVRetCode.
    func (me *tLeaderState) ExecuteKVCmd(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
        switch cmd.OPCode {
        case rpc.KVGet:
            return me.handleKVGet(cmd, ret)

        case rpc.KVPut:
            return me.handleKVPut(cmd, ret)

        case rpc.KVDel:
            return me.handleKVDel(cmd, ret)

        default:
            ret.Code = rpc.KVInternalError
            return gErrorUnknownKVOPCode
        }
    }

    // gErrorUnknownKVOPCode is returned by ExecuteKVCmd for an OPCode it does
    // not recognise.
    var gErrorUnknownKVOPCode = errors.New("unknown kv opcode")
    
    // handleKVGet reads cmd.Key directly from the local store; no log entry is
    // created for reads.
    // - On a store error: ret.Code = KVInternalError and the error is returned.
    // - On success: ret.Code = KVOk and ret.Content holds the stored value.
    // ret.Key is left untouched.
    func (me *tLeaderState) handleKVGet(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
        err, value := me.context.Store().Get(cmd.Key)
        if err == nil {
            ret.Code = rpc.KVOk
            ret.Content = value
            return nil
        }

        ret.Code = rpc.KVInternalError
        return err
    }
    
    // handleKVPut replicates a put through the raft log (broadcastKVCmd) and then
    // applies it to the local store.
    //
    // ret.Code is always set explicitly. It is KVInternalError when replication
    // or the local Put fails (the error is returned as well) and KVOk on
    // success. A caller therefore never sees KVOk together with a non-nil
    // error, whatever broadcastKVCmd left in ret.
    func (me *tLeaderState) handleKVPut(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
        entry := new(store.PutCmd)
        entry.Key = cmd.Key
        entry.Value = cmd.Content

        // create/append/commit the log entry across the cluster
        if e := me.broadcastKVCmd(entry, ret); e != nil {
            ret.Code = rpc.KVInternalError
            return e
        }

        // apply to the local store
        if e := me.context.Store().Put(cmd.Key, cmd.Content); e != nil {
            ret.Code = rpc.KVInternalError
            return e
        }

        ret.Code = rpc.KVOk
        return nil
    }
    
    // handleKVDel replicates a delete through the raft log (broadcastKVCmd) and
    // then removes cmd.Key from the local store with Store().Del. The local
    // apply must be a delete, not a Put of cmd.Content, or the key would survive.
    //
    // ret.Code is always set explicitly. It is KVInternalError when replication
    // or the local Del fails (the error is returned as well) and KVOk on
    // success.
    func (me *tLeaderState) handleKVDel(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
        entry := new(store.DelCmd)
        entry.Key = cmd.Key

        // create/append/commit the log entry across the cluster
        if e := me.broadcastKVCmd(entry, ret); e != nil {
            ret.Code = rpc.KVInternalError
            return e
        }

        // apply to the local store
        if e := me.context.Store().Del(cmd.Key); e != nil {
            ret.Code = rpc.KVInternalError
            return e
        }

        ret.Code = rpc.KVOk
        return nil
    }
    
    // broadcastKVCmd turns cmd into a log entry for the current term and works
    // through these steps:
    //  1. append it locally;
    //  2. replicate it to the other nodes;
    //  3. once a majority holds it, commit it locally;
    //  4. ask the other nodes to commit it too.
    //
    // It returns nil once the entry is committed on this node. On failure
    // ret.Code is set to KVInternalError and the error is returned. The error
    // is one of:
    //   - the local Append or Commit error, or
    //   - gErrorCannotReachAgreement when fewer than a majority acknowledged.
    //
    // The follower commit requests are best-effort; their replies are not acted on yet.
    func (me *tLeaderState) broadcastKVCmd(cmd store.IKVCmd, ret *rpc.KVRet) error {
        st := me.context.Store()
        leaderID := me.context.Config().ID()

        // The new entry directly follows the last committed one.
        // NOTE(review): an entry appended earlier but never committed (failed
        // agreement) gets the same index again; confirm Store.Append copes.
        entry := new(model.LogEntry)
        entry.Term = me.mTerm
        entry.Index = st.LastCommittedIndex() + 1
        entry.PrevTerm = st.LastCommittedTerm()
        entry.PrevIndex = st.LastCommittedIndex()
        entry.Command = cmd.Marshal()

        if e := st.Append(entry); e != nil {
            ret.Code = rpc.KVInternalError
            return e
        }

        // replicate to the other nodes and count acknowledgements
        alcmd := new(rpc.AppendLogCmd)
        alcmd.Term = me.mTerm
        alcmd.LeaderID = leaderID
        alcmd.Entry = entry

        acks := 0
        _ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error {
            alret := new(rpc.AppendLogRet)
            if e := client.AppendLog(alcmd, alret); e != nil {
                return e
            }

            switch alret.Code {
            case rpc.ALOk:
                acks++
            case rpc.ALTermMismatch:
                // todo: fixme
            case rpc.ALIndexMismatch:
                // todo: fixme
            }
            return nil
        })

        // acks counts peers only (boardcast skips this node). Including the
        // leader itself, acks >= n/2 means acks+1 > n/2: a strict majority of
        // the n configured nodes.
        if acks < len(me.context.Config().Nodes())/2 {
            ret.Code = rpc.KVInternalError
            return gErrorCannotReachAgreement
        }

        // Commit locally. Without this the leader's LastCommittedIndex never
        // advances and every later entry would be created with the same index.
        if e := st.Commit(entry.Index); e != nil {
            ret.Code = rpc.KVInternalError
            return e
        }

        // ask the other nodes to commit as well (best-effort)
        clcmd := new(rpc.CommitLogCmd)
        clcmd.LeaderID = leaderID
        clcmd.Term = me.mTerm
        clcmd.Index = entry.Index

        _ = me.boardcast(func(_ config.IRaftNodeConfig, client rpc.IRaftRPC) error {
            clret := new(rpc.CommitLogRet)
            if e := client.CommitLog(clcmd, clret); e != nil {
                return e
            }

            switch clret.Code {
            case rpc.CLInternalError:
                // todo: fixme
            case rpc.CLLogNotFound:
                // todo: fixme
            }
            return nil
        })

        return nil
    }
    
    // gErrorCannotReachAgreement is returned by broadcastKVCmd when fewer than a
    // majority of the cluster acknowledged a new log entry.
    var (
        gErrorCannotReachAgreement = errors.New("cannot reach agreement")
    )
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang etcd raft协议之10

        本文链接:https://www.haomeiwen.com/subject/dmfjkltx.html