美文网首页分布式系统
consul 源码解析(一)raft 协议实现

consul 源码解析(一)raft 协议实现

作者: HackerZGZ | 来源:发表于2018-03-04 01:59 被阅读0次

    consul 相信大家已经知道了,在日常的开发以及运维中也会常常听到 consul 这个词,但是不是所有的人都知道它是什么?它在运维中扮演了什么样的角色呢?

    首先,我们来看下 consul 的官网中是怎么形容自己的:

    Service Discovery And Configuration Make Easy

    让服务发现以及配置变得更简单,这个就是 consulmicro service 横行的今天想要为我们解决的问题。

    在微服务的世界中,运维人员变得越来越累了,以往可能 1台物理机部署 3~4 个应用,就可以将提供完整的服务,而且维护也变得极为容易,写脚本就是了。但如今,一旦进入的 micro service 的世界,提供相同的服务可能就需要 20~40 个应用了,如此一来,运维人员的工作量以及压力大大增加,但是却得不到肉眼可见的好处。

    先来说下 consul 为我们提供的四大组件:

    • Service Discovery: 当某个应用可用的时候,可以向 consul 客户端注册自己,或者让 consul 客户端通过配置发现自己,这样,如果有需要这个应用的其他应用就可以通过 consul 快速找到一个可用的应用了。

    • Health Check: consul 客户端提供任意数量的健康检查,包括对应用保持心跳、主机物理资源监控等。健康检查可以被 operator 检测并操作,防止流量进入不健康的主机。

    • KV Store: 应用按需使用 consul 的 KV存储 ,可以用于动态配置、功能标记、协调、领袖选举等,通过客户端的 HTTP 接口可以灵活方便使用。

    • Multi Datacenter: consul 提供开箱即用的多数据中心,这意味着用户不需要担心需要建立额外的抽象层让业务扩展到多个区域。

    我们可以发现,微服务治理 的所有解决方案在 consul 的四大组件下都能得到很好的解决。

    为了深入了解 consul ,我们先来看下几个专有名词:

    • Agent: 代理是 consul 集群中每个成员的基本单位,他们以守护进程的形式存在,代理有 客户端 以及 服务端 两种角色运行。所有的节点都必须运行一个代理,它们相互之间通过 DNS 或者 HTTP 接口保持健康检查并同步数据。

    • Client: 客户端 是代理的其中一种角色,它会将所有的 RPC 请求转发到 服务端 代理,而 客户端 本身是 无状态 的,而且只会占用非常少的资源并消耗少量的 网络带宽 ,建议每个应用节点都运行一个 客户端

    • Server: 服务端 相对而言是一个非常重的代理,它的主要工作包括参与 raft仲裁维护集群状态响应RPC查询 、与其他的数据中心 交换数据、将 查询转发给Leader / 其他数据中心 等。可以说,服务端是 consul 集群中最重要的角色,所以建议将其放置在相对独立的主机上,并且一个集群(数据中心)中至少需要 3 个以上的 服务端 才能保证 最终一致性

    • Datacenter: 数据中心 很好理解,其实就是一个 低延迟高带宽 的私有网络环境,一个稳定的数据中心环境对 raft协议 来说非常重要,否则及其可能出现数据不同步、服务质量下降等情况。

    • Gossip: 一种保证 最终一致性 的分布式协议,常用于 点对点通信 ,模拟人与人之间的交流从而达到理想中的 最终一致性 。而 consul 通过 UDP 使用该协议提供 成员管理失败检测事件广播 等功能。

    • Raft: 是一种保证 强一致性 的分布式协议,consul 使用该协议提供 服务端 们的数据一致性,所以我们会在 consul 源码解析中会重点讲述这个算法是如何被应用的。

    ​​​

    // agent/agent.go
    
    // delegate 定义了代理的公共接口
    type delegate interface {
        Encrypted() bool // 数据是否进行了加密
        GetLANCoordinate() (lib.CoordinateSet, error) // 获取局域网内为 Coordinate 角色的服务端
        Leave() error  // 离开集群
        LANMembers() []serf.Member // 局域网内的所有成员
        LANMemberAllSegments() ([]serf.Member, error) // 局域网内所有段区的成员
        LANSegmentMembers(segment string) ([]serf.Member, error) // 局域网内某个段区的所有成员
        LocalMember() serf.Member // 本机成员
        JoinLAN(addrs []string) (n int, err error) // 加入一个或多个段区
        RemoveFailedNode(node string) error // 尝试移除某个异常的节点
        RPC(method string, args interface{}, reply interface{}) error // 远程调用
        SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReqlyFn) error // 发起快照存档的远程调用
        Shutdown() error // 关闭代理
        Stats() map[string]map[string]string // 用于获取应用当前状态
    }
    
    // Agent 代理
    type Agent struct {
        // 代理的运行时配置,支持 hot reload
        config *config.RuntimeConfig
        
        // ... 日志相关
        
        // 内存中收集到的应用、主机等状态信息
        MemSink *metrics.InmemSink
        
        // 代理的公共接口,而配置项则决定代理的角色
        delegate delegate
        
        // 本地策略执行的管理者
        acls *aclManager
            
        // 保证策略执行者的权限,可实时更新,覆盖本地配置文件
        tokens *token.Store
        
        // 存储本地节点、应用、心跳的状态,用于反熵
        State *local.State
        
        // 负责维持本地与远端的状态同步
        sync *ae.StateSyncer
        
        // ...各种心跳包(Monitor/HTTP/TCP/TTL/Docker 等)
        
        // 用于接受其他节点发送过来的事件
        eventCh chan serf.UserEvent
        
        // 用环形队列存储接受到的所有事件,用 index 指向下一个插入的节点
        // 使用读写锁保证数据安全,当一个事件被插入时,会通知 group 中所有的订阅者
        eventBuf    []*UserEvent
        eventIndex  int
        eventLock   sync.RWMutex
        eventNotify NotifyGroup
        
        // 重启,并返回通知是否重启成功
        reloadCh chan chan error
        
        // 关闭代理前的操作
        shutdown     bool
        shutdownCh   chan struct{}
        shutdownLock sync.Mutex
        
        // 添加到局域网成功的回调函数
        joinLANNotifier notifier
        
        // 返回重试加入局域网失败的错误信息
        retryJoinCh chan error
        
        // 并发安全的存储当前所有节点的唯一名称,用于 RPC传输
        endpoints     map[string]string
        endpointsLock sync.RWMutex
        
        // ...为代理提供 DNS/HTTP 的API
    
        // 追踪当前代理正在运行的所有监控
        watchPlans []*watch.Plan
    }
    
    // 决定当前启动的是 server 还是 client 的关键代码在于
    func (a *Agent) Start() error {
        // ...
        
        if c.ServerMode {
            server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
            // error handler
            a.delegate = server // 主要差别在这里
        } else {
            client, err := consul.NewClientLogger(consulCfg, a.logger, a.tokens)
            // error handler
            a.delegate = client // 主要差别在这里
        }
        
        a.sync.ClusterSize = func() int { return len(a.delegate.LANMembers()) }
        
        // ...
    }
    

    因为相对来说, 客户端 是比较轻量的代理,所以我们先来先看 客户端 的结构与实现:

    // agent/consul/client.go
    
    type Client struct {
        config *Config
        
        // 连接 服务端 的连接池,用 TCP 协议
        connPool *pool.ConnPool
        
        // 用于选择和维护客户端用于 RPC 请求的服务端
        routers *router.Manager
        
        // 用于限制从客户端到服务器的 RPC 总数
        rpcLimiter *rate.Limiter
        
        // 负责接送来自同一个数据中心的事件
        eventCh chan serf.Event
        
        logger *log.Logger
        
        // 存储当前数据中心的 serf 集群信息
        serf *serf.Serf
        
        shutdown     bool
        shutdownCh   chan struct{}
        shutdownLock sync.Mutex
    }
    

    而 Client 所实现的 delegate 接口几乎都是直接调用的 serf 提供的接口,如:

    func (c *Client) JoinLAN(addrs []string) (int, error) {
        return c.serf.Join(addrs, true)
    }
    
    func (c *Client) Leave() error {
        c.logger.Printf("[INFO] consul: client starting leave")
    
        // Leave the LAN pool
        if c.serf != nil {
            if err := c.serf.Leave(); err != nil {
                c.logger.Printf("[ERR] consul: Failed to leave LAN Serf cluster: %v", err)
            }
        }
        return nil
    }
    
    func (c *Client) KeyManagerLAN() *serf.KeyManager {
        return c.serf.KeyManager()
    }
    

    Server 的由于涉及到 raft 协议,所以其实现复杂的多 (44 个 fields ),先来看下结构:

    // agent/consul/server.go
    
    type Server struct {
        // 哨兵接口,负责处理xxx TODO
        sentinel sentinel.Evaluator
        
        // 负责策略执行的权限缓存 TODO
        aclAuthCache *acl.Cache
        
        // 负责策略执行的非权限缓存 TODO
        aclCache *aclCache
        
        // 自动更新当前权限的有效性
        autopilotPolicy AutopilotPolicy
        
        // 负责触发移除无用的应用的检查
        autopilotRemoveDeadCh chan struct{}
        
        // 负责停止自动更新权限有效的代码
        autopilotShutdownCh chan struct{}
        
        // 阻塞直至停止自动更新权限
        autopilotWaitGroup sync.WaitGroup
        
        // 存储当前集群的健康状态
        clusterHealth     structs.OperatorHealthReply
        clusterHealthLock sync.RWMutex
        
        config *Config
        
        // 连接其他服务端的连接池
        connPool *pool.ConnPool
        
        // ...
        
        // 保证强一致性的 raft 状态机
        fsm *fsm.FSM
        
        // 在相同数据中心的 consul 节点使用 raft 协议保证操作的强一致性
        raft          *raft.Raft
        raftLayer     *RaftLayer
        raftStore     *raftboltdb.BoltStore // 提供 ACID 的高性能 KV数据库
        raftTransport *raft.NetworkTransport
        raftInmem     *raft.InmemStore
        
        // 通过 setupRaft() 设置,确保server能收到Leader更换通知
        raftNotifyCh <-chan bool
        
        // 判断 Leader 是否准备好提供一致性读取
        readyForConsistentReads int32
        
        // 使用信号通知该服务端需要退出集群,并尝试将 RPC 转发到其他的服务端上。
        leaveCh chan struct{}
        
        // 用于路由公域网的服务端或者由用户定义的段区域
        router *router.Router
        
        // 用于接受进来的请求连接
        Listener net.Listener
        rpcServer *rpc.Server
        rpcTLS *tls.Config
        
        serfLAN *serf.Serf
        
        // 通过段名指引到不同的 serf 集群中
        segmentLAN map[string]*serf.Serf
        
        // 在同一个数据中心中,由服务端组成的 serf 集群
        serfWAN *serf.Serf
        
        // 在当前的数据中心进行服务端追踪,提供 id 与 address 相互转换
        serverLookup *ServerLookup
        
        // 通知广播,让公域网的 serf 实例知道局域网的服务端发生的变化(退出)
        floodLock sync.RWMutex
        floodCh   []chan struct{}
        
        // ...
        
        // 通知需要存储快照,并重新发起 Leader 选举
        reassertLeaderCh chan chan error
        
        // tombstone 算法的 GC 调优参数
        tombstoneGC *state.TombstoneGC
        
        // 
        aclReplicationStatus     structs.ACLReplicationStatus
        aclReplicationStatusLock sync.RWMutex
    }
    

    下面来看看 consul 作为微服务治理的基础架构图是怎么样的:

    image

    毫无疑问,raft 协议是维持整个 consul 生态中最重要的一个协议,下面重点来讲下 raft 协议:

    作为基于 Paxos 的一种变种算法,它简化了状态,通过加入 时间差领导选举 等概念使一致性算法变得更简单、易懂,先来看看其中的专有名词:

    • Leader election: 领导选举 是目前落地的一致性算法不可避免的一个步骤(CASPaxos 好像不需要,不过还没有深入研究),为了达到一致性,必须要已一个共同认可的节点做出所有操作。而 raft 协议的节点都会处于三种状态之一:

      • Leader: 操作的真正处理者,它负责将改动确认生效,并将其同步到其他节点。

      • Follower: 负责将来自 Leader 的改动请求写入本地 Log,返回成功。

      • Candidate: 如果 Leader 发生了故障,没有收到心跳的 Followers 们会进入 Candidate 状态,并开始选主,并保持该状态直到选主结束。

    • Log Replication: 当 Leader 被选举出来以后,所有的写请求都必须要到 Leader 执行,Leader 会将其作为 Log entry 追加到日志中,然后给其他 Follower 发送请求,当绝大部分的 ((N/2)+1)的 Follower replicated 成功后,就代表该写入事件成功了,就会返回结果状态到客户端。

    • Log Compaction: 在实际的应用场景中,因为磁盘空间是有限的,日志不能无限地增长,否则即使系统需要重启也需要耗费大量的时间。所以, raft 会对节点进行 snapshot 操作,执行成功后,会将 snapshot 之前的日志丢弃掉。

    为了更快速理解 raft 算法,我们可以带着3个问题观看 动画

    • Leader 在什么时候,如何进行选举?

    • Log Replication 作用是什么,如何实现?

    • 节点数量发生变化(增加/减少)时如何处理?

    结合 consul 代码来理解下 raft 协议:

    可以看到 raft 协议中,节点分别处于 3种状态

    // raft/state.go
    type RaftState uint32
    
    const (
        Follower RaftState = iota
        
        Candidate
        
        Leader
        
        // 这个状态是 hashicorp/raft 特有的,表示节点下线
        Shutdown
    )
    
    // 获取当前 raft 节点状态
    func (r *raftState) getState() RaftState {
        stateAddr := (*uint32)(&r.state)
        return RaftState(atomic.LoadUint32(stateAddr))
    }
    

    先来看看在 consul 里面是如何创建一个 raft node 的:

    // raft/api.go
    
    // NewRaft 新建一个 raft 节点,传入参数除了 config/transport 以外,
    // 还有 fsm/logs/store/snapshot ,这是为了可以通过传递参数,使崩掉
    // 的节点能够快速恢复
    func NewRaft(config *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
        
        // TODO ...
        r = &Raft{
            // ...
        }
        
        // 默认为 Follower 角色
        r.setState(Follower)
        
        // 该处只应该在测试的时候被调用,
        // 不是必须的主流程,所以单独拎出来
        if conf.StartAsLeader {
            r.setState(Leader)
            r.setLeader(r.localAddr)
        }
        
        r.goFunc(r.run)          // 管理 Server 状态,运行不同的代码
        r.goFunc(r.runFSM)       // 管理状态机
        r.goFunc(r.runSnapshots) // 管理快照
        return r, nil    
    }
    
    // raft/raft.go
    func (r *Raft) run() {
        for {
            select {
            case <-r.shutdownCh: // 会被 close(r.shutdownCh) 触发
                r.setLeader("")  // 不联系 Leader 节点了
                return
            default:
            }
            
            switch r.getState() {
            case Follower:
                r.runFollower()
            case Candidate:
                r.runCandidate()
            case Leader:
                r.runLeader()
            // 问题:为什么没有判断 Shutdown 的状态呢?
            }
        }
    }
    
    // 只有 Follower 状态的节点会运行该段代码
    func (r *Raft) runFollower() {
        // ...
        for {
            select {
            case rpc := <-rpcCh:
                r.processRPC(rpc) // 根据 rpc 的类型进行 附加日志/备份快照 等操作
            // ...
            case v := <-r.verifyCh:
                // Reject any operations since we are not the leader
                v.respond(ErrNotLeader)
            case <-heartbeatTimer: // 重头戏,还记得动画里面的演示吗?
                // 重设一个随机的定时器
                heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
                
                // 获取上一次从 Leader 节点得到联系的时间
                lastContact := r.LastContact()
                if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {
                    continue // 还能联系上 Leader ,继续下一次操作
                }
                
                // 从这里开始就发现联系不上 Leader 了,需要重新开始投票!!!
                
                lastLeader := r.Leader() // 先存一下之前的 Leader
                r.setLeader("")          // 离开这个 Leader 的集群
                
                if r.configurations.lastestIndex == 0 { // 没有其他可通信节点了,终止选举!!!
                    // ...
                } else if { // 关于选举权的配置,可以忽略
                    //...
                } else {
                    // Logger
                    r.setState(Candidate)
                    return
                }
                
            case <-r.shutdownCh:
                return
            }
        }
    }
    

    除了代理的正常维护自己的功能,它还必须处理日志,所以这里使用了 有限状态自动机 (FSM)的方式避免了影响正常节点功能。

    拥有相同日志序列的应用必须归结于相同的状态,意味着行为必须是确定性的。

    // raft/fsm.go
    // FSM 的接口定义了一些能够操作日志的有限状态机的行为
    type FSM interface {
        // 提交日志条目,如果该 FSM 在 Leader 节点上运行
        // 将返回一个 logFuture 用于等待直至日志被认为提交成功
        Apply(*Log) interface{}
        
        // 将当前日志进行快照备份,需要注意的是本函数不能与 Apply
        // 同时在多个线程中被调用,但允许在同一线程中并发运行
        Snapshot() (FSMSnapshot, error)
        
        // 将用于从快照中恢复 FSM 状态,一旦该函数被调用,FSM 必须
        // 暂停其他所有调用与状态,直至恢复完毕
        Restore(io.ReadCloser) error
    }
    
    
    func (r *Raft) runFSM() {
        var lastIndex, lastTerm uint64
        
        // commit 提交日志
        commit := func(req *commitTuple) {
            var resp interface{}
            if req.log.Type == LogCommand {
                resp = r.fsm.Apply(req.log) // 提交日志条目
            }
            
            // 更新索引为提交日志的值
            lastIndex = req.log.Index
            lastTerm = req.log.Term
            
            // Leader 节点需要等待日志 commit
            if req.future != nil {
                req.future.response = resp
                req.future.respond(nil)
            }
        }
        
        // restore 恢复快照
        restore := func(req *restoreFuture) {
            meta, source, err := r.snapshots.Open(req.ID) // 读取快照
            // error handle
            
            if err := r.fsm.Restore(source); err != nil {
                // error handle
            }
            source.Close()
            
            // 更新索引为快照中的值
            lastIndex = meta.Index
            lastTerm = meta.Term
            req.respond(nil)
        }
        
        // snapshot 生成快照备份
        snapshot := func(req *reqSnapshotFuture) {
            if lastIndex  == 0 {
                req.respond(ErrNothingNewToSnapshot)
                return
            }
            
            snap, err := r.fsm.Snapshot()
            
            // 返回当前的索引给请求
            req.index = lastIndex
            req.term = lastTerm
            req.snapshot = snap
            req.respond(err)
        }
        
        for {
            select {
            case ptr := <-r.fsmMutateCh:   // 变化通道
                switch req := ptr.(type) {
                case *commitTuple:
                    commit(req)
                case *restoreFuture:
                    restore(req)
                default:
                    // panic here...
                }
                
            case req := <-r.fsmSnapshotCh:  // 快照通道
                snapshot(req)
                
            case <-r.shutdownCh:
                return
            }
        }
    }
    

    到此,基本上 raft 协议的主干实现已经讲解完全了,此外还有如日志、快照的 存储引擎 、请求的 通信协议 等可以留给有兴趣的童鞋自行去读一下。

    相关文章

      网友评论

        本文标题:consul 源码解析(一)raft 协议实现

        本文链接:https://www.haomeiwen.com/subject/azxffftx.html