Hyperledger-Fabric Source Code Analysis (Gossip-An

Author: Pillar_Zhong | Published 2019-04-01 16:21

    Anti-Entropy

    First, let's look at a concept: anti-entropy.

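    Gossip algorithms are also known as anti-entropy. Entropy is a concept from physics that stands for disorder, and anti-entropy means seeking consistency in the middle of disorder. That sums up the nature of gossip: in a bounded network, every node communicates with randomly chosen other nodes, and after a seemingly chaotic series of exchanges, all nodes eventually reach the same state. A node may know every other node or only a few neighbors; as long as the nodes are connected through the network, their states eventually converge. This is also how an epidemic spreads.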
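    The idea is easier to see in a toy model. The following Go sketch is not Fabric code: it assumes a fully connected network whose nodes each hold a single number (think of it as a ledger height), and in every round each node exchanges state with one randomly chosen node, with both sides keeping the larger value (a push-pull exchange). The names simulate and converged are made up for illustration. Because the largest value can only spread and never shrink, every node eventually holds it.

```go
package main

import (
	"fmt"
	"math/rand"
)

// simulate runs push-pull anti-entropy rounds over a fully connected toy
// network until all nodes hold the same value. A node's "state" is just an
// integer standing in for its ledger height. It returns the final states and
// the number of rounds it took.
func simulate(heights []int, seed int64) ([]int, int) {
	r := rand.New(rand.NewSource(seed))
	state := append([]int(nil), heights...) // work on a copy
	rounds := 0
	for !converged(state) {
		rounds++
		for i := range state {
			// Pick a random partner j != i.
			j := r.Intn(len(state) - 1)
			if j >= i {
				j++
			}
			// Push-pull exchange: both sides end up with the larger value.
			if state[i] < state[j] {
				state[i] = state[j]
			} else {
				state[j] = state[i]
			}
		}
	}
	return state, rounds
}

// converged reports whether every node holds the same value.
func converged(state []int) bool {
	for _, v := range state {
		if v != state[0] {
			return false
		}
	}
	return true
}

func main() {
	final, rounds := simulate([]int{3, 7, 1, 7, 2, 5, 4, 6}, 42)
	fmt.Println(final, "after", rounds, "rounds")
}
```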

    That may sound abstract at first, so without further ado, let's dig into how Fabric implements anti-entropy.

    GossipStateProvider

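    Before we start, we need to know what anti-entropy synchronizes: blocks, of course. GossipStateProvider exists for exactly this purpose. It is created per channel (note the chainID parameter) and started along with the peer, and from then on it quietly works toward eventual consistency.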

    func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger ledgerResources) GossipStateProvider {
    
      ...
    
       // Listen for incoming communication
       go s.listen()
       // Deliver in order messages into the incoming channel
       go s.deliverPayloads()
       // Execute anti entropy to fill missing gaps
       go s.antiEntropy()
       // Taking care of state request messages
       go s.processStateRequests()
    
       return s
    }
    

    During startup it launches four goroutines; let's analyze them one by one below.

    • listen
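      • Note the payloads buffer inside the state provider: every block, whether it was received from the orderer or synchronized from another peer, is pushed into payloads right away, and deliverPayloads later commits it to the local ledger when its turn comes.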
    • deliverPayloads
    • antiEntropy
    • processStateRequests
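    Notice that all four loops share the same s.stopCh, and each one writes the stop token back (s.stopCh <- struct{}{}) right after reading it. A single stop signal is therefore relayed from goroutine to goroutine until all of them have exited, while the s.done.Done() calls (s.done is presumably a sync.WaitGroup) let the caller wait for that. Here is a self-contained sketch of the pattern; the worker loop, the function names, and the channel capacity of 1 are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// startWorkers launches n loops that share one stop channel, the way
// listen, deliverPayloads, antiEntropy and processStateRequests share s.stopCh.
// Each worker reports its id on stopped when it exits.
func startWorkers(n int, stopCh chan struct{}, wg *sync.WaitGroup, stopped chan<- int) {
	for id := 0; id < n; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-stopCh:
					// Put the token back so the next worker also sees it.
					stopCh <- struct{}{}
					stopped <- id
					return
				case <-time.After(10 * time.Millisecond):
					// Periodic work would go here (e.g. one anti-entropy round).
				}
			}
		}(id)
	}
}

// stopAll starts n workers, sends a single stop token, waits for all of them
// to exit, and returns how many reported stopping.
func stopAll(n int) int {
	stopCh := make(chan struct{}, 1) // capacity 1 is an assumption of this sketch
	stopped := make(chan int, n)
	var wg sync.WaitGroup
	startWorkers(n, stopCh, &wg, stopped)
	stopCh <- struct{}{}
	wg.Wait()
	close(stopped)
	count := 0
	for range stopped {
		count++
	}
	return count
}

func main() {
	fmt.Println("stopped workers:", stopAll(4)) // stopped workers: 4
}
```

    Since only one token is ever in flight, each re-send lands in an empty one-slot buffer and never blocks; the last worker simply leaves the token there.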

    listen

    func (s *GossipStateProviderImpl) listen() {
       defer s.done.Done()
    
       for {
          select {
          case msg := <-s.gossipChan:
             logger.Debug("Received new message via gossip channel")
             go s.queueNewMessage(msg)
          case msg := <-s.commChan:
             logger.Debug("Dispatching a message", msg)
             go s.dispatch(msg)
          case <-s.stopCh:
             s.stopCh <- struct{}{}
             logger.Debug("Stop listening for new messages")
             return
          }
       }
    }
    
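    • gossipChan receives block messages for this channel, that is, blocks that originated at the orderer and are spread over gossip.
    • What matters here is commChan, which receives the GossipMessage_StateRequest and GossipMessage_StateResponse messages sent by other peers; dispatch then forwards them to the goroutines that handle them.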

    antiEntropy

    func (s *GossipStateProviderImpl) antiEntropy() {
       defer s.done.Done()
       defer logger.Debug("State Provider stopped, stopping anti entropy procedure.")
    
       for {
          select {
          case <-s.stopCh:
             s.stopCh <- struct{}{}
             return
          case <-time.After(defAntiEntropyInterval):
             ourHeight, err := s.ledger.LedgerHeight()
             if err != nil {
                // Unable to read from ledger continue to the next round
                logger.Errorf("Cannot obtain ledger height, due to %+v", errors.WithStack(err))
                continue
             }
             if ourHeight == 0 {
                logger.Error("Ledger reported block height of 0 but this should be impossible")
                continue
             }
             maxHeight := s.maxAvailableLedgerHeight()
             if ourHeight >= maxHeight {
                continue
             }
    
             s.requestBlocksInRange(uint64(ourHeight), uint64(maxHeight)-1)
          }
       }
    }
    

    This is where the actual work happens.

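    • Every defAntiEntropyInterval, read the ledger height; this is, of course, the local ledger.
    • Next, iterate over the channel members known locally and take the highest ledger height they advertise (maxAvailableLedgerHeight).
    • If the local ledger is behind, pull the missing blocks. Ledger height counts blocks (a height of h means blocks 0 to h-1 are present), so the missing range is [ourHeight, maxHeight-1].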
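    One round of this loop boils down to a small pure function: a node at ourHeight that sees a maximum height maxHeight is missing exactly the inclusive range [ourHeight, maxHeight-1] handed to requestBlocksInRange. The sketch also folds in the maximum that maxAvailableLedgerHeight computes, skipping members without properties (modeled here as a nil height). The peer type and all function names are hypothetical.

```go
package main

import "fmt"

// peer is a hypothetical stand-in for a channel member; a nil height models
// a member whose Properties are missing.
type peer struct {
	endpoint string
	height   *uint64
}

// maxHeight mirrors maxAvailableLedgerHeight: the largest advertised height,
// skipping members that advertise nothing.
func maxHeight(peers []peer) uint64 {
	highest := uint64(0)
	for _, p := range peers {
		if p.height != nil && *p.height > highest {
			highest = *p.height
		}
	}
	return highest
}

// missingRange returns the inclusive block range [start, end] this node lacks,
// or ok=false when there is nothing to fetch or the local height is invalid.
func missingRange(ourHeight uint64, peers []peer) (start, end uint64, ok bool) {
	if ourHeight == 0 {
		return 0, 0, false // the original logs an error: height 0 should be impossible
	}
	top := maxHeight(peers)
	if ourHeight >= top {
		return 0, 0, false // already up to date
	}
	return ourHeight, top - 1, true
}

// heightOf is a small helper for building example data.
func heightOf(v uint64) *uint64 { return &v }

func main() {
	peers := []peer{{"p1", heightOf(10)}, {"p2", nil}, {"p3", heightOf(15)}}
	fmt.Println(missingRange(12, peers)) // 12 14 true
	fmt.Println(missingRange(15, peers)) // 0 0 false
}
```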

    maxAvailableLedgerHeight

    func (s *GossipStateProviderImpl) maxAvailableLedgerHeight() uint64 {
       max := uint64(0)
       for _, p := range s.mediator.PeersOfChannel(common2.ChainID(s.chainID)) {
          if p.Properties == nil {
             logger.Debug("Peer", p.PreferredEndpoint(), "doesn't have properties, skipping it")
             continue
          }
          peerHeight := p.Properties.LedgerHeight
          if max < peerHeight {
             max = peerHeight
          }
       }
       return max
    }
    
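    • This is where the iteration happens: it takes the largest height, skipping members that carry no properties.
    • Remember the GossipMessage_StateInfo article? The LedgerHeight in Properties described there is used here. Whether it was pulled actively or received passively, the local node keeps a record of every other member's ledger state.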

    requestBlocksInRange

    func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) {
       atomic.StoreInt32(&s.stateTransferActive, 1)
       defer atomic.StoreInt32(&s.stateTransferActive, 0)
    
       for prev := start; prev <= end; {
          next := min(end, prev+defAntiEntropyBatchSize)
    
          gossipMsg := s.stateRequestMessage(prev, next)
    
          responseReceived := false
          tryCounts := 0
    
          for !responseReceived {
             if tryCounts > defAntiEntropyMaxRetries {
                logger.Warningf("Wasn't  able to get blocks in range [%d...%d), after %d retries",
                   prev, next, tryCounts)
                return
             }
             // Select peers to ask for blocks
             peer, err := s.selectPeerToRequestFrom(next)
             if err != nil {
                logger.Warningf("Cannot send state request for blocks in range [%d...%d), due to %+v",
                   prev, next, errors.WithStack(err))
                return
             }
    
             logger.Debugf("State transfer, with peer %s, requesting blocks in range [%d...%d), "+
                "for chainID %s", peer.Endpoint, prev, next, s.chainID)
    
             s.mediator.Send(gossipMsg, peer)
             tryCounts++
    
             // Wait until timeout or response arrival
             select {
             case msg := <-s.stateResponseCh:
                if msg.GetGossipMessage().Nonce != gossipMsg.Nonce {
                   continue
                }
                // Got corresponding response for state request, can continue
                index, err := s.handleStateResponse(msg)
                if err != nil {
                   logger.Warningf("Wasn't able to process state response for "+
                      "blocks [%d...%d], due to %+v", prev, next, errors.WithStack(err))
                   continue
                }
                prev = index + 1
                responseReceived = true
             case <-time.After(defAntiEntropyStateResponseTimeout):
             case <-s.stopCh:
                s.stopCh <- struct{}{}
                return
             }
          }
       }
    }
    
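    • Naturally, the blocks are neither pulled all at once nor one by one; things should be done methodically, so the range is requested in planned batches sized by defAntiEntropyBatchSize.
    • Build a GossipMessage_StateRequest message. There is nothing special about it: it mainly carries the start and end sequence numbers, meaning "I want the blocks in this range". It is sent to a randomly chosen channel member whose ledger height reaches next, and then we wait for a GossipMessage_StateResponse.
    • The listen goroutine shown earlier forwards each received response to this loop (through s.stateResponseCh).
    • If the Nonce does not match, the message is not the response to the request just sent, so it is skipped. Note that this continue restarts the outer for !responseReceived loop, so a mismatched response, just like a timeout, triggers a resend that counts toward defAntiEntropyMaxRetries.
    • Next comes handleStateResponse.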

    handleStateResponse

    func (s *GossipStateProviderImpl) handleStateResponse(msg proto.ReceivedMessage) (uint64, error) {
       max := uint64(0)
       // Send signal that response for given nonce has been received
       response := msg.GetGossipMessage().GetStateResponse()
       // Extract payloads, verify and push into buffer
       if len(response.GetPayloads()) == 0 {
          return uint64(0), errors.New("Received state transfer response without payload")
       }
       for _, payload := range response.GetPayloads() {
          logger.Debugf("Received payload with sequence number %d.", payload.SeqNum)
          if err := s.mediator.VerifyBlock(common2.ChainID(s.chainID), payload.SeqNum, payload.Data); err != nil {
             err = errors.WithStack(err)
             logger.Warningf("Error verifying block with sequence number %d, due to %+v", payload.SeqNum, err)
             return uint64(0), err
          }
          if max < payload.SeqNum {
             max = payload.SeqNum
          }
    
          err := s.addPayload(payload, blocking)
          if err != nil {
             logger.Warningf("Block [%d] received from block transfer wasn't added to payload buffer: %v", payload.SeqNum, err)
          }
       }
       return max, nil
    }
    
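    • This takes the payloads out of the StateResponse, verifies each block with s.mediator.VerifyBlock, and pushes it into the state provider's payloads buffer, which gets its own dedicated processing later. The highest sequence number received is returned to the caller.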

    deliverPayloads

    func (s *GossipStateProviderImpl) deliverPayloads() {
       defer s.done.Done()
    
       for {
          select {
          // Wait for notification that next seq has arrived
          case <-s.payloads.Ready():
             logger.Debugf("[%s] Ready to transfer payloads (blocks) to the ledger, next block number is = [%d]", s.chainID, s.payloads.Next())
             // Collect all subsequent payloads
             for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
                rawBlock := &common.Block{}
                if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {
                   logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
                   continue
                }
                if rawBlock.Data == nil || rawBlock.Header == nil {
                   logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
                      payload.SeqNum, rawBlock.Header, rawBlock.Data)
                   continue
                }
                logger.Debugf("[%s] Transferring block [%d] with %d transaction(s) to the ledger", s.chainID, payload.SeqNum, len(rawBlock.Data.Data))
    
                // Read all private data into slice
                var p util.PvtDataCollections
                if payload.PrivateData != nil {
                   err := p.Unmarshal(payload.PrivateData)
                   if err != nil {
                      logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
                      continue
                   }
                }
                if err := s.commitBlock(rawBlock, p); err != nil {
                   if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr {
                      logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr)
                      return
                   }
                   logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
                }
             }
          case <-s.stopCh:
             s.stopCh <- struct{}{}
             logger.Debug("State provider has been stopped, finishing to push new blocks.")
             return
          }
       }
    }
    
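    • When the block the ledger needs next is pushed into payloads (for example, by a StateResponse received from a peer as shown above), Ready fires here.
    • Think of a payload as a block: this loop pops the blocks accumulated in payloads, in order, and commits each one to the local ledger.
    • Finally, of course, the ledger height is updated. Don't forget that this change eventually propagates to the other members in the form of StateInfo messages.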
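    Ready and Pop only make sense if s.payloads hands blocks out strictly in sequence order: a block that arrives early has to wait until every block before it has been popped, and, as the code comment says, Ready should fire once the next expected block has arrived. A minimal sketch of such a buffer follows, assuming a map keyed by sequence number and a one-slot notification channel; the type, its fields, and the rule of silently ignoring old or duplicate blocks are hypothetical, not Fabric's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// orderedBuffer releases blocks strictly in sequence order (hypothetical type).
type orderedBuffer struct {
	mu    sync.Mutex
	next  uint64            // sequence number the ledger needs next
	buf   map[uint64][]byte // early arrivals wait here
	ready chan struct{}     // one-slot notification: block `next` is present
}

func newOrderedBuffer(next uint64) *orderedBuffer {
	return &orderedBuffer{next: next, buf: make(map[uint64][]byte), ready: make(chan struct{}, 1)}
}

// Push stores a block; blocks already delivered or already buffered are ignored.
func (b *orderedBuffer) Push(seq uint64, data []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, dup := b.buf[seq]; seq < b.next || dup {
		return
	}
	b.buf[seq] = data
	if seq == b.next {
		select {
		case b.ready <- struct{}{}: // wake the consumer
		default: // a notification is already pending
		}
	}
}

// Ready is the channel a consumer selects on, like s.payloads.Ready().
func (b *orderedBuffer) Ready() <-chan struct{} { return b.ready }

// Pop removes and returns the block the ledger needs next, if it has arrived.
func (b *orderedBuffer) Pop() ([]byte, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	data, ok := b.buf[b.next]
	if !ok {
		return nil, false
	}
	delete(b.buf, b.next)
	b.next++
	return data, true
}

func main() {
	b := newOrderedBuffer(3)
	b.Push(4, []byte("block 4")) // early: must wait for block 3
	b.Push(3, []byte("block 3")) // the expected block arrives, Ready fires
	<-b.Ready()
	for data, ok := b.Pop(); ok; data, ok = b.Pop() {
		fmt.Println(string(data)) // block 3, then block 4
	}
}
```

    Pop returns a separate boolean so the draining loop does not depend on payloads being non-nil.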

    processStateRequests

    func (s *GossipStateProviderImpl) processStateRequests() {
       defer s.done.Done()
    
       for {
          select {
          case msg := <-s.stateRequestCh:
             s.handleStateRequest(msg)
          case <-s.stopCh:
             s.stopCh <- struct{}{}
             return
          }
       }
    }
    
    • The listen goroutine shown earlier forwards each received request to this loop (through s.stateRequestCh).

    handleStateRequest

    func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage) {
       if msg == nil {
          return
       }
       request := msg.GetGossipMessage().GetStateRequest()
    
       batchSize := request.EndSeqNum - request.StartSeqNum
       if batchSize > defAntiEntropyBatchSize {
          logger.Errorf("Requesting blocks batchSize size (%d) greater than configured allowed"+
             " (%d) batching for anti-entropy. Ignoring request...", batchSize, defAntiEntropyBatchSize)
          return
       }
    
       if request.StartSeqNum > request.EndSeqNum {
          logger.Errorf("Invalid sequence interval [%d...%d], ignoring request...", request.StartSeqNum, request.EndSeqNum)
          return
       }
    
       currentHeight, err := s.ledger.LedgerHeight()
       if err != nil {
          logger.Errorf("Cannot access to current ledger height, due to %+v", errors.WithStack(err))
          return
       }
       if currentHeight < request.EndSeqNum {
          logger.Warningf("Received state request to transfer blocks with sequence numbers higher [%d...%d] "+
             "than available in ledger (%d)", request.StartSeqNum, request.EndSeqNum, currentHeight)
       }
    
       endSeqNum := min(currentHeight, request.EndSeqNum)
    
       response := &proto.RemoteStateResponse{Payloads: make([]*proto.Payload, 0)}
       for seqNum := request.StartSeqNum; seqNum <= endSeqNum; seqNum++ {
          logger.Debug("Reading block ", seqNum, " with private data from the coordinator service")
          connInfo := msg.GetConnectionInfo()
          peerAuthInfo := common.SignedData{
             Data:      connInfo.Auth.SignedData,
             Signature: connInfo.Auth.Signature,
             Identity:  connInfo.Identity,
          }
          block, pvtData, err := s.ledger.GetPvtDataAndBlockByNum(seqNum, peerAuthInfo)
    
          if err != nil {
             logger.Errorf("cannot read block number %d from ledger, because %+v, skipping...", seqNum, err)
             continue
          }
    
          if block == nil {
             logger.Errorf("Wasn't able to read block with sequence number %d from ledger, skipping....", seqNum)
             continue
          }
    
          blockBytes, err := pb.Marshal(block)
    
          if err != nil {
             logger.Errorf("Could not marshal block: %+v", errors.WithStack(err))
             continue
          }
    
          var pvtBytes [][]byte
          if pvtData != nil {
             // Marshal private data
             pvtBytes, err = pvtData.Marshal()
             if err != nil {
                logger.Errorf("Failed to marshal private rwset for block %d due to %+v", seqNum, errors.WithStack(err))
                continue
             }
          }
    
          // Appending result to the response
          response.Payloads = append(response.Payloads, &proto.Payload{
             SeqNum:      seqNum,
             Data:        blockBytes,
             PrivateData: pvtBytes,
          })
       }
       // Sending back response with missing blocks
       msg.Respond(&proto.GossipMessage{
          // Copy nonce field from the request, so it will be possible to match response
          Nonce:   msg.GetGossipMessage().Nonce,
          Tag:     proto.GossipMessage_CHAN_OR_ORG,
          Channel: []byte(s.chainID),
          Content: &proto.GossipMessage_StateResponse{StateResponse: response},
       })
    }
    
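    • As you can probably guess, it satisfies the request as far as it can: it packs the blocks it has (along with the private data read using the requester's identity, peerAuthInfo) into a GossipMessage_StateResponse and sends it back, copying the request's Nonce so the requester can match the response.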
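    The checks at the top of handleStateRequest can be condensed into one pure function that decides which blocks get served. The name servableRange and the batch size of 10 are hypothetical. The sketch differs from the code above in two deliberate ways. It checks start <= end first: with unsigned sequence numbers, EndSeqNum - StartSeqNum wraps around when start > end, so in the original order such a request is reported as an oversized batch before the interval check is reached. It also clamps the end to currentHeight-1, the last block a ledger of that height holds, whereas the code above clamps to currentHeight and lets the read of the nonexistent block fail and be skipped.

```go
package main

import "fmt"

// batchSize is a hypothetical stand-in for defAntiEntropyBatchSize.
const batchSize = 10

// servableRange validates a state request for blocks [start, end] (inclusive)
// against the local ledger height and returns the inclusive range to send.
func servableRange(start, end, currentHeight uint64) (from, to uint64, err error) {
	if start > end {
		return 0, 0, fmt.Errorf("invalid sequence interval [%d...%d]", start, end)
	}
	if end-start > batchSize {
		return 0, 0, fmt.Errorf("batch size %d greater than allowed %d", end-start, batchSize)
	}
	if start >= currentHeight {
		// Nothing to send (the original would reply with an empty response).
		return 0, 0, fmt.Errorf("no blocks from %d available at height %d", start, currentHeight)
	}
	to = end
	if to > currentHeight-1 {
		to = currentHeight - 1 // last block that actually exists
	}
	return start, to, nil
}

func main() {
	fmt.Println(servableRange(5, 12, 10)) // 5 9 <nil>
	_, _, err := servableRange(12, 5, 10)
	fmt.Println(err) // invalid sequence interval [12...5]
}
```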

    Summary

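    As we have seen, Fabric's anti-entropy is actually quite simple: it periodically compares ledger heights with the other channel members and pulls the blocks it is missing, until everyone reaches eventual consistency.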

    Article link: https://www.haomeiwen.com/subject/ptevbqtx.html