美文网首页GO与微服务
手撸golang etcd raft协议之7

手撸golang etcd raft协议之7

作者: 老罗话编程 | 来源:发表于2021-04-02 22:54 被阅读0次

    手撸golang etcd raft协议之7

    缘起

    最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
    本系列笔记拟采用golang练习之
    gitee: https://gitee.com/ioly/learning.gooop

    raft分布式一致性算法

    分布式存储系统通常会通过维护多个副本来进行容错,
    以提高系统的可用性。
    这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
    
    Raft算法把这个问题分解成了四个子问题:
    1. 领袖选举(leader election)
    2. 日志复制(log replication)
    3. 安全性(safety)
    4. 成员关系变化(membership changes)
    

    目标

    • 根据raft协议,实现高可用分布式强一致的kv存储

    子目标(Day 7)

    • 实现各raft节点之间的rpc通讯
      • 定义IRaftClientService服务,管理所有节点的tcp长连接
      • 定义IRaftClient接口,封装节点间的rpc调用
        • 基于状态模式,区分已连接状态和已断开状态
        • 基于事件驱动的逻辑编排
        • 基于读写分离的字段管理

    设计

    • model/IEventDrivenModel: 事件驱动的逻辑编排基类
    • IRaftClientService:管理所有的节点间rpc连接
    • IRaftClient:管理当前节点与某个节点间的rpc连接
    • iClientState:基于状态模式的rpc连接状态接口
    • iStateContext:状态模式下的连接状态上下文接口
    • tRaftClient:IRaftClient接口的具体实现,并实现iStateContext接口。
    • tConnectedState: 管理已连接状态的rpc连接
      • 定时Ping以检测连接状态
      • 基于事件驱动的逻辑编排
      • 基于读写分离的字段管理
    • tBrokenState:管理已断开状态的rpc连接
      • 定时Dial以尝试重连接
      • 基于事件驱动的逻辑编排
      • 基于读写分离的字段管理

    model/IEventDrivenModel.go

    事件驱动的逻辑编排基类

    package model
    
    // TEventHandleFunc is the signature of an event handler: it receives the
    // event name e and the variadic arguments supplied when the event was raised.
    type TEventHandleFunc func(e string, args ...interface{})
    
    // IEventDrivenModel is the contract of an event-driven model: handlers are
    // registered per event name with Hook and invoked with Raise.
    //
    // The methods are exported so that TEventDrivenModel (which exposes Hook and
    // Raise) satisfies the interface, and so that it is usable outside this package.
    type IEventDrivenModel interface {
        // Hook registers handleFunc for the event named e.
        Hook(e string, handleFunc TEventHandleFunc)
        // Raise fires the event named e, passing args to its handlers.
        Raise(e string, args ...interface{})
    }
    
    // TEventDrivenModel is an embeddable base type that keeps, per event name,
    // the list of handlers registered for that event.
    type TEventDrivenModel struct {
        // items maps an event name to its handlers, in registration order.
        items map[string][]TEventHandleFunc
    }
    
    // Hook registers handler for the event named e. Handlers registered for the
    // same event are kept in registration order.
    //
    // The handler table is created lazily, so a zero-value TEventDrivenModel
    // (for example one embedded in a struct built with new) is ready to use.
    func (me *TEventDrivenModel) Hook(e string, handler TEventHandleFunc) {
        if me.items == nil {
            me.items = make(map[string][]TEventHandleFunc)
        }
        // append on a missing key starts from a nil slice, so no branch is needed
        me.items[e] = append(me.items[e], handler)
    }
    
    // Raise fires the event named e: every handler hooked on e is called
    // synchronously, in registration order, with e and args. An event without
    // handlers is a no-op.
    func (me *TEventDrivenModel) Raise(e string, args ...interface{}) {
        // looking up a missing key yields a nil slice, which ranges zero times
        for _, handler := range me.items[e] {
            handler(e, args...)
        }
    }
    

    IRaftClientService.go

    管理所有的节点间rpc连接

    package client
    
    import (
        "fmt"
        netrpc "net/rpc"
        "sync"

        "learning/gooop/etcd/raft/config"
        "learning/gooop/etcd/raft/rpc"
    )
    
    // tRaftClientService holds the rpc clients of the peer nodes, keyed by peer ID.
    type tRaftClientService struct {
        // cfg is the raft configuration; Using reads the node list from it.
        cfg config.IRaftConfig
        // rwmutex guards clients.
        rwmutex *sync.RWMutex
        // clients maps a peer ID to its client.
        clients map[string]IRaftClient
    }
    
    // NewRaftClientService creates a client service initialised from cfg.
    func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
        svc := &tRaftClientService{}
        svc.init(cfg)
        return svc
    }
    
    
    // init stores cfg and allocates the lock and the (empty) client table.
    func (me *tRaftClientService) init(cfg config.IRaftConfig) {
        me.clients = map[string]IRaftClient{}
        me.rwmutex = &sync.RWMutex{}
        me.cfg = cfg
    }
    
    
    // Using runs action against the rpc client of the peer identified by peerID.
    //
    // A client already present in the table is reused. Otherwise the peer is
    // looked up in the node configuration, dialed over tcp, and the resulting
    // client is stored in the table for later calls.
    //
    // It returns an error when peerID is not a configured node, when dialing
    // fails, or whatever error action returns. action is always invoked without
    // any service lock held.
    func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
        // fast path: reuse a cached client; release the read lock before calling out
        me.rwmutex.RLock()
        client, ok := me.clients[peerID]
        me.rwmutex.RUnlock()
        if ok {
            return action(client)
        }

        // locate the peer's node config
        var nodeCfg config.IRaftNodeConfig
        found := false
        for _, it := range me.cfg.Nodes() {
            if it.ID() == peerID {
                nodeCfg = it
                found = true
                break
            }
        }
        if !found {
            return fmt.Errorf("unknown raft peer %q", peerID)
        }

        // dial outside the lock so a slow peer does not block other callers
        conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
        if err != nil {
            return err
        }

        // recheck under the write lock: another goroutine may have registered
        // a client for this peer while we were dialing
        me.rwmutex.Lock()
        client, ok = me.clients[peerID]
        if !ok {
            client = newRaftClient(nodeCfg, conn)
            me.clients[peerID] = client
        }
        me.rwmutex.Unlock()

        if ok {
            // lost the race: the freshly dialed connection is redundant, and a
            // failure to close it does not affect the call being made
            _ = conn.Close()
        }

        return action(client)
    }
    

    IRaftClient.go

    管理当前节点与某个节点间的rpc连接

    package client
    
    import "learning/gooop/etcd/raft/rpc"
    
    // IRaftClient is the client for a single peer node: it combines the raft rpc
    // calls (rpc.IRaftRPC), the state context accessors (iStateContext) and Ping.
    type IRaftClient interface {
        rpc.IRaftRPC
        iStateContext
    
        // Ping sends cmd to the peer and fills ret with the reply.
        Ping(cmd *PingCmd, ret *PingRet) error
    }
    
    
    // PingCmd is the request payload of the Ping rpc.
    type PingCmd struct {
        // SenderID is filled with the sender's configured node ID by the ping loop.
        SenderID string
        // Timestamp is set from time.Now().UnixNano() before each ping.
        Timestamp int64
    }
    
    // PingRet is the reply payload of the Ping rpc.
    // NOTE(review): the fields are filled by the remote side, which is not shown
    // here; presumably they mirror PingCmd (responder ID, nanosecond timestamp) — confirm.
    type PingRet struct {
        SenderID string
        Timestamp int64
    }
    

    iClientState.go

    基于状态模式的rpc连接状态接口

    package client
    
    import "learning/gooop/etcd/raft/rpc"
    
    // iClientState is the state-pattern interface for a peer connection: each
    // state answers the raft rpc calls and Ping in its own way.
    type iClientState interface {
        rpc.IRaftRPC
    
        // Start activates the state; the context calls it right after switching.
        Start()
        // Ping sends cmd to the peer and fills ret with the reply.
        Ping(cmd *PingCmd, ret *PingRet) error
    }
    
    

    iStateContext.go

    状态模式下的连接状态上下文接口

    package client
    
    import (
        "learning/gooop/etcd/raft/config"
        "net/rpc"
    )
    
    // iStateContext is what a connection state sees of its owner: the peer
    // config, the shared rpc connection, and a way to switch to another state.
    type iStateContext interface {
        // Config returns the configuration of the peer node.
        Config() config.IRaftNodeConfig
        // GetConn returns the current rpc connection; it may be nil.
        GetConn() *rpc.Client
        // SetConn replaces the current rpc connection (nil clears it).
        SetConn(client *rpc.Client)
        // HandleStateChanged makes state the current state.
        HandleStateChanged(state iClientState)
    }
    

    tRaftClient.go

    IRaftClient接口的具体实现,并实现iStateContext接口。

    package client
    
    import (
        "learning/gooop/etcd/raft/config"
        "net/rpc"
        rrpc "learning/gooop/etcd/raft/rpc"
    )
    
    // tRaftClient is the client for one peer node. Its rpc methods forward to the
    // current state, and it also serves as the iStateContext of that state.
    type tRaftClient struct {
        // cfg is the configuration of the peer node.
        cfg config.IRaftNodeConfig
        // conn is the current rpc connection to the peer; nil when there is none.
        conn *rpc.Client
        // state is the current connection state that rpc calls are forwarded to.
        state iClientState
    }
    
    // newRaftClient creates the client for the peer described by cfg, using conn
    // as its rpc connection.
    func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
        client := &tRaftClient{}
        client.init(cfg, conn)
        return client
    }
    
    // init stores cfg and conn and enters the initial connection state, so that
    // the rpc methods always have a state to forward to: the connected state when
    // conn is non-nil, the broken state (which redials) otherwise.
    func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
        me.cfg = cfg
        me.conn = conn

        // HandleStateChanged both records the state and starts it
        if conn != nil {
            me.HandleStateChanged(newConnectedState(me))
        } else {
            me.HandleStateChanged(newBrokenState(me))
        }
    }
    
    // Config returns the configuration of the peer node.
    func (me *tRaftClient) Config() config.IRaftNodeConfig {
        return me.cfg
    }
    
    // GetConn returns the current rpc connection, which may be nil.
    func (me *tRaftClient) GetConn() *rpc.Client {
        return me.conn
    }
    
    // SetConn replaces the current rpc connection; passing nil clears it.
    func (me *tRaftClient) SetConn(conn *rpc.Client) {
        me.conn = conn
    }
    
    // HandleStateChanged makes state the current state and starts it.
    // NOTE(review): the states call this from their own goroutines while rpc
    // methods read me.state; no synchronization is visible here — confirm.
    func (me *tRaftClient) HandleStateChanged(state iClientState) {
        me.state = state
        state.Start()
    }
    
    // Heartbeat forwards the call to the current connection state.
    func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
        return me.state.Heartbeat(cmd, ret)
    }
    
    // AppendLog forwards the call to the current connection state.
    func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
        return me.state.AppendLog(cmd, ret)
    }
    
    // CommitLog forwards the call to the current connection state.
    func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
        return me.state.CommitLog(cmd, ret)
    }
    
    // RequestVote forwards the call to the current connection state.
    func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
        return me.state.RequestVote(cmd, ret)
    }
    
    // Ping forwards the call to the current connection state.
    func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
        return me.state.Ping(cmd, ret)
    }
    

    tConnectedState.go

    管理已连接状态的rpc连接

    • 定时Ping以检测连接状态
    • 基于事件驱动的逻辑编排
    • 基于读写分离的字段管理
    package client
    
    import (
        "learning/gooop/etcd/raft/model"
        "learning/gooop/etcd/raft/rpc"
        "learning/gooop/etcd/raft/timeout"
        "sync"
        "time"
    )
    
    // tConnectedState is the iClientState used while an rpc connection is
    // available. It embeds model.TEventDrivenModel and wires its behavior
    // through event hooks.
    type tConnectedState struct {
        model.TEventDrivenModel
        // context gives access to the peer config and the shared connection.
        context iStateContext
    
        // mInitOnce guards init(); mStartOnce guards Start().
        mInitOnce sync.Once
        mStartOnce sync.Once
    
        // mDisposedFlag is set to false on ceInit and to true on ceDisposing.
        mDisposedFlag bool
    }
    
    // Event names used by tConnectedState. None of them carries arguments.
    const (
        // ceInit is raised by init().
        ceInit = "connected.init"

        // ceStart is raised by Start().
        ceStart = "connected.Start"

        // ceDisposing is raised by whenPingFailedThenSwitchToBrokenState().
        ceDisposing = "connected.Disposing"

        // cePingFailed is raised by the ping loop of whenStartThenBeginPing().
        cePingFailed = "connected.PingFailed"
    )
    
    
    // newConnectedState creates an initialised (not yet started) connected state
    // bound to ctx.
    func newConnectedState(ctx iStateContext) iClientState {
        state := &tConnectedState{}
        state.init(ctx)
        return state
    }
    
    // init binds the state to ctx, registers the event handlers and raises
    // ceInit. Only the first call has any effect.
    func (me *tConnectedState) init(ctx iStateContext) {
        setup := func() {
            me.context = ctx
            me.initEventHandlers()
            me.Raise(ceInit)
        }
        me.mInitOnce.Do(setup)
    }
    
    // initEventHandlers registers every event handler of the connected state.
    func (me *tConnectedState) initEventHandlers() {
        // handlers that write fields go first, so on a shared event they run
        // before the handlers that only read
        me.hookEventsForDisposedFlag()

        // read-only handlers, hooked in table order
        readers := []struct {
            event   string
            handler model.TEventHandleFunc
        }{
            {ceStart, me.whenStartThenBeginPing},
            {cePingFailed, me.whenPingFailedThenSwitchToBrokenState},
            {ceDisposing, me.whenDisposingThenCloseConn},
        }
        for _, r := range readers {
            me.Hook(r.event, r.handler)
        }
    }
    
    // Start raises ceStart; only the first call has any effect.
    func (me *tConnectedState) Start() {
        raiseStart := func() { me.Raise(ceStart) }
        me.mStartOnce.Do(raiseStart)
    }
    
    // hookEventsForDisposedFlag registers the only writers of mDisposedFlag:
    // ceInit clears the flag and ceDisposing sets it.
    func (me *tConnectedState) hookEventsForDisposedFlag() {
        setFlag := func(disposed bool) model.TEventHandleFunc {
            return func(_ string, _ ...interface{}) {
                me.mDisposedFlag = disposed
            }
        }

        me.Hook(ceInit, setFlag(false))
        me.Hook(ceDisposing, setFlag(true))
    }
    
    // whenStartThenBeginPing handles ceStart by spawning the ping loop: every
    // timeout.ClientPingInterval it pings the peer, and on the first failed ping
    // it raises cePingFailed and stops. The loop also stops once the state has
    // been disposed.
    func (me *tConnectedState) whenStartThenBeginPing(_ string, _ ...interface{}) {
        go func() {
            // an explicit ticker can be stopped when the loop exits
            ticker := time.NewTicker(timeout.ClientPingInterval)
            defer ticker.Stop()

            cmd := &PingCmd{SenderID: me.context.Config().ID()}
            ret := &PingRet{}
            for range ticker.C {
                if me.mDisposedFlag {
                    return
                }

                cmd.Timestamp = time.Now().UnixNano()
                if err := me.Ping(cmd, ret); err != nil {
                    me.Raise(cePingFailed)
                    // this state has been replaced; do not wait for another tick
                    return
                }
            }
        }()
    }
    
    // whenPingFailedThenSwitchToBrokenState handles cePingFailed: it disposes this
    // state (raising ceDisposing) and hands the context over to a new broken state.
    func (me *tConnectedState) whenPingFailedThenSwitchToBrokenState(_ string, _ ...interface{}) {
        ctx := me.context

        me.Raise(ceDisposing)

        next := newBrokenState(ctx)
        ctx.HandleStateChanged(next)
    }
    
    // whenDisposingThenCloseConn handles ceDisposing: it closes the context's
    // connection, if there is one, and then clears it.
    func (me *tConnectedState) whenDisposingThenCloseConn(_ string, _ ...interface{}) {
        if conn := me.context.GetConn(); conn != nil {
            conn.Close()
        }

        me.context.SetConn(nil)
    }
    
    
    // Heartbeat calls TRaftRPCServer.Heartbeat on the peer over the context's
    // connection. It returns gErrorConnectionBroken when the connection has
    // already been cleared, which happens once this state is disposed.
    func (me *tConnectedState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
        conn := me.context.GetConn()
        if conn == nil {
            return gErrorConnectionBroken
        }
        return conn.Call("TRaftRPCServer.Heartbeat", cmd, ret)
    }
    
    // AppendLog calls TRaftRPCServer.AppendLog on the peer over the context's
    // connection. It returns gErrorConnectionBroken when the connection has
    // already been cleared, which happens once this state is disposed.
    func (me *tConnectedState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
        conn := me.context.GetConn()
        if conn == nil {
            return gErrorConnectionBroken
        }
        return conn.Call("TRaftRPCServer.AppendLog", cmd, ret)
    }
    
    // CommitLog calls TRaftRPCServer.CommitLog on the peer over the context's
    // connection. It returns gErrorConnectionBroken when the connection has
    // already been cleared, which happens once this state is disposed.
    func (me *tConnectedState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
        conn := me.context.GetConn()
        if conn == nil {
            return gErrorConnectionBroken
        }
        return conn.Call("TRaftRPCServer.CommitLog", cmd, ret)
    }
    
    // RequestVote calls TRaftRPCServer.RequestVote on the peer over the context's
    // connection. It returns gErrorConnectionBroken when the connection has
    // already been cleared, which happens once this state is disposed.
    func (me *tConnectedState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
        conn := me.context.GetConn()
        if conn == nil {
            return gErrorConnectionBroken
        }
        return conn.Call("TRaftRPCServer.RequestVote", cmd, ret)
    }
    
    // Ping calls TRaftRPCServer.Ping on the peer over the context's connection.
    // It returns gErrorConnectionBroken when the connection has already been
    // cleared, which happens once this state is disposed.
    func (me *tConnectedState) Ping(cmd *PingCmd, ret *PingRet) error {
        conn := me.context.GetConn()
        if conn == nil {
            return gErrorConnectionBroken
        }
        return conn.Call("TRaftRPCServer.Ping", cmd, ret)
    }
    

    tBrokenState.go

    管理已断开状态的rpc连接

    • 定时Dial以尝试重连接
    • 基于事件驱动的逻辑编排
    • 基于读写分离的字段管理
    package client
    
    import (
        "errors"
        "learning/gooop/etcd/raft/model"
        rrpc "learning/gooop/etcd/raft/rpc"
        "learning/gooop/etcd/raft/timeout"
        "sync"
        "net/rpc"
        "time"
    )
    
    // tBrokenState is the iClientState used while the rpc connection is down.
    // It embeds model.TEventDrivenModel and wires its behavior through event hooks.
    type tBrokenState struct {
        model.TEventDrivenModel
        // context gives access to the peer config and the shared connection.
        context iStateContext
    
        // mInitOnce guards init(); mStartOnce guards Start().
        mInitOnce sync.Once
        mStartOnce sync.Once
    
        // mDisposedFlag is set to false on beInit and to true on beDisposing.
        mDisposedFlag bool
    }
    
    // Event names used by tBrokenState.
    const (
        // beInit is raised by init(); no args.
        beInit = "broken.init"

        // beStart is raised by Start(); no args.
        beStart = "broken.Start"

        // beDialOK is raised by the dial loop of whenStartThenBeginDial();
        // args: the new *rpc.Client.
        beDialOK = "broken.DialOK"

        // beDisposing is raised by whenDialOKThenSwitchToConnectedState(); no args.
        beDisposing = "broken.Disposing"
    )
    
    // newBrokenState creates an initialised (not yet started) broken state bound
    // to ctx.
    func newBrokenState(ctx iStateContext) iClientState {
        state := &tBrokenState{}
        state.init(ctx)
        return state
    }
    
    // init binds the state to ctx, registers the event handlers and raises
    // beInit. Only the first call has any effect.
    func (me *tBrokenState) init(ctx iStateContext) {
        setup := func() {
            me.context = ctx
            me.initEventHandlers()
            me.Raise(beInit)
        }
        me.mInitOnce.Do(setup)
    }
    
    // initEventHandlers registers every event handler of the broken state.
    func (me *tBrokenState) initEventHandlers() {
        // handlers that write fields go first
        me.hookEventsForDisposedFlag()

        // read-only handlers, hooked in table order; on beDialOK the connection
        // is stored before the switch to the connected state
        readers := []struct {
            event   string
            handler model.TEventHandleFunc
        }{
            {beStart, me.whenStartThenBeginDial},
            {beDialOK, me.whenDialOKThenSetConn},
            {beDialOK, me.whenDialOKThenSwitchToConnectedState},
        }
        for _, r := range readers {
            me.Hook(r.event, r.handler)
        }
    }
    
    
    // hookEventsForDisposedFlag registers the only writers of mDisposedFlag:
    // beInit clears the flag and beDisposing sets it.
    func (me *tBrokenState) hookEventsForDisposedFlag() {
        setFlag := func(disposed bool) model.TEventHandleFunc {
            return func(_ string, _ ...interface{}) {
                me.mDisposedFlag = disposed
            }
        }

        me.Hook(beInit, setFlag(false))
        me.Hook(beDisposing, setFlag(true))
    }
    
    // Start raises beStart; only the first call has any effect.
    func (me *tBrokenState) Start() {
        raiseStart := func() { me.Raise(beStart) }
        me.mStartOnce.Do(raiseStart)
    }
    
    
    // whenStartThenBeginDial handles beStart by spawning the redial loop: until
    // the state is disposed it dials the peer's endpoint over tcp, sleeping
    // timeout.ClientRedialInterval after each failure. The first successful dial
    // raises beDialOK with the new client and ends the loop.
    func (me *tBrokenState) whenStartThenBeginDial(_ string, _ ...interface{}) {
        redial := func() {
            for {
                if me.mDisposedFlag {
                    return
                }

                conn, err := rpc.Dial("tcp", me.context.Config().Endpoint())
                if err != nil {
                    time.Sleep(timeout.ClientRedialInterval)
                    continue
                }

                me.Raise(beDialOK, conn)
                return
            }
        }
        go redial()
    }
    
    
    // whenDialOKThenSetConn handles beDialOK: it stores the freshly dialed client,
    // passed as the first event argument, in the context.
    func (me *tBrokenState) whenDialOKThenSetConn(_ string, args ...interface{}) {
        me.context.SetConn(args[0].(*rpc.Client))
    }
    
    
    // whenDialOKThenSwitchToConnectedState handles beDialOK: it disposes this
    // state (raising beDisposing) and hands the context over to a new connected
    // state.
    func (me *tBrokenState) whenDialOKThenSwitchToConnectedState(_ string, _ ...interface{}) {
        ctx := me.context

        me.Raise(beDisposing)

        next := newConnectedState(ctx)
        ctx.HandleStateChanged(next)
    }
    
    // Heartbeat always fails with gErrorConnectionBroken while the connection is down.
    func (me *tBrokenState) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
        return gErrorConnectionBroken
    }
    
    // AppendLog always fails with gErrorConnectionBroken while the connection is down.
    func (me *tBrokenState) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
        return gErrorConnectionBroken
    }
    
    // CommitLog always fails with gErrorConnectionBroken while the connection is down.
    func (me *tBrokenState) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
        return gErrorConnectionBroken
    }
    
    // RequestVote always fails with gErrorConnectionBroken while the connection is down.
    func (me *tBrokenState) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
        return gErrorConnectionBroken
    }
    
    // Ping always fails with gErrorConnectionBroken while the connection is down.
    func (me *tBrokenState) Ping(cmd *PingCmd, ret *PingRet) error {
        return gErrorConnectionBroken
    }
    
    // gErrorConnectionBroken is the error every rpc method of tBrokenState returns.
    var gErrorConnectionBroken = errors.New("peer connection broken")
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang etcd raft协议之7

        本文链接:https://www.haomeiwen.com/subject/yzhckltx.html