raftexample的例子中,启动了一个kv数据库作为raft的state machine,另外启动了一个http数据库来响应客户端的请求。其中proposeC以及commitC,errorC主要用来交互,proposeC为由客户端发送过来的条目,通过httpserver流向共识部分raftNode;commitC为经共识的条目,由raftNode流向kv数据库。

    var kvs *kvstore
    getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
    commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
       //启动kv数据库作为state machine
    kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

    // the key-value http handler will propose updates to raft
    serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)



  • kv数据库, 存储state machine的状态。raftexample例子中是个简单的map。
  • 内存数据中的log,raft共识用到的日志条目,一般是一段较新的日志,可能包含一部分已共识的日志和一些尚未共识的日志条目。由于是内存维护,可以灵活的重写替换。该部分内容中位于commitIndex之前的部分是已共识的部分,内容不会被替换,commitIndex到lastIndex之间的内容是尚未共识的部分,index对应的条目内容可能被替换。该部分在raft.MemoryStorage实现。
  • wal部分,raft共识所用的日志文件。该文件只会追加,不会重写和覆盖。raft共识过程中收到的日志条目,都会记录在wal日志文件中。即可能在wal日志文件中看见同index不同term的日志条目。主要作用是在节点崩溃后可以通过wal部分重新启动。
  • snapshot文件部分,类似于存档点的概念,已经过共识的部分,可以在一定时间或者一点量消息后,生成snapshot,snapshot文件一般保存当前state machine的状态,及集群的相关配置等数据。snapshot可以帮助节点快速启动,以及新节点加入时的快速同步。



  • 应用层部分:对应其raftNode组件,应用层可以根据自己的需要实现自己的raftNode。
  • 共识部分(github.com/coreos/etcd/raft):对应其raft组件。该部分实现了raft核心算法部分。其逻辑主要在raft结构体实现的方法中,其通过raftLog结构体以及progress结构体,实现raft算法中log部分的管理和节点对集群中其他节点信息的管理。并且通过node结构体提供给了应用层与共识部分沟通的渠道。
  • 通讯部分(github.com/coreos/etcd/rafthttp):对应其Transport组件。使用http协议完成节点间的相互通信。
  • 存储部分(github.com/coreos/etcd/raftsnap、github.com/coreos/etcd/wal):对应其snapshotter组件与wal组件。通过这两个组件分别对快照和日志条目进行持久化的存储。



  • 协程交互用的通道,包括proposeC,confChangeC,commitC,errorC。
  • raft节点的基本配置,包括id,peers,join,waldir,snapdir。
  • raft节点状态,包括lastIndex,confState,snapshotIndex,appliedIndex等。
  • raft共识部分的组件,包括node其是共识的关键部分,raftStorage是日志条目在内存中存储的实现,wal是日志条目存储在文件中的实现,snapshotter及其相关配置是snapshot处理的相关实现,transport是raft节点间通讯的组件。
