美文网首页etcdGoraft
etcd raft模块解析(二)

etcd raft模块解析(二)

作者: Cc_7397 | 来源:发表于2019-06-03 20:25 被阅读4次

raft 结构

raft.png
网络层

首先etcd最外层有一个网络层,负责与集群其他节点通信或者接受客户端的请求,这里我们主要学习raft模块不详细解读,使用就用网络层来代替。

node

node负责raft于网络层的交互,交互使用go的chan

propc:      make(chan msgWithResult), //接收网络层MsgProp类型消息
recvc:      make(chan pb.Message), //接收网络层除MsgProp类型以外的消息
confc:      make(chan pb.ConfChange),//接收EntryConfChange类型消息比如动态添加节点
confstatec: make(chan pb.ConfState),
readyc:     make(chan Ready),//向上层返回 ready 
advancec:   make(chan struct{}),//上层处理往ready后返回给raft的消息
// make tickc a buffered chan, so raft node can buffer some ticks when the node
// is busy processing raft messages. Raft node will resume process buffered
// ticks when it becomes idle.
tickc:  make(chan struct{}, 128),//管理超时的管道
done:   make(chan struct{}), 
stop:   make(chan struct{}),
status: make(chan chan Status),

可以看到定义来许多管道来交互信息

raft

raft主要处理raft算法的实现,和日志的复制。
raft会存储当前节点的id,任期,投票等。
其中raftlog用来管理日志,可以看到raftlog下有两个类。
一个日志的流程是这样的:

  • 客户端发送请求,生成日志会先进入到unstable模块,如名字所表示,它是不安全的,因为日志还没有进行持久化。
  • 将unstable中的日志同步给其他节点,同时交给上层持久化。node也负责和上层通信。
  • 如果日志已经被半数以上的节点复制成功了,那么这部分日志将会被认为提交成功。raft会修改raftlog中的committed记录提交的index。
  • 提交成功后raft会将提交成功的日志返回给上层,上层会应用日志,然后响应给客户端成功,同时raft也会同步给其他节点让他们也应用日志,然后修改自己的applied记录应用的index。

启动node

创建node

创建node之前先会读取持久化这磁盘中的日志,根据是否有日志和是否是一个新集群,有三种情况

  • 没有日志并且是新集群,直接调用startNode
  • 没有日志不是新集群,会先与其他节点通信然后更新配置调用straNode
  • 有日志,说明是节点重启,读取日志调用restartNode
    startNode 和 restartNode其实差别不大,都会构建一个raft的配置
c := &raft.Config{
        ID:              uint64(id),//当前节点id
        ElectionTick:    cfg.ElectionTicks,//选举用超时时间
        HeartbeatTick:   1,//心跳间隔
        Storage:         s,//就是MemorySorage
        MaxSizePerMsg:   maxSizePerMsg, //每次发消息的最大size
        MaxInflightMsgs: maxInflightMsgs,
        CheckQuorum:     true,
        PreVote:         cfg.PreVote,//上文提到的pre模式
    }

除了Storage参数,其他都是一些配置的属性。在straNode中
Storage是调用 s = raft.NewMemoryStorage() 来创建的一个空的初始对象。而restartNode中会用Storage加载从日志中读取的数据。来还原服务宕机前的状态。

    s := raft.NewMemoryStorage()
    if snapshot != nil {
        s.ApplySnapshot(*snapshot)
    }

创建raft

接下来调用raft.StartNode(c, peers)来创建raft对象。c就是上面的raft.Config,peers记录集群中所有节点的id。

func StartNode(c *Config, peers []Peer) Node {
    r := newRaft(c)
    // become the follower at term 1 and apply initial configuration
    // entries of term 1
    r.becomeFollower(1, None)
    for _, peer := range peers {
        cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
        d, err := cc.Marshal()
        if err != nil {
            panic("unexpected marshal error")
        }
        e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
        r.raftLog.append(e)
    }
    // Mark these initial entries as committed.
    // TODO(bdarnell): These entries are still unstable; do we need to preserve
    // the invariant that committed < unstable?
    r.raftLog.committed = r.raftLog.lastIndex()
  • 先调用newRaft创建raft对象,newRaft方法中会先Storage中读取信息。然后创建raft对象,然后判断Storage是不是空,如果是重启节点,那这里Storage就会有数据,然后更新raft的数据。
if !isHardStateEqual(hs, emptyState) {
        r.loadState(hs)
    }
    if c.Applied > 0 {
        raftlog.appliedTo(c.Applied)
    }
/-------------------------------------------/
func (r *raft) loadState(state pb.HardState) {
    if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
        r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
    }
    r.raftLog.committed = state.Commit
    r.Term = state.Term
    r.Vote = state.Vote
/------------------------------------------/
func (l *raftLog) appliedTo(i uint64) {
    if i == 0 {
        return
    }
    if l.committed < i || i < l.applied {
        l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
    }
    l.applied = i
}
}

