EtcdRaft Source Code Analysis (Proposing Data)

Author: 小蜗牛爬楼梯 | Published: 2020-04-07 15:39

    Let's look at how Raft handles data proposed from outside.

    Client

    The Node interface

    type Node interface {
        // ... (other methods omitted)
        // Propose proposes that data be appended to the log. Note that proposals can be lost without
        // notice, therefore it is user's job to ensure proposal retries.
        Propose(ctx context.Context, data []byte) error
        // ... (other methods omitted)
    }
    
    

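    An external client proposes data by calling the Propose method. As the doc comment warns, a proposal can be lost without notice, so retrying is the caller's responsibility.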

    Follower

    case pb.MsgProp:
            if r.lead == None {
                return ErrProposalDropped
            } else if r.disableProposalForwarding {
                return ErrProposalDropped
            }
            m.To = r.lead
            r.send(m)
    
    

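    When a Follower receives a Propose request (MsgProp), it forwards the request to the Leader by readdressing the message (m.To = r.lead). If no leader is currently known, or proposal forwarding is disabled (disableProposalForwarding), the proposal is dropped with ErrProposalDropped.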

    Candidate

    case pb.MsgProp:
       r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
       return ErrProposalDropped
    
    

    A Candidate has no leader to forward to, so it drops the proposal outright.

    Leader

    case pb.MsgProp:
       if len(m.Entries) == 0 {
          r.logger.Panicf("%x stepped empty MsgProp", r.id)
       }
       if _, ok := r.prs[r.id]; !ok {
          // If we are not currently a member of the range (i.e. this node
          // was removed from the configuration while serving as leader),
          // drop any new proposals.
          return ErrProposalDropped
       }
       if r.leadTransferee != None {
          r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
          return ErrProposalDropped
       }
    
       for i, e := range m.Entries {
          if e.Type == pb.EntryConfChange {
             if r.pendingConfIndex > r.raftLog.applied {
                r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
                   e.String(), r.pendingConfIndex, r.raftLog.applied)
                m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
             } else {
                r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
             }
          }
       }
    
       if !r.appendEntry(m.Entries...) {
          return ErrProposalDropped
       }
       r.bcastAppend()
       return nil
    
    
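    • Naturally, no faking it: a proposal must carry entries, and an empty MsgProp makes the leader panic.
    • Check whether the leader itself is still in the member list (r.prs); if not, drop the proposal. According to the code comment, this happens when the node was removed from the configuration while serving as leader. Exactly how a configuration change removes it from prs is left for later analysis.
    • While a leadership transfer is in progress (r.leadTransferee != None), proposals are dropped as well.
    • Walk the entries. For each configuration change (EntryConfChange), check r.pendingConfIndex > r.raftLog.applied, which means an earlier configuration change has not been applied yet. If so, replace the new change with an empty EntryNormal entry. Otherwise, record the index this change will occupy (lastIndex + i + 1) in pendingConfIndex.
    • Finally, appendEntry writes the entries to the leader's own log (the proposal is dropped if that fails), and bcastAppend sends them to the followers.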
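    The configuration-change filter is the subtle part of this branch. The sketch below isolates it. It uses hypothetical entry and entryType stand-ins for pb.Entry and pb.EntryType, and it passes pendingConfIndex, applied and lastIndex in as plain values.

```go
package main

import "fmt"

// entryType and entry are hypothetical stand-ins for pb.EntryType and pb.Entry.
type entryType int

const (
	entryNormal entryType = iota
	entryConfChange
)

type entry struct {
	Type entryType
	Data string
}

// filterConfChanges mirrors the loop in the leader's MsgProp case. While an
// earlier config change is still unapplied (pendingConfIndex > applied), a new
// EntryConfChange is replaced in place by an empty normal entry. Otherwise the
// change is kept and pendingConfIndex becomes the index it will get once
// appended: lastIndex + i + 1. Returns the updated pendingConfIndex.
func filterConfChanges(ents []entry, pendingConfIndex, applied, lastIndex uint64) uint64 {
	for i, e := range ents {
		if e.Type != entryConfChange {
			continue
		}
		if pendingConfIndex > applied {
			ents[i] = entry{Type: entryNormal} // keep the slot, drop the payload
		} else {
			pendingConfIndex = lastIndex + uint64(i) + 1
		}
	}
	return pendingConfIndex
}

func main() {
	ents := []entry{
		{Type: entryNormal, Data: "put a"},
		{Type: entryConfChange, Data: "add node 4"},
		{Type: entryConfChange, Data: "add node 5"},
	}
	pending := filterConfChanges(ents, 0, 0, 10)
	fmt.Println(pending, ents[1].Data, ents[2].Type == entryNormal, ents[2].Data == "")
	// 12 add node 4 true true
}
```

    Replacing the blocked change instead of removing it means the batch is still accepted as a whole. The second configuration change simply turns into a no-op entry in the log.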

    appendEntry

    func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
       li := r.raftLog.lastIndex()
       for i := range es {
          es[i].Term = r.Term
          es[i].Index = li + 1 + uint64(i)
       }
       // Track the size of this uncommitted proposal.
       if !r.increaseUncommittedSize(es) {
          r.logger.Debugf(
             "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
             r.id,
          )
          // Drop the proposal.
          return false
       }
       // use latest "last" index after truncate/append
       li = r.raftLog.append(es...)
       r.getProgress(r.id).maybeUpdate(li)
       // Regardless of maybeCommit's return, our caller will call bcastAppend.
       r.maybeCommit()
       return true
    }
    
    
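    • The first thing done with the incoming entries is to stamp them with the leader's current term and to assign consecutive indexes starting at lastIndex + 1.
    • If the proposal would push the accumulated uncommittedSize past the limit (maxUncommittedSize), it is dropped as well.
    • The entries are appended to the local unstable log, and the new lastIndex is returned.
    • With that lastIndex, maybeUpdate advances the leader's own progress to Match = lastIndex and Next = lastIndex + 1, and it returns updated = true because Match moved.
    • maybeCommit gathers the members' progress and commits whatever a quorum has matched. At this point, in a multi-node cluster only the leader holds the new entries, so usually nothing is committed yet.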
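    The steps above can be condensed into a toy leader. This is a sketch under simplifying assumptions: entry, progress and leader are hypothetical types, and the log is a plain slice, so lastIndex is its length. Size is counted as len(Data). Modeled on etcd's behavior, a proposal is always admitted when the uncommitted tail is empty; treat that rule as an assumption of this sketch. maybeCommit is left out.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for pb.Entry.
type entry struct {
	Term, Index uint64
	Data        []byte
}

// progress keeps only Match/Next from raft's Progress.
type progress struct{ Match, Next uint64 }

// maybeUpdate follows the shape of Progress.maybeUpdate: raise Match (reporting
// whether it moved) and keep Next at least Match+1.
func (pr *progress) maybeUpdate(n uint64) bool {
	updated := false
	if pr.Match < n {
		pr.Match = n
		updated = true
	}
	if pr.Next < n+1 {
		pr.Next = n + 1
	}
	return updated
}

// leader is a toy model: log holds every entry from index 1, so lastIndex == len(log).
type leader struct {
	term               uint64
	log                []entry
	uncommittedSize    uint64
	maxUncommittedSize uint64 // assumption: counted in payload bytes
	self               progress
}

func (l *leader) lastIndex() uint64 { return uint64(len(l.log)) }

// appendEntry stamps term and index, enforces the uncommitted-size budget,
// appends, then advances the leader's own progress. maybeCommit is omitted.
func (l *leader) appendEntry(es ...entry) bool {
	li := l.lastIndex()
	var size uint64
	for i := range es {
		es[i].Term = l.term
		es[i].Index = li + 1 + uint64(i)
		size += uint64(len(es[i].Data))
	}
	// Assumption modeled on etcd: an empty uncommitted tail admits any size.
	if l.uncommittedSize > 0 && l.uncommittedSize+size > l.maxUncommittedSize {
		return false // drop the proposal
	}
	l.uncommittedSize += size
	l.log = append(l.log, es...)
	l.self.maybeUpdate(l.lastIndex())
	return true
}

func main() {
	l := &leader{term: 3, maxUncommittedSize: 8, log: make([]entry, 5)} // placeholders for entries 1..5
	ok := l.appendEntry(entry{Data: []byte("hello")}, entry{Data: []byte("!")})
	fmt.Println(ok, l.log[5].Index, l.log[6].Term, l.self)   // true 6 3 {7 8}
	fmt.Println(l.appendEntry(entry{Data: []byte("world")})) // false, since 6+5 > 8
}
```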

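    bcastAppend

    bcastAppend itself is only a loop. It visits the Progress of every peer except the leader itself and calls sendAppend(to), which in turn simply calls maybeSendAppend(to, true). The real work therefore happens in maybeSendAppend: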

    func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
       pr := r.getProgress(to)
       if pr.IsPaused() {
          return false
       }
       m := pb.Message{}
       m.To = to
    
       term, errt := r.raftLog.term(pr.Next - 1)
       ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
       if len(ents) == 0 && !sendIfEmpty {
          return false
       }
    
       if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
          if !pr.RecentActive {
             r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
             return false
          }
    
          m.Type = pb.MsgSnap
          snapshot, err := r.raftLog.snapshot()
          if err != nil {
             if err == ErrSnapshotTemporarilyUnavailable {
                r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
                return false
             }
             panic(err) // TODO(bdarnell)
          }
          if IsEmptySnap(snapshot) {
             panic("need non-empty snapshot")
          }
          m.Snapshot = snapshot
          sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
          r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
             r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
          pr.becomeSnapshot(sindex)
          r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
       } else {
          m.Type = pb.MsgApp
          m.Index = pr.Next - 1
          m.LogTerm = term
          m.Entries = ents
          m.Commit = r.raftLog.committed
          if n := len(m.Entries); n != 0 {
             switch pr.State {
             // optimistically increase the next when in ProgressStateReplicate
             case ProgressStateReplicate:
                last := m.Entries[n-1].Index
                pr.optimisticUpdate(last)
                pr.ins.add(last)
             case ProgressStateProbe:
                pr.pause()
             default:
                r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
             }
          }
       }
       r.send(m)
       return true
    }
    
    
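    • Get the peer's Progress, that is, its replication state. If the peer is paused, nothing is sent.
    • Pack the entries starting at the peer's Next (pr.Next), limited by maxMsgSize.
    • Include the index and term of the entry at Next-1 (m.Index, m.LogTerm), so the receiver can check that its log matches up to that point.
    • Send the leader's committed index along (m.Commit).
    • Send a MsgApp to the peer. If it carries entries, the next step depends on the peer's state. In ProgressStateReplicate, Next is optimistically advanced past the last entry sent and that index is recorded in the inflights window. In ProgressStateProbe, the peer is paused until it replies.
    • The snapshot case
      • If the term at Next-1 or the entries after it cannot be found, they have already been compacted into the snapshot. If the peer has not been recently active (RecentActive), no snapshot is sent.
      • Fetch the snapshot held by this node, which may live in unstable or in storage.
      • Move the peer's Progress to ProgressStateSnapshot and set PendingSnapshot to the snapshot's index.
      • Send a MsgSnap to the peer.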
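    A compact model of the MsgApp/MsgSnap decision follows. It is a simplified sketch, not etcd's code. progress, raftLog and msg are hypothetical types, the inflights ring buffer is a plain slice, and entries are represented only by their indexes. The RecentActive check, the maxMsgSize limit, the sendIfEmpty flag and the snapshot-availability errors are omitted. In this model, any index below the compaction point or past the last index counts as unavailable, which triggers the snapshot path.

```go
package main

import (
	"errors"
	"fmt"
)

type state int

const (
	probe state = iota
	replicate
	snapshot
)

// progress is a trimmed-down, hypothetical Progress; inflight is a plain
// slice standing in for raft's inflights ring buffer.
type progress struct {
	Next, PendingSnapshot uint64
	State                 state
	Paused                bool
	inflight              []uint64
	maxInflight           int
}

func (pr *progress) isPaused() bool {
	switch pr.State {
	case probe:
		return pr.Paused
	case replicate:
		return len(pr.inflight) >= pr.maxInflight
	}
	return true // snapshot: wait until the peer reports back
}

// raftLog keeps the entries after a compacted prefix: index offset is the last
// index covered by the snapshot (term snapTerm); terms[k] is the term of entry offset+1+k.
type raftLog struct {
	offset, snapTerm uint64
	terms            []uint64
}

var errUnavailable = errors.New("log index unavailable")

func (l *raftLog) lastIndex() uint64 { return l.offset + uint64(len(l.terms)) }

func (l *raftLog) term(i uint64) (uint64, error) {
	if i == l.offset {
		return l.snapTerm, nil
	}
	if i < l.offset || i > l.lastIndex() {
		return 0, errUnavailable
	}
	return l.terms[i-l.offset-1], nil
}

type msg struct {
	Kind                   string // "MsgApp" or "MsgSnap"
	Index, LogTerm, Commit uint64
	Entries                []uint64 // indexes of the entries carried
}

// maybeSendAppend builds a MsgApp whose (Index, LogTerm) is the entry at Next-1,
// or falls back to a MsgSnap when that entry has been compacted away.
func maybeSendAppend(l *raftLog, committed uint64, pr *progress) (msg, bool) {
	if pr.isPaused() {
		return msg{}, false
	}
	prevTerm, err := l.term(pr.Next - 1)
	if err != nil {
		pr.State, pr.PendingSnapshot = snapshot, l.offset
		return msg{Kind: "MsgSnap", Index: l.offset, LogTerm: l.snapTerm}, true
	}
	m := msg{Kind: "MsgApp", Index: pr.Next - 1, LogTerm: prevTerm, Commit: committed}
	for i := pr.Next; i <= l.lastIndex(); i++ {
		m.Entries = append(m.Entries, i)
	}
	if n := len(m.Entries); n > 0 {
		switch pr.State {
		case replicate: // optimistic: assume delivery and pipeline the next batch
			last := m.Entries[n-1]
			pr.Next = last + 1
			pr.inflight = append(pr.inflight, last)
		case probe: // one message at a time until the peer answers
			pr.Paused = true
		}
	}
	return m, true
}

func main() {
	l := &raftLog{offset: 5, snapTerm: 2, terms: []uint64{2, 2, 3, 3, 3}} // entries 6..10
	a := &progress{Next: 8, State: replicate, maxInflight: 4}
	m, _ := maybeSendAppend(l, 9, a)
	fmt.Println(m.Kind, m.Index, m.LogTerm, m.Entries, a.Next) // MsgApp 7 2 [8 9 10] 11
	b := &progress{Next: 3, State: probe}
	m, _ = maybeSendAppend(l, 9, b)
	fmt.Println(m.Kind, m.Index, b.State == snapshot, b.PendingSnapshot) // MsgSnap 5 true 5
}
```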

    Summary

    For the rest of the flow, see EtcdRaft Source Code Analysis (Log Replication).

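    Author: Pillar_Zhong
    Link: https://www.jianshu.com/p/568c3400811e
    Source: Jianshu (简书)
    Copyright belongs to the author. For commercial reprints, contact the author for authorization; for non-commercial reprints, please credit the source.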