美文网首页etcdGoraft
etcd raft模块解析(二)

etcd raft模块解析(二)

作者: Cc_7397 | 来源:发表于2019-06-03 20:25 被阅读4次

    raft 结构

    raft.png
    网络层

    首先etcd最外层有一个网络层,负责与集群其他节点通信或者接受客户端的请求,这里我们主要学习raft模块不详细解读,使用就用网络层来代替。

    node

    node负责raft于网络层的交互,交互使用go的chan

    propc:      make(chan msgWithResult), //接收网络层MsgProp类型消息
    recvc:      make(chan pb.Message), //接收网络层除MsgProp类型以外的消息
    confc:      make(chan pb.ConfChange),//接收EntryConfChange类型消息比如动态添加节点
    confstatec: make(chan pb.ConfState),
    readyc:     make(chan Ready),//向上层返回 ready 
    advancec:   make(chan struct{}),//上层处理往ready后返回给raft的消息
    // make tickc a buffered chan, so raft node can buffer some ticks when the node
    // is busy processing raft messages. Raft node will resume process buffered
    // ticks when it becomes idle.
    tickc:  make(chan struct{}, 128),//管理超时的管道
    done:   make(chan struct{}), 
    stop:   make(chan struct{}),
    status: make(chan chan Status),
    

    可以看到定义来许多管道来交互信息

    raft

    raft主要处理raft算法的实现,和日志的复制。
    raft会存储当前节点的id,任期,投票等。
    其中raftlog用来管理日志,可以看到raftlog下有两个类。
    一个日志的流程是这样的:

    • 客户端发送请求,生成日志会先进入到unstable模块,如名字所表示,它是不安全的,因为日志还没有进行持久化。
    • 将unstable中的日志同步给其他节点,同时交给上层持久化。node也负责和上层通信。
    • 如果日志已经被半数以上的节点复制成功了,那么这部分日志将会被认为提交成功。raft会修改raftlog中的committed记录提交的index。
    • 提交成功后raft会将提交成功的日志返回给上层,上层会应用日志,然后响应给客户端成功,同时raft也会同步给其他节点让他们也应用日志,然后修改自己的applied记录应用的index。

    启动node

    创建node

    创建node之前先会读取持久化这磁盘中的日志,根据是否有日志和是否是一个新集群,有三种情况

    • 没有日志并且是新集群,直接调用startNode
    • 没有日志不是新集群,会先与其他节点通信然后更新配置调用straNode
    • 有日志,说明是节点重启,读取日志调用restartNode
      startNode 和 restartNode其实差别不大,都会构建一个raft的配置
    c := &raft.Config{
            ID:              uint64(id),//当前节点id
            ElectionTick:    cfg.ElectionTicks,//选举用超时时间
            HeartbeatTick:   1,//心跳间隔
            Storage:         s,//就是MemorySorage
            MaxSizePerMsg:   maxSizePerMsg, //每次发消息的最大size
            MaxInflightMsgs: maxInflightMsgs,
            CheckQuorum:     true,
            PreVote:         cfg.PreVote,//上文提到的pre模式
        }
    

    除了Storage参数,其他都是一些配置的属性。在straNode中
    Storage是调用 s = raft.NewMemoryStorage() 来创建的一个空的初始对象。而restartNode中会用Storage加载从日志中读取的数据。来还原服务宕机前的状态。

        s := raft.NewMemoryStorage()
        if snapshot != nil {
            s.ApplySnapshot(*snapshot)
        }
    

    创建raft

    接下来调用raft.StartNode(c, peers)来创建raft对象。c就是上面的raft.Config,peers记录集群中所有节点的id。

    func StartNode(c *Config, peers []Peer) Node {
        r := newRaft(c)
        // become the follower at term 1 and apply initial configuration
        // entries of term 1
        r.becomeFollower(1, None)
        for _, peer := range peers {
            cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
            d, err := cc.Marshal()
            if err != nil {
                panic("unexpected marshal error")
            }
            e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
            r.raftLog.append(e)
        }
        // Mark these initial entries as committed.
        // TODO(bdarnell): These entries are still unstable; do we need to preserve
        // the invariant that committed < unstable?
        r.raftLog.committed = r.raftLog.lastIndex()
    
    • 先调用newRaft创建raft对象,newRaft方法中会先Storage中读取信息。然后创建raft对象,然后判断Storage是不是空,如果是重启节点,那这里Storage就会有数据,然后更新raft的数据。
    if !isHardStateEqual(hs, emptyState) {
            r.loadState(hs)
        }
        if c.Applied > 0 {
            raftlog.appliedTo(c.Applied)
        }
    /-------------------------------------------/
    func (r *raft) loadState(state pb.HardState) {
        if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
            r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
        }
        r.raftLog.committed = state.Commit
        r.Term = state.Term
        r.Vote = state.Vote
    /------------------------------------------/
    func (l *raftLog) appliedTo(i uint64) {
        if i == 0 {
            return
        }
        if l.committed < i || i < l.applied {
            l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
        }
        l.applied = i
    }
    }
    

    r.loadState(hs) 里面会更新raft记录的任期和投票,raftLog中的提交index。
    appliedTo 会一个raftLog的应用index。
    之后会调用r.becomeFollower(1, None)修改节点状态变成follower
    同样其他状态也有相应的函数:

    func (r *raft) becomeFollower(term uint64, lead uint64) 
    func (r *raft) becomeCandidate() 
    func (r *raft) becomePreCandidate() 
    func (r *raft) becomeLeader() 
    

    然后如果是新集群,则会将集群的节点信息追加到日志中,并提交追加的日志,这部分用于重启时能从日志中读取到集群的节点信息。
    最后创建node 开启协程启动node ,创建node没有什么操作就是创建对象。

        n := newNode()
        n.logger = c.Logger
        go n.run(r)
        return &n
    

    run

    func (n *node) run(r *raft) {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var prevLastUnstablei, prevLastUnstablet uint64
    var havePrevLastUnstablei bool
    var prevSnapi uint64
    var applyingToI uint64
    var rd Ready
    
    lead := None
    prevSoftSt := r.softState()
      prevHardSt := emptyState
    
      for {
         if advancec != nil {
            readyc = nil
         } else {
            rd = newReady(r, prevSoftSt, prevHardSt)
            if rd.containsUpdates() {
               readyc = n.readyc
            } else {
               readyc = nil
            }
         }
    
         if lead != r.lead {
            if r.hasLeader() {
               if lead == None {
               propc = n.propc
            } else {
               propc = nil
            }
            lead = r.lead
         }
    
      select {
         case pm := <-propc:
            err := r.Step(m)
         case m := <-n.recvc:
               r.Step(m)
         case cc := <-n.confc:
         case <-n.tickc:
            r.tick()
         case readyc <- rd:
            advancec = n.advancec
         case <-advancec:
            advancec = nil
         case c := <-n.status:
            c <- getStatus(r)
         case <-n.stop:
            close(n.done)
            return
    }
      }
    }
    

    上面的代码是删除来很多逻辑只保留基本结构的。
    主要用于从管道读取消息然后传递给raft。
    这里重点就是,raft是不能主动给发消息的,只能是上层应用自己来拉取。
    第一次进入循环advancec为空,会调用
    newReady(r, prevSoftSt, prevHardSt) 这个函数的返回就是raft的状态变化和要发送的消息。

    rd := Ready{
       Entries:          r.raftLog.unstableEntries(),//unstable中的日志交给上层持久化
     CommittedEntries: r.raftLog.nextEnts(),//已经提交待应用的日志,交给上层应用
     Messages:         r.msgs,//raft要发送的消息
    }
    

    node拿到这个对象后会通过readyc通道发送给上层,然后记录这次的状态已用于下次调用Ready的时候判断状态是否变化。
    之后会等待advancec通道,如果要消息则说明上次的消息已经处理完成,可以修改自己的状态了,比如日志应用完成修改自己的应用index。

    这就是raft启动的过程。下一个文章讲解选举的流程

    相关文章

      网友评论

        本文标题:etcd raft模块解析(二)

        本文链接:https://www.haomeiwen.com/subject/yqjhxctx.html