r.loadState(hs) 里面会更新raft记录的任期和投票,raftLog中的提交index。
appliedTo 会一个raftLog的应用index。
之后会调用r.becomeFollower(1, None)修改节点状态变成follower
同样其他状态也有相应的函数:

func (r *raft) becomeFollower(term uint64, lead uint64) 
func (r *raft) becomeCandidate() 
func (r *raft) becomePreCandidate() 
func (r *raft) becomeLeader() 

然后如果是新集群,则会将集群的节点信息追加到日志中,并提交追加的日志,这部分用于重启时能从日志中读取到集群的节点信息。
最后创建node 开启协程启动node ,创建node没有什么操作就是创建对象。

    n := newNode()
    n.logger = c.Logger
    go n.run(r)
    return &n

run

func (n *node) run(r *raft) {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready

lead := None
prevSoftSt := r.softState()
  prevHardSt := emptyState

  for {
     if advancec != nil {
        readyc = nil
     } else {
        rd = newReady(r, prevSoftSt, prevHardSt)
        if rd.containsUpdates() {
           readyc = n.readyc
        } else {
           readyc = nil
        }
     }

     if lead != r.lead {
        if r.hasLeader() {
           if lead == None {
           propc = n.propc
        } else {
           propc = nil
        }
        lead = r.lead
     }

  select {
     case pm := <-propc:
        err := r.Step(m)
     case m := <-n.recvc:
           r.Step(m)
     case cc := <-n.confc:
     case <-n.tickc:
        r.tick()
     case readyc <- rd:
        advancec = n.advancec
     case <-advancec:
        advancec = nil
     case c := <-n.status:
        c <- getStatus(r)
     case <-n.stop:
        close(n.done)
        return
}
  }
}

上面的代码是删除来很多逻辑只保留基本结构的。
主要用于从管道读取消息然后传递给raft。
这里重点就是,raft是不能主动给发消息的,只能是上层应用自己来拉取。
第一次进入循环advancec为空,会调用
newReady(r, prevSoftSt, prevHardSt) 这个函数的返回就是raft的状态变化和要发送的消息。

rd := Ready{
   Entries:          r.raftLog.unstableEntries(),//unstable中的日志交给上层持久化
 CommittedEntries: r.raftLog.nextEnts(),//已经提交待应用的日志,交给上层应用
 Messages:         r.msgs,//raft要发送的消息
}

node拿到这个对象后会通过readyc通道发送给上层,然后记录这次的状态已用于下次调用Ready的时候判断状态是否变化。
之后会等待advancec通道,如果要消息则说明上次的消息已经处理完成,可以修改自己的状态了,比如日志应用完成修改自己的应用index。

这就是raft启动的过程。下一个文章讲解选举的流程

相关文章

  • etcd raft模块解析(二)

    raft 结构 网络层 首先etcd最外层有一个网络层,负责与集群其他节点通信或者接受客户端的请求,这里我们主要学...

  • etcd raft模块解析(一)

    分布式一致性 选主算法是保证在2n+1数量的集群中可以保证最多n个节点宕机时依然可以保证服务可用,并且在宕机的服务...

  • Etcd常用的术语

    术语描述备注RaftRaft算法,etcd实现一致性的核心etcd有etcd-raft模块FollowerRaft...

  • 手撸golang etcd raft协议之7

    手撸golang etcd raft协议之7 缘起 最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 ...

  • 手撸golang etcd raft协议之2

    手撸golang etcd raft协议之2 缘起 最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 ...

  • 手撸golang etcd raft协议之1

    手撸golang etcd raft协议之1 缘起 最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 ...

  • 手撸golang etcd raft协议之9

    手撸golang etcd raft协议之9 缘起 最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 ...

  • 手撸golang etcd raft协议之8

    手撸golang etcd raft协议之8 缘起 最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 ...

  • 手撸golang etcd raft协议之11

    手撸golang etcd raft协议之11 缘起 最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军...

  • 手撸golang etcd raft协议之10

    手撸golang etcd raft协议之9,10 缘起 最近阅读 [云原生分布式存储基石:etcd深入解析] (...

网友评论

    本文标题:etcd raft模块解析(二)

    本文链接:https://www.haomeiwen.com/subject/yqjhxctx.html