美文网首页分布式
Raft在etcd中的实现(一)存储及重要的组件

Raft在etcd中的实现(一)存储及重要的组件

作者: yuan1028 | 来源:发表于2018-03-30 16:44 被阅读139次

    raftexample简介

    本文中raft的实现以etcd中raftexample实现为例。
    raftexample的例子中,启动了一个kv数据库作为raft的state machine,另外启动了一个http数据库来响应客户端的请求。其中proposeC以及commitC,errorC主要用来交互,proposeC为由客户端发送过来的条目,通过httpserver流向共识部分raftNode;commitC为经共识的条目,由raftNode流向kv数据库。

    /*
    github.com/coreos/etcd/contrib/raftexample/main.go
    main()
    */
        var kvs *kvstore
        getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
            //启动raft节点
        commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
           //启动kv数据库作为state machine
        kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
    
        // the key-value http handler will propose updates to raft
        serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
    
    

    raftexample中的存储

    raftexample中的存储分为以下几部分

    • kv数据库, 存储state machine的状态。raftexample例子中是个简单的map。
    • 内存数据中的log,raft共识用到的日志条目,一般是一段较新的日志,可能包含一部分已共识的日志和一些尚未共识的日志条目。由于是内存维护,可以灵活的重写替换。该部分内容中位于commitIndex之前的部分是已共识的部分,内容不会被替换,commitIndex到lastIndex之间的内容是尚未共识的部分,index对应的条目内容可能被替换。该部分在raft.MemoryStorage实现。
    • wal部分,raft共识所用的日志文件。该文件只会追加,不会重写和覆盖。raft共识过程中收到的日志条目,都会记录在wal日志文件中。即可能在wal日志文件中看见同index不同term的日志条目。主要作用是在节点崩溃后可以通过wal部分重新启动。
    • snapshot文件部分,类似于存档点的概念,已经过共识的部分,可以在一定时间或者一点量消息后,生成snapshot,snapshot文件一般保存当前state machine的状态,及集群的相关配置等数据。snapshot可以帮助节点快速启动,以及新节点加入时的快速同步。

    几个重要的组件

    etcd中raft的实现了很好的模块化,其中raft共识模块主要是实现了算法的逻辑,而系统需要用到的存储、通讯等模块都从共识模块中很好的剥离了出来是单独模块的实现。这样做可以很方便对raft的共识模块进行移植,也可以很方便地支持多种底层的存储或者通讯方式。raftexample中的组件大概可以分为以下几部分。

    • 应用层部分:对应其raftNode组件,应用层可以根据自己的需要实现自己的raftNode。
    • 共识部分(github.com/coreos/etcd/raft):对应其raft组件。该部分实现了raft核心算法部分。其逻辑主要在raft结构体实现的方法中,其通过raftLog结构体以及progress结构体,实现raft算法中log部分的管理和节点对集群中其他节点信息的管理。并且通过node结构体提供给了应用层与共识部分沟通的渠道。
    • 通讯部分(github.com/coreos/etcd/rafthttp):对应其Transport组件。使用http协议完成节点间的相互通信。
    • 存储部分(github.com/coreos/etcd/raftsnap、github.com/coreos/etcd/wal):对应其snapshotter组件与wal组件。通过这两个组件分别对快照和日志条目进行持久化的存储。

    raftNode组件:主要处理应用层服务

    raftNode结构体,实际上是应用层的包装,真正的raft共识部分在其中的node(raft.Node)中。RaftNode结构体中可以放应用层需要的一些东西。大致有以下一些东西

    • 协程交互用的通道,包括proposeC,confChangeC,commitC,errorC。
    • raft节点的基本配置,包括id,peers,join,waldir,snapdir。
    • raft节点状态,包括lastIndex,confState,snapshotIndex,appliedIndex等。
    • raft共识部分的组件,包括node其是共识的关键部分,raftStorage是日志条目在内存中存储的实现,wal是日志条目存储在文件中的实现,snapshotter及其相关配置是snapshot处理的相关实现,transport是raft节点间通讯的组件。
    /*
    github.com/coreos/etcd/contrib/raftexample/raft.go
    */
    // A key-value stream backed by raft
    type raftNode struct {
        proposeC    <-chan string            // proposed messages (k,v)
        confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
        commitC     chan<- *string           // entries committed to log (k,v)
        errorC      chan<- error             // errors from raft session
    
        id          int      // client ID for raft session
        peers       []string // raft peer URLs
        join        bool     // node is joining an existing cluster
        waldir      string   // path to WAL directory
        snapdir     string   // path to snapshot directory
        getSnapshot func() ([]byte, error)
        lastIndex   uint64 // index of log at start
    
        confState     raftpb.ConfState
        snapshotIndex uint64                   //snapshotIndex
        appliedIndex  uint64                    //同论文中的appliedIndex,用于记录最新的已提交state machine执行的日志的索引
    
        // raft backing for the commit/error channel
        node        raft.Node                    //真正的共识部分的node
        raftStorage *raft.MemoryStorage          //raft中内存存储日志的部分
        wal         *wal.WAL                     //wal文件部分
    
        snapshotter      *raftsnap.Snapshotter
        snapshotterReady chan *raftsnap.Snapshotter // signals when snapshotter is ready
    
        snapCount uint64
        transport *rafthttp.Transport
        stopc     chan struct{} // signals proposal channel closed
        httpstopc chan struct{} // signals http server to shutdown
        httpdonec chan struct{} // signals http server shutdown complete
    }
    

    大致有以下几种函数

    • 启动raft函数:startRaft
    • snapshot相关的函数,主要处理snapshot的触发、将什么内容保存到snapshot、保存snapshot、加载snapshot等事项,包括:maybeTriggerSnapshot,publishSnapshot,saveSnap,loadSnapshot
    • 日志条目相关的函数,主要将条目提交给state machine去执行,包括:publishEntries,entriesToApply
    • 重新加载和重放WAL,包括:openWAL,replayWAL
    • 响应Raft和客户端请求的函数:serverRaft,serveChannels

    node组件,应用层与共识模块的沟通者

    node结构体的主要作用是应用层和共识模块的衔接。将应用层的消息传递给底层共识模块,并将底层共识模块共识后的结果反馈给应用层。结构体的主要成员都是用来作消息传递用处的通道。

    type node struct {
        propc      chan pb.Message
        recvc      chan pb.Message
        confc      chan pb.ConfChange
        confstatec chan pb.ConfState
        readyc     chan Ready
        advancec   chan struct{}
        tickc      chan struct{}
        done       chan struct{}
        stop       chan struct{}
        status     chan chan Status
    
        logger Logger
    }
    

    大致函数有几种:

    1. 启动、新建、停止:StartNode,RestartNode,newNode,Stop。
    2. 发送命令给共识模块的函数:Tick,Campaign,Propose,Step,ProposeConfChange,ApplyConfChange,TransferLeadership。
    3. 反馈给应用层的函数Ready,Advance,,Status,ReportUnreachable,ReadIndex。

    raft组件:共识组件

    raft结构体,共识组件结构体。
    首先放一下论文中需要维护的state来对比

    currentTerm                   //当前任期   
    votedFor                      //当前任期的候选者编号,无则为null
    log[]                         //日志条目
    
    //Volatile state on all servers,所有服务器上维护
    commitIndex             //已知的最高的可被提交的日志条目的索引,初始为0
    lastApplied             //当前已提交给state machine执行的条目的索引,初始为0
    
    //Volatile state on leaders:(Reinitialized after election),只在leader节点上维护
    nextIndex[]          //对于每一台服务器,下一条将要发给该服务器的条目的索引,初始为leader最后一条条目索引+1
    matchIndex[]         //每一个服务器已知的最高的已复制的条目的索引,初始为0
    

    etcd中raft实现中raft结构体内容,可以看出其主要包含一下几部分

    • state的基本项,包括:id,Term(论文中currentTerm),vote(论文中voteFor),raftLog(论文中log[],并且维护了comitIndex和lastApplied),prs及learnerPrs(论文中的matchIndex与nextIndex以及一些其他关于对应节点状态的信息)。
    • 一些其他状态项,包括state用于判别当前节点状态、isLearner判别当前节点是否是learner、lead表示当前leaderid、以及leadTransferee、pendingConfIndex、readStates、checkQuorum、preVote等。
    • 一些其他配置项,包括maxInflight,maxMsgSize,heartbeatTimeout,electionTimeout,randomizedElectionTimeout,disableProposalForwarding等。
    • 重要的函数,tick用来计时,step用来处理各种RPC。
    type raft struct {
        //同论文中的state的一些基本项
        id         uint64               //节点id
        Term       uint64               //节点当前所处任期,即currentTerm
        Vote       uint64               // 即votedFor
        raftLog    *raftLog             //节点的日志,即log[]
        prs        map[uint64]*Progress //progress中存储了对应节点的matchIndex和nextIndex
        learnerPrs map[uint64]*Progress
    
        //一些其他状态
        readStates []ReadState
        state      StateType
        // isLearner is true if the local raft node is a learner.
        isLearner bool
        votes     map[uint64]bool
        msgs      []pb.Message
        readOnly  *readOnly
        lead      uint64 // the leader id
        // leadTransferee is id of the leader transfer target when its value is not zero.
        // Follow the procedure defined in raft thesis 3.10.
        leadTransferee   uint64
        pendingConfIndex uint64
        checkQuorum      bool
        preVote          bool
    
        //一些其他配置项
        maxInflight               int
        maxMsgSize                uint64
        electionElapsed           int
        heartbeatElapsed          int
        heartbeatTimeout          int
        electionTimeout           int
        randomizedElectionTimeout int
        disableProposalForwarding bool
    
        tick   func()
        step   stepFunc //按照leader,follower,candidate角色不同,有三个不同的函数
        logger Logger
    }
    

    raft结构体及其方法是算法中的主体部分。大致有以下几种函数:

    • 新建并启动:newRaft
    • 查询或者获取状态类:包括hasLeader,softState,hardState,quorum,nodes,learnerNodes,getProgress,maybeCommit,pastElectionTimeout,checkQuorumActive
    • 控制类函数:主要作用是根据消息类型调用响应的函数,包括:Step,stepLeader,stepCandidate,stepFollower
    • 发送消息类:包括send,sendAppend,sendHeartbeat,sendTimeoutNow,bcastAppend,bcastHearbeat,bcastHearbeatWithCtx。其中几个bcast打头函数表示向集群中的每个节点发送。
    • 处理消息类:包括handleAppendEntries,handleHeartbeat,handleSnapshot,
    • 处理集群成员变更类:addNode,addLearner,addNodeOrLearnerNode,removeNode,restoreNode,setProgress,delProgress
    • 实际操作类:包括appendEntry,
    • tick函数类:tickElection,tickHeartbeat
    • 角色转化类:becomeFollower,becomeCandidate,becomePreCandidate,becomeLeader
    • 辅助函数类:reset,restore,resetRandomizedElectionTimeout,loadState,campaign,poll

    Progress:维护集群中的节点状态
    Progress结构体,该结构体主要是在每个节点维护集群中其他节点的状态用。主要包括

    • 论文中用以共识用的Match(论文中的matchIndex),next(论文中的nextIndex)。
    • 包括一些节点的是否正常运行的状态RecentActive、Paused。
    • 节点的身份状态State及isLearner等。
    type Progress struct {
        Match, Next     uint64
        State           ProgressStateType
        Paused          bool
        PendingSnapshot uint64
        RecentActive    bool
        ins             *inflights
        IsLearner       bool
    }
    

    大致有以下几种函数:

    • 控制类:pause、resume
    • 状态转变类函数: resetState, becomeProbe, becomeReplicate, becomeSnapshot
    • Match和Next维护类函数:
      maybeUpdate, optimisticUpdate, maybeDecrTo, -
    • 状态查询类函数:IsPaused
    • snapshot相关: snapshotFailure,needSnapshotAbort

    raftLog:raft中的日志管理者
    raftLog结构体,raft中的日志条目管理的结构体,前文中介绍raft中的日志存储包括文件存储和内存存储两部分。该结构体中storage即为内存存储的日志条目部分。unstable是在日志尚未存储到日志文件时的状态。commited和applied是论文中commitIndex和lastApplied的概念,及最新的已共识可提交的日志条目索引,和已提交给state machine执行的最新条目的索引。

    type raftLog struct {
        // storage contains all stable entries since the last snapshot.
        storage Storage
    
        // unstable contains all unstable entries and snapshot.
        // they will be saved into storage.
        unstable unstable
    
        // committed is the highest log position that is known to be in
        // stable storage on a quorum of nodes.
        committed uint64
        // applied is the highest log position that the application has
        // been instructed to apply to its state machine.
        // Invariant: applied <= committed
        applied uint64
    
        logger Logger
    }
    

    大致有以下几种函数

    • 新建: newLog
    • 查询状态类:获取索引、快照或者条目信息等,包括:unstableEntries,
      nextEnts, hasNextEnts, snapshot, firstIndex, lastIndex, lastTerm , entries, allEntries。
    • 状态更新类:更新commited或者applied的值,包括:commitTo,
      appliedTo, stableTo, stableSnapTo 。
    • 条目更新类:append
    • 判断状态类:判断条目是否可以被追加到当前,是否是当前term,找出冲突等。包括:maybeAppend, findConflict, isUpToDate matchTerm, maybeCommit, mustCheckOutOfBounds
    • 实际操作类:
    • 辅助类:term, slice,zeroTermOnErrCompacted

    Snapshotter组件:快照管理者

    type Snapshotter struct {
        dir string
    }
    

    大致包含保存快照SaveSnap,读取快照Read,加载快照Load等。

    WAL组件:日志文件管理者

    WAL结构体,用来管理日志文件。

    type WAL struct {
        dir string // the living directory of the underlay files
        // dirFile is a fd for the wal directory for syncing on Rename
        dirFile *os.File
        metadata []byte           // metadata recorded at the head of each WAL
        state    raftpb.HardState // hardstate recorded at the head of WAL
        start     walpb.Snapshot // snapshot to start reading
        decoder   *decoder       // decoder to decode records
        readClose func() error   // closer for decode reader
    
        mu      sync.Mutex
        enti    uint64   // index of the last entry saved to the wal
        encoder *encoder // encoder to encode records
    
        locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
        fp    *filePipeline
    }
    

    大致有以下几种函数:

    • 文件读取类:Open,OpenAtIndex,OpenForRead,ReadAll,
    • 文件写入类:saveEntry,saveState,Save,SaveSnapshot,saveCrc
    • 其他文件操作类:Create,renameWal,cut,Close

    Transport组件:消息传输者

    Transport组件是负责消息通信的,目前的实现方式是http的实现。

    type Transport struct {
        DialTimeout time.Duration // maximum duration before timing out dial of the request
        // DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
        // a distinct rate limiter is created per every peer (default value: 10 events/sec)
        DialRetryFrequency rate.Limit
    
        TLSInfo transport.TLSInfo // TLS information used when creating connection
    
        ID          types.ID   // local member ID
        URLs        types.URLs // local peer URLs
        ClusterID   types.ID   // raft cluster ID for request validation
        Raft        Raft       // raft state machine, to which the Transport forwards received messages and reports status
        Snapshotter *raftsnap.Snapshotter
        ServerStats *stats.ServerStats // used to record general transportation statistics
        // used to record transportation statistics with followers when
        // performing as leader in raft protocol
        LeaderStats *stats.LeaderStats
        // ErrorC is used to report detected critical errors, e.g.,
        // the member has been permanently removed from the cluster
        // When an error is received from ErrorC, user should stop raft state
        // machine and thus stop the Transport.
        ErrorC chan error
    
        streamRt   http.RoundTripper // roundTripper used by streams
        pipelineRt http.RoundTripper // roundTripper used by pipelines
    
        mu      sync.RWMutex         // protect the remote and peer map
        remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
        peers   map[types.ID]Peer    // peers map
    
        prober probing.Prober
    }
    

    大致包含以下几种函数:

    • 启动和停止:Start,Stop
    • 消息发送和处理:Handler,Get,Send,SendSnapshot
    • 集群链接的维护:CutPeer,MendPeer,AddRemote,AddPeer,RemovePeer,RemoveAllPeers,UpdatePeer,ActiveSince,ActivePeers

    相关文章

      网友评论

        本文标题:Raft在etcd中的实现(一)存储及重要的组件

        本文链接:https://www.haomeiwen.com/subject/jpricftx.html