美文网首页GO与微服务
手撸golang etcd raft协议之2

手撸golang etcd raft协议之2

作者: 老罗话编程 | 来源:发表于2021-03-28 22:28 被阅读0次

    手撸golang etcd raft协议之2

    缘起

    最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
    本系列笔记拟采用golang练习之
    gitee: https://gitee.com/ioly/learning.gooop

    raft分布式一致性算法

    分布式存储系统通常会通过维护多个副本来进行容错,
    以提高系统的可用性。
    这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
    
    Raft算法把问题分解成了领袖选举(leader election)、
    日志复制(log replication)、安全性(safety)
    和成员关系变化(membership changes)这几个子问题。
    
    Raft算法的基本操作只需2种RPC即可完成。
    RequestVote RPC是在选举过程中通过旧的Leader触发的,
    AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
    

    目标

    • 根据raft协议,实现高可用分布式强一致的kv存储

    子目标(Day 2)

    • 定义raft rpc接口
    • 定义raft lsm有限状态自动机接口(状态模式)

    设计

    • config/IRaftConfig.go: 集群配置接口。简单起见, 使用静态配置模式定义节点数量和地址。
    • config/IRaftNodeConfig.go: 节点配置接口
    • roles/roles.go:raft三种角色常量
    • timeout/timeout.go:超时时间常量
    • rpc/IRaftRPC.go: raft协议的基本RPC接口及参数定义。简单起见,拟采用net/rpc实现之。
    • rpc/IRaftRPCServer.go: 支持raft协议的服务器接口。简单起见,拟采用net/rpc实现之。
    • lsm/IRaftLSM.go: raft有限状态机接口
    • lsm/IRaftState.go: 状态接口
    • lsm/tRaftStateBase.go: 基本状态数据
    • lsm/tFollowerState: follower状态的实现,未完成

    config/IRaftConfig.go

    集群配置接口。简单起见, 使用静态配置模式定义节点数量和地址。

    package config
    
    type IRaftConfig interface {
        ID() string
        Nodes() []IRaftNodeConfig
    }
    

    config/IRaftNodeConfig.go

    节点配置接口

    package config
    
    type IRaftNodeConfig interface {
        ID() string
        Endpoint() string
    }
    

    roles/roles.go

    raft三种角色常量

    package roles
    
    type RaftRole int
    
    const Follower RaftRole = 1
    const Candidate RaftRole = 2
    const Leader RaftRole = 3
    

    timeout/timeout.go

    超时时间常量

    package timeout
    
    import "time"
    
    const HeartbeatInterval = 150 * time.Millisecond
    const HeartbeatTimeout = 5 * HeartbeatInterval
    const ElectionTimeout = HeartbeatTimeout
    
    

    rpc/IRaftRPC.go

    raft协议的基本RPC接口及参数定义。简单起见,拟采用net/rpc实现之。

    package rpc
    
    type IRaftRPC interface {
        RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error
        AppendEntries(cmd *AppendEntriesCmd, ret *AppendEntriesRet) error
    }
    
    type RequestVoteCmd struct {
        CandidateID  string
        Term         int
        LastLogIndex int
        LastLogTerm  int
    }
    
    type RequestVoteRet struct {
        Term        int
        VoteGranted bool
    }
    
    type AppendEntriesCmd struct {
        Term         int
        LeaderID     string
        PrevLogTerm  int
        PrevLogIndex int
        LeaderCommit int
        Entries      []*LogEntry
    }
    
    type LogEntry struct {
        Tag     int
        Content []byte
    }
    
    type AppendEntriesRet struct {
        Term    int
        Success bool
    }
    

    rpc/IRaftRPCServer.go

    支持raft协议的服务器接口。简单起见,拟采用net/rpc实现之。

    package rpc
    
    type IRaftRPCServer interface {
        BeginServeTCP(port int, r IRaftRPC)
    }
    
    

    lsm/IRaftLSM.go

    raft有限状态机接口

    package lsm
    
    import "learning/gooop/etcd/raft/rpc"
    
    // IRaftLSM raft有限状态自动机
    type IRaftLSM interface {
        rpc.IRaftRPC
    
        State() IRaftState
    }
    

    lsm/IRaftState.go

    状态接口

    package lsm
    
    import (
        "learning/gooop/etcd/raft/roles"
        "learning/gooop/etcd/raft/rpc"
    )
    
    type IRaftState interface {
        rpc.IRaftRPC
    
        Role() roles.RaftRole
        Start()
    }
    

    lsm/tRaftStateBase.go

    基本状态数据

    package lsm
    
    import (
        "learning/gooop/etcd/raft/config"
        "learning/gooop/etcd/raft/roles"
    )
    
    //
    type tRaftStateBase struct {
        // 当前角色
        role roles.RaftRole
    
        // 当前任期号
        term int
    
        // leader.id
        leaderID string
    
        // 集群配置
        cfg config.IRaftConfig
    }
    
    func newRaftStateBase(term int, cfg config.IRaftConfig) *tRaftStateBase {
        it := new(tRaftStateBase)
        it.init(term, cfg)
        return it
    }
    
    // init initialize self, with term and config specified
    func (me *tRaftStateBase) init(term int, cfg config.IRaftConfig) {
        me.cfg = cfg
        me.role = roles.Follower
        me.term = term
        me.leaderID = ""
    }
    
    func (me *tRaftStateBase) Role() roles.RaftRole {
        return me.role
    }
    

    lsm/tFollowerState

    follower状态的实现,未完成

    package lsm
    
    import (
        "learning/gooop/etcd/raft/config"
        "learning/gooop/etcd/raft/timeout"
        "sync"
        "time"
    )
    
    type tFollowerState struct {
        tRaftStateBase
    
        mInitOnce  sync.Once
        mStartOnce sync.Once
        mEventMap  map[tFollowerEvent][]tFollowerEventHandler
    }
    
    type tFollowerEvent int
    
    const evStart tFollowerEvent = 1
    
    type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})
    
    func newFollowerState(term int, cfg config.IRaftConfig) *tFollowerState {
        it := new(tFollowerState)
        it.init(term, cfg)
    
        // todo: to implement IRaftState
        return it
    }
    
    func (me *tFollowerState) init(term int, cfg config.IRaftConfig) {
        me.mInitOnce.Do(func() {
            me.tRaftStateBase = *newRaftStateBase(term, cfg)
    
            // init event map
            me.mEventMap = make(map[tFollowerEvent][]tFollowerEventHandler)
            me.registerEventHandlers()
        })
    }
    
    func (me *tFollowerState) registerEventHandlers() {
        me.mEventMap[evStart] = []tFollowerEventHandler{
            me.afterStartThenBeginWatchLeaderTimeout,
        }
    }
    
    func (me *tFollowerState) raise(e tFollowerEvent, args ...interface{}) {
        if handlers, ok := me.mEventMap[e]; ok {
            for _, it := range handlers {
                it(e, args...)
            }
        }
    }
    
    func (me *tFollowerState) Start() {
        me.mStartOnce.Do(func() {
            me.raise(evStart)
        })
    }
    
    func (me *tFollowerState) afterStartThenBeginWatchLeaderTimeout(e tFollowerEvent, args ...interface{}) {
        go func() {
            iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3
            for range time.Tick(iCheckingTimeoutInterval) {
                // todo: watch leader.AppendEntries rpc timeout
            }
        }()
    }
    

    (未完待续)

    相关文章

      网友评论

        本文标题:手撸golang etcd raft协议之2

        本文链接:https://www.haomeiwen.com/subject/kegmhltx.html