美文网首页超级账本HyperLeder
EtcdRaft源码分析(线性一致读)

EtcdRaft源码分析(线性一致读)

作者: 小蜗牛爬楼梯 | 来源:发表于2020-04-07 16:30 被阅读0次

    背景
    我们知道Raft是Leader+Follower的模型,所有的更新由Leader处理,然后再同步给Follower。

    想象一下,如果要所有的节点都参与进来支持读取的请求,会带来什么样的问题?

    Leader跟Follower并不总是一致的,换句话说Follower会落后Leader的进度。如果没有特别的处理,那么不同的节点读取的结果很可能不一致。

    如果Leader被集群孤立,而且其他人已经推举出了新的Leader。而老的Leader还没有察觉到这个变化,他任然觉得还是Leader,但是他的数据已经不可信。如果他还在对外提供服务,那么读取的结果很可能不一致。

    EtcdRaft的线性一致读是通过ReadIndex的机制来实现,大致的实现其实很简单,也就是在处理请求之前,会去集群中确认自己权力是否稳固,这样对外提供的服务才够权威。下面我们一起来剖析下Raft是怎么处理的。

    接口
    type Node interface {
    ...
    // ReadIndex request a read state. The read state will be set in the ready.
    // Read state has a read index. Once the application advances further than the read
    // index, any linearizable read requests issued before the read request can be
    // processed safely. The read state will have the same rctx attached.
    ReadIndex(ctx context.Context, rctx []byte) error
    ...
    }
    这个接口很怪,返回error,不是我们通常意义理解的查询接口。Golang的世界就是这么奇妙。注定这个接口没有那么简单。下面我们先搞清楚,这个接口是怎么用的。

    EtcdServer
    EtcdServer是再好不过的例子了。

    func (s *EtcdServer) linearizableReadLoop() {
    var rs raft.ReadState

    for {
    ctxToSend := make([]byte, 8)
    id1 := s.reqIDGen.Next()
    binary.BigEndian.PutUint64(ctxToSend, id1)
    ...
    cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
    if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
    ...
    }
    cancel()

      var (
         timeout bool
         done    bool
      )
      for !timeout && !done {
         select {
         case rs = <-s.r.readStateC:
            done = bytes.Equal(rs.RequestCtx, ctxToSend)
           ...
      }
      ...
    
      if ai := s.getAppliedIndex(); ai < rs.Index {
         select {
         case <-s.applyWait.Wait(rs.Index):
         case <-s.stopping:
            return
         }
      }
      ...
    

    }
    }
    当然这是精简过后的代码,只保留了跟ReadIndex相关的逻辑。从这里我们也能看出一些端倪。

    首先,传入的参数ctxToSend,是一个单调自增的id,没有任何意义,只是用来客户端区分请求只用。
    其次,会从readStateC里面监听发出的ReadIndex请求Raft是否有了回应。
    第三,会看下本地已经写入状态机的日志有没有到ReadIndex请求回来的位置,如果没有继续等待,如果有这个方法立即结束。
    注意没有这个方法整个是个for循环,而且请求id还单调自增,那么执行下来的结果就是会一直保持对Raft状态的监控,一有风吹草动,这边就会提前收到通知。
    ReadIndex
    func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
    return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
    }
    可以看到,最终是通过MsgReadIndex的方式对内进行广播。

    Follower
    case pb.MsgReadIndex:
    if r.lead == None {
    r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
    return nil
    }
    m.To = r.lead
    r.send(m)
    首先破除第一个问题,不管谁接到ReadIndex的请求,将转发给Leader。

    Leader
    case pb.MsgReadIndex:
    if r.quorum() > 1 {
    if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
    // Reject read only request when this leader has not committed any log entry at its term.
    return nil
    }

      // thinking: use an interally defined context instead of the user given context.
      // We can express this in terms of the term and index instead of a user-supplied value.
      // This would allow multiple reads to piggyback on the same message.
      switch r.readOnly.option {
      case ReadOnlySafe:
         r.readOnly.addRequest(r.raftLog.committed, m)
         r.bcastHeartbeatWithCtx(m.Entries[0].Data)
      case ReadOnlyLeaseBased:
         ri := r.raftLog.committed
         if m.From == None || m.From == r.id { // from local member
            r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
         } else {
            r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
         }
      }
    

    } else {
    r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
    }

    return nil
    }
    如果当前Leader截至到现在还没有提交任何entry,那么直接返回。
    下面我们看下不同的ReadOnly选项都是怎么实现的。
    ReadOnlySafe
    func (ro *readOnly) addRequest(index uint64, m pb.Message) {
    ctx := string(m.Entries[0].Data)
    if _, ok := ro.pendingReadIndex[ctx]; ok {
    return
    }
    ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
    ro.readIndexQueue = append(ro.readIndexQueue, ctx)
    }
    首先我们将请求的requestid放入pendingReadIndex中暂存,当然了,如果之前已经请求过了,那么返回,同时会初始化一个readIndexStatus,最后append到readIndexQueue
    注意这里的readIndexQueue是FIFO的,是严格有序的。
    其次,request里面保存的只是一个请求的id,用来客户端来区分请求用的。
    另外一个需要注意的是,这里保存的index是当前Leader的committedindex,也就是最终一致的地方。
    想象一下,客户端那边在不停的往Raft里面灌数据,那么其他客户端在读取Raft数据的时候,怎么知道哪些数据是形成一致的,可以安全拿出来用的。当然committedindex是重要的指标,代表,包括这个index及之前的数据都是安全的,有保障的。
    那么怎么拿到这个committedindex,这就是ReadIndex的目的。
    bcastHeartbeatWithCtx
    r.bcastHeartbeatWithCtx(m.Entries[0].Data)

    func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
    // Attach the commit as min(to.matched, r.committed).
    // When the leader sends out heartbeat message,
    // the receiver(follower) might not be matched with the leader
    // or it might not have all the committed entries.
    // The leader MUST NOT forward the follower's commit to
    // an unmatched index.
    commit := min(r.getProgress(to).Match, r.raftLog.committed)
    m := pb.Message{
    To: to,
    Type: pb.MsgHeartbeat,
    Commit: commit,
    Context: ctx,
    }

    r.send(m)
    

    }
    将请求的requestid作为心跳的context,发给其他成员。下面我们看下Follower或Candidate接到请求是怎么处理的。

    Follower&Candidate
    case pb.MsgHeartbeat:
    r.electionElapsed = 0
    r.lead = m.From
    r.handleHeartbeat(m)

    case pb.MsgHeartbeat:
    r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
    r.handleHeartbeat(m)

    func (r *raft) handleHeartbeat(m pb.Message) {
    r.raftLog.commitTo(m.Commit)
    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
    }
    可以看到二位接到心跳后,并没有针对ReadOnly做特殊处理。做了他们自己平时该做的。
    回忆下心跳篇,Leader怎么维持自己的权威,不就是发心跳么?其他成员接受到心跳就认为Leader还存在,还不用重新选举。
    既然ReadOnly的实现是需要大多数人确认我还是不是真正的Leader,那么Leader在接受心跳响应的时候,看收到的有没有过半数不就可以了么?所以心跳是完美的实现这个的载体。
    下面我们看下Leader在处理心跳响应的时候,在做什么
    Leader
    case pb.MsgHeartbeatResp:
    pr.RecentActive = true
    pr.resume()

    // free one slot for the full inflights window to allow progress.
    if pr.State == ProgressStateReplicate && pr.ins.full() {
    pr.ins.freeFirstOne()
    }
    if pr.Match < r.raftLog.lastIndex() {
    r.sendAppend(m.From)
    }

    if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
    return nil
    }

    ackCount := r.readOnly.recvAck(m)
    if ackCount < r.quorum() {
    return nil
    }

    rss := r.readOnly.advance(m)
    for _, rs := range rss {
    req := rs.req
    if req.From == None || req.From == r.id { // from local member
    r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
    } else {
    r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
    }
    }
    跟心跳篇重复的部分,在这里我们不再赘述

    首先到了这里要不你开启了ReadOnlySafe,要不就是消息上下文中没有发现ReadIndex

    recvAck
    ackCount := r.readOnly.recvAck(m)
    if ackCount < r.quorum() {
    return nil
    }

    func (ro *readOnly) recvAck(m pb.Message) int {
    rs, ok := ro.pendingReadIndex[string(m.Context)]
    if !ok {
    return 0
    }

    rs.acks[m.From] = struct{}{}
    // add one to include an ack from local node
    return len(rs.acks) + 1
    }
    努力回忆下在Leader发心跳前做的准备工作,其中之一就是保存一个客户端的请求id到pendingReadIndex。这里再次提取这个request,将这个心跳响应累加到acks里面。
    那Leader怎么知道这是普通的心跳响应还是ReadIndex的心跳响应呢?关键就是Message的context是不是有请求id
    换句话说,这里就是收集因为这次ReadIndex请求发起的心跳,最终有多少人给了回应。
    如果超过一半的人答复了Leader,说明这个Leader是被人承认的,有公信力的。那么继续往下
    advance
    func (ro readOnly) advance(m pb.Message) []readIndexStatus {
    var (
    i int
    found bool
    )

    ctx := string(m.Context)
    rss := []*readIndexStatus{}

    for _, okctx := range ro.readIndexQueue {
    i++
    rs, ok := ro.pendingReadIndex[okctx]
    if !ok {
    panic("cannot find corresponding read state from pending map")
    }
    rss = append(rss, rs)
    if okctx == ctx {
    found = true
    break
    }
    }

    if found {
    ro.readIndexQueue = ro.readIndexQueue[i:]
    for _, rs := range rss {
    delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
    }
    return rss
    }

    return nil
    }
    之前在addRequest的时候往ReadOnly里面写了这次请求的信息,那么这里从里面找出来。
    找到后,将之前的还没来得及处理得请求,一起摘出来,往下处理
    for _, rs := range rss {
    req := rs.req
    if req.From == None || req.From == r.id { // from local member
    r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
    } else {
    r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
    }
    }
    遍历上面返回的请求列表
    如果是发给当前节点的请求,那么将这个ReadState累加在本地
    如果不是,给对方发MsgReadIndexResp
    Follower
    case pb.MsgReadIndexResp:
    if len(m.Entries) != 1 {
    r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
    return nil
    }
    r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
    }
    可以看到,Follower是通过Leader拿到最新的commit,尽管他自己都还没有跟上对方的进度。但是对于外部请求方来说,他并不区分当前是Follower或Leader,他只想知道当前Raft的最新状态。所以将最新得commit累加到本地得ReadState,等待发送出去。

    总结
    事已至此,最新的readstate都已经保存在本地了,那么这些状态怎么发送出去,让应用层处理,那就是Ready在做的事情了。

    作者:Pillar_Zhong
    链接:https://www.jianshu.com/p/076989a2ede2
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    相关文章

      网友评论

        本文标题:EtcdRaft源码分析(线性一致读)

        本文链接:https://www.haomeiwen.com/subject/qrpnphtx.html