// A key-value stream backed by raft
type raftNode struct {
    proposeC    <-chan string            // proposed messages (k,v)
    confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
    commitC     chan<- *string           // entries committed to log (k,v)
    errorC      chan<- error             // errors from raft session

    id          int      // client ID for raft session
    peers       []string // raft peer URLs
    join        bool     // node is joining an existing cluster
    waldir      string   // path to WAL directory
    snapdir     string   // path to snapshot directory
    getSnapshot func() ([]byte, error)
    lastIndex   uint64 // index of log at start

    confState     raftpb.ConfState
    snapshotIndex uint64                   //snapshotIndex
    appliedIndex  uint64                    //同论文中的appliedIndex,用于记录最新的已提交state machine执行的日志的索引

    // raft backing for the commit/error channel
    node        raft.Node                    //真正的共识部分的node
    raftStorage *raft.MemoryStorage          //raft中内存存储日志的部分
    wal         *wal.WAL                     //wal文件部分

    snapshotter      *raftsnap.Snapshotter
    snapshotterReady chan *raftsnap.Snapshotter // signals when snapshotter is ready

    snapCount uint64
    transport *rafthttp.Transport
    stopc     chan struct{} // signals proposal channel closed
    httpstopc chan struct{} // signals http server to shutdown
    httpdonec chan struct{} // signals http server shutdown complete


  • 启动raft函数:startRaft
  • snapshot相关的函数,主要处理snapshot的触发、将什么内容保存到snapshot、保存snapshot、加载snapshot等事项,包括:maybeTriggerSnapshot,publishSnapshot,saveSnap,loadSnapshot
  • 日志条目相关的函数,主要将条目提交给state machine去执行,包括:publishEntries,entriesToApply
  • 重新加载和重放WAL,包括:openWAL,replayWAL
  • 响应Raft和客户端请求的函数:serverRaft,serveChannels



type node struct {
    propc      chan pb.Message
    recvc      chan pb.Message
    confc      chan pb.ConfChange
    confstatec chan pb.ConfState
    readyc     chan Ready
    advancec   chan struct{}
    tickc      chan struct{}
    done       chan struct{}
    stop       chan struct{}
    status     chan chan Status

    logger Logger


  1. 启动、新建、停止:StartNode,RestartNode,newNode,Stop。
  2. 发送命令给共识模块的函数:Tick,Campaign,Propose,Step,ProposeConfChange,ApplyConfChange,TransferLeadership。
  3. 反馈给应用层的函数Ready,Advance,,Status,ReportUnreachable,ReadIndex。



currentTerm                   //当前任期   
votedFor                      //当前任期的候选者编号,无则为null
log[]                         //日志条目

//Volatile state on all servers,所有服务器上维护
commitIndex             //已知的最高的可被提交的日志条目的索引,初始为0
lastApplied             //当前已提交给state machine执行的条目的索引,初始为0

//Volatile state on leaders:(Reinitialized after election),只在leader节点上维护
nextIndex[]          //对于每一台服务器,下一条将要发给该服务器的条目的索引,初始为leader最后一条条目索引+1
matchIndex[]         //每一个服务器已知的最高的已复制的条目的索引,初始为0


  • state的基本项,包括:id,Term(论文中currentTerm),vote(论文中voteFor),raftLog(论文中log[],并且维护了comitIndex和lastApplied),prs及learnerPrs(论文中的matchIndex与nextIndex以及一些其他关于对应节点状态的信息)。
  • 一些其他状态项,包括state用于判别当前节点状态、isLearner判别当前节点是否是learner、lead表示当前leaderid、以及leadTransferee、pendingConfIndex、readStates、checkQuorum、preVote等。
  • 一些其他配置项,包括maxInflight,maxMsgSize,heartbeatTimeout,electionTimeout,randomizedElectionTimeout,disableProposalForwarding等。
  • 重要的函数,tick用来计时,step用来处理各种RPC。
type raft struct {
    id         uint64               //节点id
    Term       uint64               //节点当前所处任期,即currentTerm
    Vote       uint64               // 即votedFor
    raftLog    *raftLog             //节点的日志,即log[]
    prs        map[uint64]*Progress //progress中存储了对应节点的matchIndex和nextIndex
    learnerPrs map[uint64]*Progress

    readStates []ReadState
    state      StateType
    // isLearner is true if the local raft node is a learner.
    isLearner bool
    votes     map[uint64]bool
    msgs      []pb.Message
    readOnly  *readOnly
    lead      uint64 // the leader id
    // leadTransferee is id of the leader transfer target when its value is not zero.
    // Follow the procedure defined in raft thesis 3.10.
    leadTransferee   uint64
    pendingConfIndex uint64
    checkQuorum      bool
    preVote          bool

    maxInflight               int
    maxMsgSize                uint64
    electionElapsed           int
    heartbeatElapsed          int
    heartbeatTimeout          int
    electionTimeout           int
    randomizedElectionTimeout int
    disableProposalForwarding bool

    tick   func()
    step   stepFunc //按照leader,follower,candidate角色不同,有三个不同的函数
    logger Logger


  • 新建并启动:newRaft
  • 查询或者获取状态类:包括hasLeader,softState,hardState,quorum,nodes,learnerNodes,getProgress,maybeCommit,pastElectionTimeout,checkQuorumActive
  • 控制类函数:主要作用是根据消息类型调用响应的函数,包括:Step,stepLeader,stepCandidate,stepFollower
  • 发送消息类:包括send,sendAppend,sendHeartbeat,sendTimeoutNow,bcastAppend,bcastHearbeat,bcastHearbeatWithCtx。其中几个bcast打头函数表示向集群中的每个节点发送。
  • 处理消息类:包括handleAppendEntries,handleHeartbeat,handleSnapshot,
  • 处理集群成员变更类:addNode,addLearner,addNodeOrLearnerNode,removeNode,restoreNode,setProgress,delProgress
  • 实际操作类:包括appendEntry,
  • tick函数类:tickElection,tickHeartbeat
  • 角色转化类:becomeFollower,becomeCandidate,becomePreCandidate,becomeLeader
  • 辅助函数类:reset,restore,resetRandomizedElectionTimeout,loadState,campaign,poll


  • 论文中用以共识用的Match(论文中的matchIndex),next(论文中的nextIndex)。
  • 包括一些节点的是否正常运行的状态RecentActive、Paused。
  • 节点的身份状态State及isLearner等。
type Progress struct {
    Match, Next     uint64
    State           ProgressStateType
    Paused          bool
    PendingSnapshot uint64
    RecentActive    bool
    ins             *inflights
    IsLearner       bool


  • 控制类:pause、resume
  • 状态转变类函数: resetState, becomeProbe, becomeReplicate, becomeSnapshot
  • Match和Next维护类函数:
    maybeUpdate, optimisticUpdate, maybeDecrTo, -
  • 状态查询类函数:IsPaused
  • snapshot相关: snapshotFailure,needSnapshotAbort

raftLog结构体,raft中的日志条目管理的结构体,前文中介绍raft中的日志存储包括文件存储和内存存储两部分。该结构体中storage即为内存存储的日志条目部分。unstable是在日志尚未存储到日志文件时的状态。commited和applied是论文中commitIndex和lastApplied的概念,及最新的已共识可提交的日志条目索引,和已提交给state machine执行的最新条目的索引。

type raftLog struct {
    // storage contains all stable entries since the last snapshot.
    storage Storage

    // unstable contains all unstable entries and snapshot.
    // they will be saved into storage.
    unstable unstable

    // committed is the highest log position that is known to be in
    // stable storage on a quorum of nodes.
    committed uint64
    // applied is the highest log position that the application has
    // been instructed to apply to its state machine.
    // Invariant: applied <= committed
    applied uint64

    logger Logger


  • 新建: newLog
  • 查询状态类:获取索引、快照或者条目信息等,包括:unstableEntries,
    nextEnts, hasNextEnts, snapshot, firstIndex, lastIndex, lastTerm , entries, allEntries。
  • 状态更新类:更新commited或者applied的值,包括:commitTo,
    appliedTo, stableTo, stableSnapTo 。
  • 条目更新类:append
  • 判断状态类:判断条目是否可以被追加到当前,是否是当前term,找出冲突等。包括:maybeAppend, findConflict, isUpToDate matchTerm, maybeCommit, mustCheckOutOfBounds
  • 实际操作类:
  • 辅助类:term, slice,zeroTermOnErrCompacted


type Snapshotter struct {
    dir string




type WAL struct {
    dir string // the living directory of the underlay files
    // dirFile is a fd for the wal directory for syncing on Rename
    dirFile *os.File
    metadata []byte           // metadata recorded at the head of each WAL
    state    raftpb.HardState // hardstate recorded at the head of WAL
    start     walpb.Snapshot // snapshot to start reading
    decoder   *decoder       // decoder to decode records
    readClose func() error   // closer for decode reader

    mu      sync.Mutex
    enti    uint64   // index of the last entry saved to the wal
    encoder *encoder // encoder to encode records

    locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
    fp    *filePipeline


  • 文件读取类:Open,OpenAtIndex,OpenForRead,ReadAll,
  • 文件写入类:saveEntry,saveState,Save,SaveSnapshot,saveCrc
  • 其他文件操作类:Create,renameWal,cut,Close



type Transport struct {
    DialTimeout time.Duration // maximum duration before timing out dial of the request
    // DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
    // a distinct rate limiter is created per every peer (default value: 10 events/sec)
    DialRetryFrequency rate.Limit

    TLSInfo transport.TLSInfo // TLS information used when creating connection

    ID          types.ID   // local member ID
    URLs        types.URLs // local peer URLs
    ClusterID   types.ID   // raft cluster ID for request validation
    Raft        Raft       // raft state machine, to which the Transport forwards received messages and reports status
    Snapshotter *raftsnap.Snapshotter
    ServerStats *stats.ServerStats // used to record general transportation statistics
    // used to record transportation statistics with followers when
    // performing as leader in raft protocol
    LeaderStats *stats.LeaderStats
    // ErrorC is used to report detected critical errors, e.g.,
    // the member has been permanently removed from the cluster
    // When an error is received from ErrorC, user should stop raft state
    // machine and thus stop the Transport.
    ErrorC chan error

    streamRt   http.RoundTripper // roundTripper used by streams
    pipelineRt http.RoundTripper // roundTripper used by pipelines

    mu      sync.RWMutex         // protect the remote and peer map
    remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
    peers   map[types.ID]Peer    // peers map

    prober probing.Prober


  • 启动和停止:Start,Stop
  • 消息发送和处理:Handler,Get,Send,SendSnapshot
  • 集群链接的维护:CutPeer,MendPeer,AddRemote,AddPeer,RemovePeer,RemoveAllPeers,UpdatePeer,ActiveSince,ActivePeers



