Hyperledger-Fabric Source Code Analysis (orderer-c


Author: Pillar_Zhong | Published 2019-03-10 23:42

    Background

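    From a blockchain point of view, the Kafka approach defeats the purpose. Where should a centralized Kafka cluster even be deployed: in the cloud? At a third-party institution? Arguably nowhere is appropriate. A carefully packaged decentralized architecture that still contains a centralized service is a bone stuck in the throat, and it has to be said. Fortunately, Fabric recognized the problem early and had long planned to introduce Raft. In an open, equal consortium chain, every enterprise can then deploy its own ordering service.
    Raft is a very mature distributed consensus algorithm, and an elegant one. Understanding this implementation takes some background knowledge, so I strongly recommend studying Raft first.
    This part of Fabric is built mainly on etcd's raft library, which is essentially a standard implementation of the Raft algorithm; networking and storage are left to the application. As you will see, Fabric still does a fair amount of work on top of it. If etcdraft could one day be split out as a standalone component, I think it would make integration easier for other applications.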

    https://ramcloud.stanford.edu/~ongaro/thesis.pdf

    https://raft.github.io/

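    Glossary

    Name        Meaning
    Term        term: a numbered period of time with at most one leader
    Vote        a vote cast in a leader election
    Entry       a log entry, one unit of data in the replicated log
    candidate   a node competing to become leader
    leader      the node that accepts proposals and replicates the log to the others
    follower    a passive node that replicates the leader's log
    commit      an entry is committed once a majority has stored it, after which it may be applied
    propose     submit data to be appended to the replicated log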

    Configuration

    Orderer: &OrdererDefaults
      OrdererType: etcdraft
      Addresses:
        - orderer1st-ordererorg:7050
        - orderer2nd-ordererorg:7050
        - orderer3rd-ordererorg:7050
      BatchTimeout: 2s
      BatchSize:
        MaxMessageCount: 500
        AbsoluteMaxBytes: 98 MB
        PreferredMaxBytes: 512 KB
      EtcdRaft:
        Consenters:
          - Host: orderer1st-ordererorg
            Port: 7050
            ClientTLSCert: ...
            ServerTLSCert: ...
          - Host: orderer2nd-ordererorg
            Port: 7050
            ClientTLSCert: ...
            ServerTLSCert: ...
          - Host: orderer3rd-ordererorg
            Port: 7050
            ClientTLSCert: ...
            ServerTLSCert: ...
        Options:
          TickInterval: 100
          ElectionTick: 10
          HeartbeatTick: 1
          MaxInflightMsgs: 256
          MaxSizePerMsg: 1048576
          SnapshotInterval: 500
    

    As you can see, each Raft node is simply an Orderer. No separate Raft cluster is built on top of the Orderers, which is a clear difference from Kafka.

    Raft

    Node


    The raft library exposes Node for the application layer to interact with.

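    Name               Meaning
    Tick               Raft's clockwork spring: the application must call it at a fixed interval, and it drives elections and heartbeats
    Advance            Tells raft that the previously delivered Ready has been fully processed and the application is ready for the next one
    Ready              Every stir in the Raft world is announced here; this is crucial and is covered below
    Step               Feeds a received message into the state machine
    ProposeConfChange  Proposes a configuration change
    Propose            Proposes appending data to the log; may return an error
    Campaign           Makes the node become a candidate and compete for leadership
    ApplyConfChange    Applies a configuration change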
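Why Tick is called the clockwork: raft has no notion of wall-clock time and only counts calls to Tick. The toy follower below uses hypothetical types and is not the etcd raft code. It campaigns once it has counted a randomized number of ticks, between electionTick and 2×electionTick−1, without hearing a heartbeat. The numbers (electionTick 10, the seeds) are arbitrary.

```go
package main

import (
	"fmt"
	"math/rand"
)

// toyFollower only counts logical ticks; it knows nothing about wall-clock time.
type toyFollower struct {
	electionTick   int
	randomizedTick int // electionTick + a random value in [0, electionTick)
	elapsed        int
	campaigned     bool
	rng            *rand.Rand
}

func newToyFollower(electionTick int, seed int64) *toyFollower {
	f := &toyFollower{electionTick: electionTick, rng: rand.New(rand.NewSource(seed))}
	f.resetTimeout()
	return f
}

func (f *toyFollower) resetTimeout() {
	f.elapsed = 0
	f.randomizedTick = f.electionTick + f.rng.Intn(f.electionTick)
}

// Tick is what the application calls once per TickInterval.
func (f *toyFollower) Tick() {
	f.elapsed++
	if f.elapsed >= f.randomizedTick {
		f.campaigned = true // real raft would start an election here
		f.resetTimeout()
	}
}

// Heartbeat models a message from a live leader: it restarts the countdown.
func (f *toyFollower) Heartbeat() { f.resetTimeout() }

func main() {
	f := newToyFollower(10, 1)
	for i := 0; i < 9; i++ {
		f.Tick()
	}
	fmt.Println("campaigned after 9 ticks:", f.campaigned) // false: the timeout is at least 10 ticks
	for i := 0; i < 11; i++ {
		f.Tick()
	}
	fmt.Println("campaigned after 20 ticks:", f.campaigned) // true: the timeout is at most 19 ticks
}
```

The randomization is what keeps split votes rare: followers rarely time out on the same tick.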
    type Ready struct {
       // The current volatile state of a Node.
       // SoftState will be nil if there is no update.
       // It is not required to consume or store SoftState.
       *SoftState
    
       // The current state of a Node to be saved to stable storage BEFORE
       // Messages are sent.
       // HardState will be equal to empty state if there is no update.
       pb.HardState
    
       // ReadStates can be used for node to serve linearizable read requests locally
       // when its applied index is greater than the index in ReadState.
       // Note that the readState will be returned when raft receives msgReadIndex.
       // The returned is only valid for the request that requested to read.
       ReadStates []ReadState
    
       // Entries specifies entries to be saved to stable storage BEFORE
       // Messages are sent.
       Entries []pb.Entry
    
       // Snapshot specifies the snapshot to be saved to stable storage.
       Snapshot pb.Snapshot
    
       // CommittedEntries specifies entries to be committed to a
       // store/state-machine. These have previously been committed to stable
       // store.
       CommittedEntries []pb.Entry
    
       // Messages specifies outbound messages to be sent AFTER Entries are
       // committed to stable storage.
       // If it contains a MsgSnap message, the application MUST report back to raft
       // when the snapshot has been received or has failed by calling ReportSnapshot.
       Messages []pb.Message
    
       // MustSync indicates whether the HardState and Entries must be synchronously
       // written to disk or if an asynchronous write is permissible.
       MustSync bool
    }
    

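    Practically every stir in the Raft world shows up here. Whenever the application interacts with raft, this is where it starts, so understanding what these fields do is the key to understanding the implementation.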

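    Field             Meaning
    SoftState         Who leads the current term and this node's role in the Raft cluster; volatile, so it need not be persisted
    HardState         Must be persisted: the node's current Term, Vote and Commit
    Entries           Log entries that must be persisted before messages are sent to other nodes
    Snapshot          Snapshot data that must be persisted
    CommittedEntries  Entries to be applied to the state machine; they have already been persisted
    Messages          Messages to send once Entries have been persisted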
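The table leaves out MustSync. In the etcd raft library it is true whenever there are new entries or the Term or Vote changed. Those are the fields Raft requires to be durable, while a change to Commit alone may be written asynchronously. Below is a stdlib-only restatement of that rule. The local hardState type stands in for raftpb.HardState.

```go
package main

import "fmt"

// hardState is a local stand-in for raftpb.HardState.
type hardState struct {
	Term, Vote, Commit uint64
}

// mustSync restates the etcd raft rule: new entries, a new term or a new vote
// must reach disk synchronously; Commit is not part of Raft's required durable state.
func mustSync(cur, prev hardState, numEntries int) bool {
	return numEntries != 0 || cur.Vote != prev.Vote || cur.Term != prev.Term
}

func main() {
	prev := hardState{Term: 3, Vote: 2, Commit: 10}
	fmt.Println(mustSync(hardState{Term: 3, Vote: 2, Commit: 11}, prev, 0)) // false
	fmt.Println(mustSync(hardState{Term: 4, Vote: 0, Commit: 10}, prev, 0)) // true
	fmt.Println(mustSync(prev, prev, 1))                                    // true
}
```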

    Raft->Orderer

    case rd := <-n.Ready():
       if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
          n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
       }
    
       if !raft.IsEmptySnap(rd.Snapshot) {
          n.chain.snapC <- &rd.Snapshot
       }
    
       // skip empty apply
       if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
          n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
       }
    
       n.Advance()
    
       // TODO(jay_guo) leader can write to disk in parallel with replicating
       // to the followers and them writing to their disks. Check 10.2.1 in thesis
       n.send(rd.Messages)
    

    This handles the Ready notifications coming from Raft.

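    1. First, whatever else happens, on receiving a Ready the node stores Entries, HardState and Snapshot locally. Storing them does not mean they will be applied to the state machine. What matters first is to keep them, and Raft later decides which entries must be applied. Because the raft library has no storage of its own, the application has to take this over.
    2. If there is a snapshot, it is sent to snapC (covered later).
    3. len(rd.CommittedEntries) != 0 || rd.SoftState != nil: if there are committed entries or a SoftState change, they are sent to applyC.
    4. Once everything is handled, Advance tells Raft this Ready is done and the next one may be delivered.
    5. Because the raft library has no networking either, the application has to carry messages between nodes. More on this later.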

    Storage

    func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error {
        if err := rs.wal.Save(hardstate, entries); err != nil {
            return err
        }
    
        if !raft.IsEmptySnap(snapshot) {
            if err := rs.saveSnap(snapshot); err != nil {
                return err
            }
    
            if err := rs.ram.ApplySnapshot(snapshot); err != nil {
                if err == raft.ErrSnapOutOfDate {
                    rs.lg.Warnf("Attempted to apply out-of-date snapshot at Term %d and Index %d",
                        snapshot.Metadata.Term, snapshot.Metadata.Index)
                } else {
                    rs.lg.Fatalf("Unexpected programming error: %s", err)
                }
            }
        }
    
        if err := rs.ram.Append(entries); err != nil {
            return err
        }
    
        return nil
    }
    
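    1. HardState and Entries are written to the WAL.
    2. A non-empty Snapshot is saved as a snapshot file (saveSnap).
    3. The Snapshot and Entries then go into MemoryStorage, which can be seen as a cache layer in front of the storage.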

    Snapshots

    case sn := <-c.snapC:
                if sn.Metadata.Index <= c.appliedIndex {
                    c.logger.Debugf("Skip snapshot taken at index %d, because it is behind current applied index %d", sn.Metadata.Index, c.appliedIndex)
                    break
                }
    
                b := utils.UnmarshalBlockOrPanic(sn.Data)
                c.lastSnapBlockNum = b.Header.Number
                c.confState = sn.Metadata.ConfState
                c.appliedIndex = sn.Metadata.Index
    
                if err := c.catchUp(sn); err != nil {
                    c.logger.Panicf("Failed to recover from snapshot taken at Term %d and Index %d: %s",
                        sn.Metadata.Term, sn.Metadata.Index, err)
                }
    
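    1. If the state machine's applied index has already reached the snapshot's index, there is no point in going on.
    2. Update the chain with the snapshot's data.
    3. Note confState here: it records the member list as well as the learner list.
    4. Nodes that receive a snapshot are mostly followers that have fallen behind or newly joined learners. They need to catch up with the leader, hence catchUp.
    5. As an aside, learners do not take part in elections. They are far behind, so to avoid disrupting the democratic process they stay on the sidelines until they have caught up, and only then join in.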
    func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
       b, err := utils.UnmarshalBlock(snap.Data)
       if err != nil {
          return errors.Errorf("failed to unmarshal snapshot data to block: %s", err)
       }
    
       if c.lastBlock.Header.Number >= b.Header.Number {
          c.logger.Warnf("Snapshot is at block %d, local block number is %d, no sync needed", b.Header.Number, c.lastBlock.Header.Number)
          return nil
       }
    
       puller, err := c.createPuller()
       if err != nil {
          return errors.Errorf("failed to create block puller: %s", err)
       }
       defer puller.Close()
    
       var block *common.Block
       next := c.lastBlock.Header.Number + 1
    
       c.logger.Infof("Catching up with snapshot taken at block %d, starting from block %d", b.Header.Number, next)
    
       for next <= b.Header.Number {
          block = puller.PullBlock(next)
          if block == nil {
             return errors.Errorf("failed to fetch block %d from cluster", next)
          }
          if utils.IsConfigBlock(block) {
             c.support.WriteConfigBlock(block, nil)
          } else {
             c.support.WriteBlock(block, nil)
          }
    
          next++
       }
    
       c.lastBlock = block
       c.logger.Infof("Finished syncing with cluster up to block %d (incl.)", b.Header.Number)
       return nil
    }
    
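    1. The trick lies in how snapshots are built (covered later): the snapshot data is actually the last block of the range the snapshot covers, so its block number tells catchUp where to stop.
    2. A Puller is used here. Underneath, it pulls blocks through the Orderer Deliver service.
    3. The rest is straightforward: pull each block in that range, write it to the local ledger, and finally update lastBlock.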

    applyC

    SoftState
    case app := <-c.applyC:
       if app.soft != nil {
          newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
          if newLeader != soft.Lead {
             c.logger.Infof("Raft leader changed: %d -> %d", soft.Lead, newLeader)
             c.Metrics.LeaderChanges.Add(1)
    
             atomic.StoreUint64(&c.lastKnownLeader, newLeader)
    
             if newLeader == c.raftID {
                propC, cancelProp = becomeLeader()
             }
    
             if soft.Lead == c.raftID {
                becomeFollower()
             }
          }
    
        ...
    
          soft = raft.SoftState{Lead: newLeader, RaftState: app.soft.RaftState}
    
          // notify external observer
          select {
          case c.observeC <- soft:
          default:
          }
       }
    
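    1. Receiving this notification means leadership may have changed hands.
    2. Compare the newly announced leader with the one currently known. If they differ, that's life: the handover begins.
    3. If this node itself won, it calls becomeLeader, no questions asked.
    4. If this node was the leader and now someone else is, its leader duties must stop immediately: becomeFollower.
    5. The latest SoftState is recorded and sent to observeC (without blocking), though at the time of writing nothing outside listens to it.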
    becomeLeader
    becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
            c.Metrics.IsLeader.Set(1)
    
            c.blockInflight = 0
            c.justElected = true
            submitC = nil
            ch := make(chan *common.Block, c.opts.MaxInflightMsgs)
    
            // if there is unfinished ConfChange, we should resume the effort to propose it as
            // new leader, and wait for it to be committed before start serving new requests.
            if cc := c.getInFlightConfChange(); cc != nil {
                // The reason `ProposeConfChange` should be called in go routine is documented in `writeConfigBlock` method.
                go func() {
                    if err := c.Node.ProposeConfChange(context.TODO(), *cc); err != nil {
                        c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
                    }
                }()
    
                c.confChangeInProgress = cc
                c.configInflight = true
            }
    
            // Leader should call Propose in go routine, because this method may be blocked
            // if node is leaderless (this can happen when leader steps down in a heavily
            // loaded network). We need to make sure applyC can still be consumed properly.
            ctx, cancel := context.WithCancel(context.Background())
            go func(ctx context.Context, ch <-chan *common.Block) {
                for {
                    select {
                    case b := <-ch:
                        data := utils.MarshalOrPanic(b)
                        if err := c.Node.Propose(ctx, data); err != nil {
                            c.logger.Errorf("Failed to propose block %d to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
                            return
                        }
                        c.logger.Debugf("Proposed block %d to raft consensus", b.Header.Number)
    
                    case <-ctx.Done():
                        c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
                        return
                    }
                }
            }(ctx, ch)
    
            return ch, cancel
        }
    

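    The most important part is the goroutine at the end. The caller runs propC, cancelProp = becomeLeader(). The goroutine then keeps receiving blocks from that channel and hands each one to Raft via Node.Propose. More on this later.

    In the Raft world the leader is king and its word is law: only the leader gets to propose. Fabric enables DisableProposalForwarding, so followers drop proposals instead of forwarding them. The most important thing after being elected is therefore to take over the channel for talking to Raft.

    If a configuration change is still unfinished, the new leader proposes it again with c.Node.ProposeConfChange and waits for it to be committed before serving new requests.

    Where do the blocks sent to ch come from? I'll keep you in suspense; it's covered later.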

    becomeFollower
    becomeFollower := func() {
       cancelProp()
       c.blockInflight = 0
       _ = c.support.BlockCutter().Cut()
       stop()
       submitC = c.submitC
       bc = nil
       c.Metrics.IsLeader.Set(0)
    }
    

    Giving up power hurts. Let's see what it involves:

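    1. I first assumed cancelProp would cut the link with propC. It cancels the context created in becomeLeader, so the proposing goroutine returns and any blocks still queued in that channel are discarded (hence the "discarded %d blocks" debug log). propC itself is not reset, but in theory a follower never gets a chance to propose anyway.

    2. blockInflight counts the blocks the leader has proposed that a Raft majority has not yet agreed on. Once a majority agrees, the leader commits the block locally and only then decrements the count. A follower has nothing in flight, so the count is reset to 0.

    3. c.support.BlockCutter().Cut(): an open question here. This call clears pendingBatch; can we really be sure that nothing unprocessed is left at this point?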

      func (r *receiver) Cut() []*cb.Envelope {
         r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(time.Since(r.PendingBatchStartTime).Seconds())
         r.PendingBatchStartTime = time.Time{}
         batch := r.pendingBatch
         r.pendingBatch = nil
         r.pendingBatchSizeBytes = 0
         return batch
      }
      
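    4. stop(): since the pending batch has just been cleared, stopping the batch timer here is harmless.

    5. submitC = c.submitC reopens the channel that accepts client submissions (covered later). A follower accepts them again so that Submit can forward them to the leader.

    6. bc is the blockCreator, which holds information about the most recently created block. After stepping down it is meaningless, so it is cleared.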
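To make item 3 concrete, here is a toy block cutter that keeps only the MaxMessageCount rule; the real cutter also splits on PreferredMaxBytes. The toyCutter type is hypothetical. It shows that Cut simply hands back and forgets the pending batch. Whatever a stepping-down leader had pending is therefore dropped on that node, which is exactly the open question above.

```go
package main

import "fmt"

// toyCutter keeps only the MaxMessageCount rule of Fabric's block cutter.
type toyCutter struct {
	maxCount int
	pending  []string
}

// Ordered enqueues one envelope and returns any batch that became full, plus
// whether envelopes are still waiting (the case in which the batch timer runs).
func (r *toyCutter) Ordered(env string) (batches [][]string, pending bool) {
	r.pending = append(r.pending, env)
	if len(r.pending) >= r.maxCount {
		batches = append(batches, r.Cut())
	}
	return batches, len(r.pending) > 0
}

// Cut returns whatever is pending and resets the cutter.
func (r *toyCutter) Cut() []string {
	batch := r.pending
	r.pending = nil
	return batch
}

func main() {
	c := &toyCutter{maxCount: 2}
	b, p := c.Ordered("tx1")
	fmt.Println(len(b), p) // 0 true
	b, p = c.Ordered("tx2")
	fmt.Println(len(b), p) // 1 false
	c.Ordered("tx3")
	dropped := c.Cut() // becomeFollower does `_ = ...Cut()`, so these go nowhere on this node
	fmt.Println(dropped, len(c.pending)) // [tx3] 0
}
```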

    CommittedEntries
    func (c *Chain) apply(ents []raftpb.Entry) {
        if len(ents) == 0 {
            return
        }
    
        if ents[0].Index > c.appliedIndex+1 {
            c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
        }
    
        var appliedb uint64
        var position int
        for i := range ents {
            switch ents[i].Type {
            case raftpb.EntryNormal:
                if len(ents[i].Data) == 0 {
                    break
                }
    
                // We need to strictly avoid re-applying normal entries,
                // otherwise we are writing the same block twice.
                if ents[i].Index <= c.appliedIndex {
                    c.logger.Debugf("Received block with raft index (%d) <= applied index (%d), skip", ents[i].Index, c.appliedIndex)
                    break
                }
    
                block := utils.UnmarshalBlockOrPanic(ents[i].Data)
                c.writeBlock(block, ents[i].Index)
    
                appliedb = block.Header.Number
                c.Metrics.CommittedBlockNumber.Set(float64(appliedb))
                position = i
                c.accDataSize += uint32(len(ents[i].Data))
    
            ...
    
        if c.accDataSize >= c.sizeLimit {
            select {
            case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
                c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
                    "taking snapshot at block %d, last snapshotted block number is %d",
                    c.accDataSize, c.sizeLimit, appliedb, c.lastSnapBlockNum)
                c.accDataSize = 0
                c.lastSnapBlockNum = appliedb
                c.Metrics.SnapshotBlockNumber.Set(float64(appliedb))
            default:
                c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotInterval is too small")
            }
        }
    
        return
    }
    

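    In the Raft world there are two kinds of committed entries: normal entries and configuration changes.

    1. Iterate over the normal entries. An entry with empty Data is skipped; raft appends one when a new leader takes office. Any entry already applied to the state machine is skipped too, i.e. one whose block is already in the local ledger, since otherwise the same block would be written twice.
    2. Next, writeBlock writes the block to the local ledger.
    3. Then record the position of the last entry processed and add the entry's data size to accDataSize, effectively accumulating block sizes. This comes in handy later.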

    Next, let's look at the logic of writeBlock.

    writeBlock
    func (c *Chain) writeBlock(block *common.Block, index uint64) {
       if c.blockInflight > 0 {
          c.blockInflight-- // only reduce on leader
       }
       c.lastBlock = block
    
       c.logger.Debugf("Writing block %d to ledger", block.Header.Number)
    
       if utils.IsConfigBlock(block) {
          c.writeConfigBlock(block, index)
          return
       }
    
       c.raftMetadataLock.Lock()
       c.opts.RaftMetadata.RaftIndex = index
       m := utils.MarshalOrPanic(c.opts.RaftMetadata)
       c.raftMetadataLock.Unlock()
    
       c.support.WriteBlock(block, m)
    }
    
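    1. blockInflight was covered earlier. Reaching this point means the block this leader proposed won the majority's support, so the proposal passed; what remains is to carry it out. The counter is only positive on the leader, which is why only the leader decrements it.
    2. Configuration handling may get its own treatment later. It does not affect the main flow, so it is skipped here.
    3. c.support.WriteBlock(block, m) writes the block to the local ledger, with the Raft index recorded in the block metadata.

    The end of apply, quoted again, is where snapshots are triggered; TakeSnapshot does the actual work: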
    if c.accDataSize >= c.sizeLimit {
       select {
       case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
          c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
             "taking snapshot at block %d, last snapshotted block number is %d",
             c.accDataSize, c.sizeLimit, appliedb, c.lastSnapBlockNum)
          c.accDataSize = 0
          c.lastSnapBlockNum = appliedb
          c.Metrics.SnapshotBlockNumber.Set(float64(appliedb))
       default:
          c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotInterval is too small")
       }
    }
    
    func (rs *RaftStorage) TakeSnapshot(i uint64, cs raftpb.ConfState, data []byte) error {
        rs.lg.Debugf("Creating snapshot at index %d from MemoryStorage", i)
        snap, err := rs.ram.CreateSnapshot(i, &cs, data)
        if err != nil {
            return errors.Errorf("failed to create snapshot from MemoryStorage: %s", err)
        }
    
        if err = rs.saveSnap(snap); err != nil {
            return err
        }
    
        rs.snapshotIndex = append(rs.snapshotIndex, snap.Metadata.Index)
    
        // Keep some entries in memory for slow followers to catchup
        if i > rs.SnapshotCatchUpEntries {
            compacti := i - rs.SnapshotCatchUpEntries
            rs.lg.Debugf("Purging in-memory raft entries prior to %d", compacti)
            if err = rs.ram.Compact(compacti); err != nil {
                if err == raft.ErrCompacted {
                    rs.lg.Warnf("Raft entries prior to %d are already purged", compacti)
                } else {
                    rs.lg.Fatalf("Failed to purge raft entries: %s", err)
                }
            }
        }
    
        rs.lg.Infof("Snapshot is taken at index %d", i)
    
        rs.gc()
        return nil
    }
    
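    1. If the accumulated accDataSize reaches the threshold (sizeLimit), information about the last block written is sent to the gcC channel. If a snapshot is already in progress, the send is skipped and a warning is logged.
    2. The receiver of gcC in turn calls takeSnapshot.
    3. TakeSnapshot itself is simple. It creates a snapshot holding the term, the index of the last applied entry and the block, and saves it locally. It then compacts MemoryStorage, keeping SnapshotCatchUpEntries entries in memory so that slow followers can still catch up.
    4. rs.gc involves a threshold, MaxSnapshotFiles; when it is exceeded, files are cleaned up. The WAL comes first: any log older than the snapshot is removed, because once a snapshot exists those log records serve no further purpose. In the Raft world, the index is the yardstick for everything. Snap files are simpler: whatever exceeds the limit is deleted.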
    if c.justElected {
       msgInflight := c.Node.lastIndex() > c.appliedIndex
       if msgInflight {
          c.logger.Debugf("There are in flight blocks, new leader should not serve requests")
          continue
       }
    
       if c.configInflight {
          c.logger.Debugf("There is config block in flight, new leader should not serve requests")
          continue
       }
    
       c.logger.Infof("Start accepting requests as Raft leader at block %d", c.lastBlock.Header.Number)
       bc = &blockCreator{
          hash:   c.lastBlock.Header.Hash(),
          number: c.lastBlock.Header.Number,
          logger: c.logger,
       }
       submitC = c.submitC
       c.justElected = false
    } else if c.configInflight {
       c.logger.Info("Config block or ConfChange in flight, pause accepting transaction")
       submitC = nil
    } else if c.blockInflight < c.opts.MaxInflightMsgs {
       submitC = c.submitC
    }
    
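    This check runs in the applyC branch, right after the committed entries have been applied.

    1. justElected marks the period right after this node won the election.
    2. msgInflight means the raft log still holds entries beyond appliedIndex that are not in the ledger yet, so it is not yet time to open for business.
    3. configInflight is similar: while a Raft configuration change or a config block is still pending, there is even less reason to serve.
    4. If both checks pass, submitC = c.submitC connects the submitC channel and the leader officially opens for business, with a fresh blockCreator built from the last block. Note that later rounds never enter this branch again, because justElected is now false.
    5. If a configuration change or the in-flight limit paused things earlier, c.blockInflight < c.opts.MaxInflightMsgs gives the leader a chance to resume accepting requests.
    6. Remember that becomeFollower reopens submitC immediately, while a leader must meet several conditions first: more is expected of a leader.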
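The leader's gate on submitC, restated as a pure function of a few counters. leaderState is a hypothetical type. The real code assigns submitC = nil or c.submitC instead of returning a bool; a nil channel simply never fires in a select.

```go
package main

import "fmt"

// leaderState holds the counters the if/else chain above looks at.
type leaderState struct {
	justElected    bool
	lastIndex      uint64 // last index in the raft log (c.Node.lastIndex())
	appliedIndex   uint64
	configInflight bool
	blockInflight  int
	maxInflight    int
}

// gate returns whether submitC should be open after this round, given whether
// it is open now; Fabric expresses "closed" as submitC = nil.
func (s *leaderState) gate(open bool) bool {
	switch {
	case s.justElected:
		if s.lastIndex > s.appliedIndex || s.configInflight {
			return open // `continue`: keep waiting, submitC stays nil after becomeLeader
		}
		s.justElected = false // Fabric also builds a fresh blockCreator here
		return true
	case s.configInflight:
		return false
	case s.blockInflight < s.maxInflight:
		return true
	}
	return open
}

func main() {
	s := &leaderState{justElected: true, lastIndex: 12, appliedIndex: 10, maxInflight: 5}
	open := s.gate(false)
	fmt.Println(open) // false: entries 11 and 12 are not applied yet
	s.appliedIndex = 12
	open = s.gate(open)
	fmt.Println(open) // true: start accepting requests as leader
	s.blockInflight = 5
	open = s.gate(false) // suppose the submitC case paused at the limit
	fmt.Println(open)    // false: still at MaxInflightMsgs
	s.blockInflight = 4
	fmt.Println(s.gate(open)) // true: a block was committed, resume
}
```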

    That more or less covers everything on the Raft-to-Orderer side.

    Messages

       n.Advance()
    
       // TODO(jay_guo) leader can write to disk in parallel with replicating
       // to the followers and them writing to their disks. Check 10.2.1 in thesis
       n.send(rd.Messages)
    

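    Advance means: I have finished with this batch of Ready and am ready for the next one.

    As mentioned earlier, etcd raft only cares about the algorithm itself; how the cluster's nodes communicate is not its concern. It does know, of course, whom each message is for. It just wants you to deliver it.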

    func (n *node) send(msgs []raftpb.Message) {
       n.unreachableLock.RLock()
       defer n.unreachableLock.RUnlock()
    
       for _, msg := range msgs {
          if msg.To == 0 {
             continue
          }
    
          status := raft.SnapshotFinish
    
          msgBytes := utils.MarshalOrPanic(&msg)
          err := n.rpc.SendConsensus(msg.To, &orderer.ConsensusRequest{Channel: n.chainID, Payload: msgBytes})
          if err != nil {
             // TODO We should call ReportUnreachable if message delivery fails
             n.logSendFailure(msg.To, err)
    
             status = raft.SnapshotFailure
          } else if _, ok := n.unreachable[msg.To]; ok {
             n.logger.Infof("Successfully sent StepRequest to %d after failed attempt(s)", msg.To)
             delete(n.unreachable, msg.To)
          }
    
          if msg.Type == raftpb.MsgSnap {
             n.ReportSnapshot(msg.To, status)
          }
       }
    }
    

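    Remember that the snapshot was written to storage earlier? In the normal case the status here is SnapshotFinish. To be on the safe side, though, if sending the message fails the snapshot is reported as failed, so that it gets sent again rather than assumed delivered.

    Finally, ReportSnapshot tells the local raft node whether the snapshot reached follower msg.To. The local node is the leader, since only the leader sends MsgSnap.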

    Orderer->Raft

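    How does the Orderer pass messages to Raft? Fabric decouples the Orderer from the underlying consensus algorithm, which makes it replaceable. If you have read my earlier posts on Kafka and Solo, this will look familiar.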

    type Consenter interface {
       // Order accepts a message or returns an error indicating the cause of failure
       // It ultimately passes through to the consensus.Chain interface
       Order(env *cb.Envelope, configSeq uint64) error
    
       // Configure accepts a reconfiguration or returns an error indicating the cause of failure
       // It ultimately passes through to the consensus.Chain interface
       Configure(config *cb.Envelope, configSeq uint64) error
    
       // WaitReady blocks waiting for consenter to be ready for accepting new messages.
       // This is useful when consenter needs to temporarily block ingress messages so
       // that in-flight messages can be consumed. It could return error if consenter is
       // in erroneous states. If this blocking behavior is not desired, consenter could
       // simply return nil.
       WaitReady() error
    }
    

    Every normal (non-configuration) message goes through Order and is pushed on to the consensus backend.

    func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
       return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
    }
    

    Here it is wrapped in a SubmitRequest and passed on.

    func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
       if err := c.isRunning(); err != nil {
          c.Metrics.ProposalFailures.Add(1)
          return err
       }
    
       leadC := make(chan uint64, 1)
       select {
       case c.submitC <- &submit{req, leadC}:
          lead := <-leadC
          if lead == raft.None {
             c.Metrics.ProposalFailures.Add(1)
             return errors.Errorf("no Raft leader")
          }
    
          if lead != c.raftID {
             if err := c.rpc.SendSubmit(lead, req); err != nil {
                c.Metrics.ProposalFailures.Add(1)
                return err
             }
          }
    
       case <-c.doneC:
          c.Metrics.ProposalFailures.Add(1)
          return errors.Errorf("chain is stopped")
       }
    
       return nil
    }
    
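    1. The chain must of course be running; the incoming request is handed to the submitC channel.

    2. Then it waits on leadC. Let's pause to see what that is: it returns the leader of the current term. Strictly speaking, it is the most recently known leader, or raft.None while an election is in progress, because a term can end without any leader being elected. This is very unlikely, though, thanks to randomized election timeouts.

    3. Carrying on: if the answer is raft.None, there is no leader yet, so Submit returns an error right away. It cannot accept a request it has no way to process.

    4. If the leader is someone else, we have a problem: in the Raft world only the leader gives orders, so what happens to this request? Dropping it would be a waste, and sending everything straight to the leader would put too much pressure on it. Since this node has no ambitions to seize the throne, it helps its leader out instead and forwards the request over RPC. The RPC module handles communication between orderers, i.e. between Raft nodes, and nothing works without it. I'll cover it another time.

    5. In the Raft->Orderer section we saw submitC being used to open for business. Now let's see what happens when a request comes in.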

    submitC

    case s := <-submitC:
       if s == nil {
          // polled by `WaitReady`
          continue
       }
    
       if soft.RaftState == raft.StatePreCandidate || soft.RaftState == raft.StateCandidate {
          s.leader <- raft.None
          continue
       }
    
       s.leader <- soft.Lead
       if soft.Lead != c.raftID {
          continue
       }
    
       batches, pending, err := c.ordered(s.req)
       if err != nil {
          c.logger.Errorf("Failed to order message: %s", err)
       }
       if pending {
          start() // no-op if timer is already started
       } else {
          stop()
       }
    
       c.propose(propC, bc, batches...)
    
       if c.configInflight {
          c.logger.Info("Received config block, pause accepting transaction till it is committed")
          submitC = nil
       } else if c.blockInflight >= c.opts.MaxInflightMsgs {
          c.logger.Debugf("Number of in-flight blocks (%d) reaches limit (%d), pause accepting transaction",
             c.blockInflight, c.opts.MaxInflightMsgs)
          submitC = nil
       }
    
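    1. If this node is a pre-candidate or candidate, there is nothing more to say: no leader has been produced yet, so raft.None is sent back.
    2. If soft.Lead != c.raftID, the latest leader is someone else and this node has no right to propose. It does not order the request itself; the leader ID it has just sent back lets Submit forward the request.
    3. batches, pending, err := c.ordered(s.req): familiar by now, this is where batches are cut.
    4. If some envelopes are still waiting in an uncut batch, the batch timer is started so they are not left behind; this is the compensation mechanism covered later. Otherwise the timer is stopped.
    5. c.propose(propC, bc, batches...): the key step, where blocks are actually handed on to Raft; see below.
    6. The rest covers the situations in which the leader temporarily stops accepting requests: a config block in flight, or the number of in-flight blocks reaching MaxInflightMsgs.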
    func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
       for _, batch := range batches {
          b := bc.createNextBlock(batch)
          c.logger.Debugf("Created block %d, there are %d blocks in flight", b.Header.Number, c.blockInflight)
    
          select {
          case ch <- b:
          default:
             c.logger.Panic("Programming error: limit of in-flight blocks does not properly take effect or block is proposed by follower")
          }
    
          // if it is config block, then we should wait for the commit of the block
          if utils.IsConfigBlock(b) {
             c.configInflight = true
          }
    
          c.blockInflight++
       }
    
       return
    }
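Why `submitC = nil` works as a pause: receiving from a nil channel blocks forever, so that select case is simply never chosen. Below is a stdlib-only toy that pauses at maxInflight and resumes after a commit. toyLeader is a hypothetical type, it produces one block per request, and a message on commits stands in for writeBlock decrementing blockInflight.

```go
package main

import "fmt"

// toyLeader keeps only what the in-flight limit needs. One block per request.
type toyLeader struct {
	requests    chan string   // the chain's c.submitC
	commits     chan struct{} // a block was written (blockInflight--)
	submitC     chan string   // local copy; nil while paused
	inflight    int
	maxInflight int
}

// step runs one iteration of the select loop and describes what happened.
func (l *toyLeader) step() string {
	select {
	case r := <-l.submitC: // never chosen while submitC is nil
		l.inflight++
		if l.inflight >= l.maxInflight {
			l.submitC = nil
			return "propose " + r + ", pause"
		}
		return "propose " + r
	case <-l.commits:
		l.inflight--
		if l.inflight < l.maxInflight {
			l.submitC = l.requests
		}
		return "commit"
	}
}

func main() {
	l := &toyLeader{requests: make(chan string, 3), commits: make(chan struct{}, 1), maxInflight: 2}
	l.submitC = l.requests
	l.requests <- "a"
	l.requests <- "b"
	l.requests <- "c"
	fmt.Println(l.step()) // propose a
	fmt.Println(l.step()) // propose b, pause
	l.commits <- struct{}{}
	fmt.Println(l.step()) // commit ("c" is waiting but submitC is nil)
	fmt.Println(l.step()) // propose c, pause
}
```

This is also why propose can panic on a full ch: the limit is supposed to stop intake before the channel buffer (sized MaxInflightMsgs) fills.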
    

    Remember the ch mentioned in becomeLeader? This is where blocks are sent to ch, one after another. Here is that code again:

    go func(ctx context.Context, ch <-chan *common.Block) {
       for {
          select {
          case b := <-ch:
             data := utils.MarshalOrPanic(b)
             if err := c.Node.Propose(ctx, data); err != nil {
                c.logger.Errorf("Failed to propose block %d to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
                return
             }
             c.logger.Debugf("Proposed block %d to raft consensus", b.Header.Number)
    
          case <-ctx.Done():
             c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
             return
          }
       }
    }(ctx, ch)
    

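    In the end it calls c.Node.Propose(ctx, data).

    Propose means broadcasting the log entry and asking everyone to store it, without committing it yet. Once the leader hears from more than half of the members that they have stored it, the leader can commit it. A later Ready then carries the new commit index in HardState and the entry in CommittedEntries.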
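The "more than half" rule as arithmetic: sort the voters' match indexes in descending order and take the one at position n/2. This is a sketch, not etcd's code. etcd raft additionally commits this way only when the entry at that index belongs to the leader's current term.

```go
package main

import (
	"fmt"
	"sort"
)

// quorumIndex returns the highest log index that a majority of voters have
// stored, given each voter's match index (the leader's own last index included).
// match must not be empty.
func quorumIndex(match []uint64) uint64 {
	s := append([]uint64(nil), match...) // do not reorder the caller's slice
	sort.Slice(s, func(i, j int) bool { return s[i] > s[j] })
	// the (n/2+1)-th largest value is held by at least n/2+1 voters
	return s[len(s)/2]
}

func main() {
	fmt.Println(quorumIndex([]uint64{9, 7, 5}))        // 7
	fmt.Println(quorumIndex([]uint64{10, 3, 8, 2, 9})) // 8
	fmt.Println(quorumIndex([]uint64{10, 9, 8, 3}))    // 8: 4 voters need 3
}
```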

    Timeout handling

    case <-timer.C():
       ticking = false
    
       batch := c.support.BlockCutter().Cut()
       if len(batch) == 0 {
          c.logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
          continue
       }
    
       c.logger.Debugf("Batch timer expired, creating block")
       c.propose(propC, bc, batch) // we are certain this is normal block, no need to block
    

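    Nothing new here: the pending envelopes are Cut and the resulting batch is proposed to Raft.

    Configuration updates

    Configuration is an essential part of Raft. How does Fabric pass membership changes on to Raft?

    First, back to the Node interface, specifically ProposeConfChange and ApplyConfChange.

    One tells Raft that there is a configuration change; the other carries out a configuration update once Raft reports that it has been committed.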

    ProposeConfChange

    func (c *Chain) writeBlock(block *common.Block, index uint64) {
      ...
       if utils.IsConfigBlock(block) {
          c.writeConfigBlock(block, index)
          return
       }
       ...
    }
    

    Remember? writeBlock, seen earlier, checks whether the block being written is a config block.

    func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
       metadata, raftMetadata := c.newRaftMetadata(block)
    
       var changes *MembershipChanges
       if metadata != nil {
          changes = ComputeMembershipChanges(raftMetadata.Consenters, metadata.Consenters)
       }
    
       confChange := changes.UpdateRaftMetadataAndConfChange(raftMetadata)
       raftMetadata.RaftIndex = index
    
       raftMetadataBytes := utils.MarshalOrPanic(raftMetadata)
       // write block with metadata
       c.support.WriteConfigBlock(block, raftMetadataBytes)
       c.configInflight = false
    
       // update membership
       if confChange != nil {
          // We need to propose conf change in a go routine, because it may be blocked if raft node
          // becomes leaderless, and we should not block `serveRequest` so it can keep consuming applyC,
          // otherwise we have a deadlock.
          go func() {
             // ProposeConfChange returns error only if node being stopped.
             // This proposal is dropped by followers because DisableProposalForwarding is enabled.
             if err := c.Node.ProposeConfChange(context.TODO(), *confChange); err != nil {
                c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
             }
          }()
    
          c.confChangeInProgress = confChange
    
          switch confChange.Type {
          case raftpb.ConfChangeAddNode:
             c.logger.Infof("Config block just committed adds node %d, pause accepting transactions till config change is applied", confChange.NodeID)
          case raftpb.ConfChangeRemoveNode:
             c.logger.Infof("Config block just committed removes node %d, pause accepting transactions till config change is applied", confChange.NodeID)
          default:
             c.logger.Panic("Programming error, encountered unsupported raft config change")
          }
    
          c.configInflight = true
       }
    
       c.raftMetadataLock.Lock()
       c.opts.RaftMetadata = raftMetadata
       c.raftMetadataLock.Unlock()
    }
    

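    The first conclusion is clear at a glance: in Fabric, Raft configuration updates travel inside config blocks. When such a block is written, the part that concerns Raft membership is extracted from it and then proposed to Raft.

    1. Get the EtcdRaft part of the configuration from the config block, together with the configuration currently in use.
    2. Compare the two to produce a report: which nodes are being added and which removed.
    3. A report alone is not enough; it has to be turned into a formal request in the form Raft expects. That is what UpdateRaftMetadataAndConfChange does.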
    func (mc *MembershipChanges) UpdateRaftMetadataAndConfChange(raftMetadata *etcdraft.RaftMetadata) *raftpb.ConfChange {
       if mc == nil || mc.TotalChanges == 0 {
          return nil
       }
    
       var confChange *raftpb.ConfChange
    
       // producing corresponding raft configuration changes
       if len(mc.AddedNodes) > 0 {
          nodeID := raftMetadata.NextConsenterId
          raftMetadata.Consenters[nodeID] = mc.AddedNodes[0]
          raftMetadata.NextConsenterId++
          confChange = &raftpb.ConfChange{
             ID:     raftMetadata.ConfChangeCounts,
             NodeID: nodeID,
             Type:   raftpb.ConfChangeAddNode,
          }
          raftMetadata.ConfChangeCounts++
          return confChange
       }
    
       if len(mc.RemovedNodes) > 0 {
          for _, c := range mc.RemovedNodes {
             for nodeID, node := range raftMetadata.Consenters {
                if bytes.Equal(c.ClientTlsCert, node.ClientTlsCert) {
                   delete(raftMetadata.Consenters, nodeID)
                   confChange = &raftpb.ConfChange{
                      ID:     raftMetadata.ConfChangeCounts,
                      NodeID: nodeID,
                      Type:   raftpb.ConfChangeRemoveNode,
                   }
                   raftMetadata.ConfChangeCounts++
                   break
                }
             }
          }
       }
    
       return confChange
    }
    
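    Notice that one pass through this function yields a single ConfChange, i.e. one node added or removed. Each Raft membership update should therefore add or remove exactly one node; any additional change will not reach Raft. If you are curious, see the Raft paper: changing one node at a time keeps every majority of the old configuration overlapping every majority of the new one, which makes it the simple, cost-effective approach.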

      if len(confState.Nodes) == len(c.opts.RaftMetadata.Consenters) {
         // since configuration change could only add one node or
         // remove one node at a time, if raft nodes state size
         // equal to membership stored in block metadata field,
         // that means everything is in sync and no need to propose
         // update
         return nil
      }
      
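    4. The config block is written to the local ledger.

    5. c.Node.ProposeConfChange: this is where Raft is asked to make the configuration change.

    6. configInflight = true marks that a configuration change has been proposed to Raft and is awaiting the outcome.

    7. The change is recorded in confChangeInProgress so its progress can be tracked later.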

    ApplyConfChange

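    Recall: once our proposal reaches Raft, how do we know the members have agreed and are ready to go? By waiting for the CommittedEntries of a Ready, of course, which eventually arrive on the applyC channel.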

    case raftpb.EntryConfChange:
       var cc raftpb.ConfChange
       if err := cc.Unmarshal(ents[i].Data); err != nil {
          c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
          continue
       }
    
       c.confState = *c.Node.ApplyConfChange(cc)
    
       switch cc.Type {
       case raftpb.ConfChangeAddNode:
          c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
       case raftpb.ConfChangeRemoveNode:
          c.logger.Infof("Applied config change to remove node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
       default:
          c.logger.Panic("Programming error, encountered unsupported raft config change")
       }
    
       // This ConfChange was introduced by a previously committed config block,
       // we can now unblock submitC to accept envelopes.
       if c.confChangeInProgress != nil &&
          c.confChangeInProgress.NodeID == cc.NodeID &&
          c.confChangeInProgress.Type == cc.Type {
    
          if err := c.configureComm(); err != nil {
             c.logger.Panicf("Failed to configure communication: %s", err)
          }
    
          c.confChangeInProgress = nil
          c.configInflight = false
          // report the new cluster size
          c.Metrics.ClusterSize.Set(float64(len(c.opts.RaftMetadata.Consenters)))
       }
    
       if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
          c.logger.Infof("Current node removed from replica set for channel %s", c.channelID)
          // calling goroutine, since otherwise it will be blocked
          // trying to write into haltC
          go c.Halt()
       }
    }
    
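    1. c.Node.ApplyConfChange(cc): here it finally is. This is where the configuration change is carried out.
    2. Remember that confChangeInProgress was recorded above? If it matches (same NodeID and Type), the proposal sent to Raft has finally come back: everyone is ready, let's go.
    3. c.configureComm() will be explained in the cluster post. In short, it rebuilds the Raft network according to the latest membership.
    4. configInflight and confChangeInProgress are cleared: this configuration update is complete.
    5. If the committed change removes a node and that node is this one, Halt is called (in a goroutine, since it would otherwise block). As you would expect, it eventually stops this Raft node via Node's Stop.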
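The bookkeeping after ApplyConfChange, reduced to a few fields. chainState and confChange are hypothetical stand-ins; configureComm and Halt appear only as comments. Only the change that matches confChangeInProgress unblocks submissions, and removing this very node halts it.

```go
package main

import "fmt"

type confChangeType int

const (
	addNode confChangeType = iota
	removeNode
)

type confChange struct {
	NodeID uint64
	Type   confChangeType
}

type chainState struct {
	self           uint64
	inProgress     *confChange
	configInflight bool
	halted         bool
}

// onApplied mirrors the checks that follow ApplyConfChange.
func (c *chainState) onApplied(cc confChange) {
	if c.inProgress != nil && c.inProgress.NodeID == cc.NodeID && c.inProgress.Type == cc.Type {
		// Fabric calls configureComm() here before unblocking
		c.inProgress = nil
		c.configInflight = false
	}
	if cc.Type == removeNode && cc.NodeID == c.self {
		c.halted = true // Fabric runs go c.Halt()
	}
}

func main() {
	c := &chainState{self: 2, inProgress: &confChange{NodeID: 4, Type: addNode}, configInflight: true}
	c.onApplied(confChange{NodeID: 3, Type: addNode})
	fmt.Println(c.configInflight) // true: not the change we are waiting for
	c.onApplied(confChange{NodeID: 4, Type: addNode})
	fmt.Println(c.configInflight) // false: submitC may reopen
	c.onApplied(confChange{NodeID: 2, Type: removeNode})
	fmt.Println(c.halted) // true: this node was removed
}
```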

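    Final words

    The communication layer is also very important. It not only carries Raft's messages but is also key to the Orderer cluster; I'll cover it separately next time.

    That's about it for etcdraft. Of course, many details were not covered, such as the config part.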

    Article link: https://www.haomeiwen.com/subject/wmcmpqtx.html