Hyperledger-Fabric Source Code Analysis (orderer-c

Author: 小蜗牛爬楼梯 | Published 2020-04-07 15:07

    Fabric's Kafka interaction goes through the sarama package; feel free to dig into it on your own, but we will not expand on it here and will stay focused on the business logic.
    I strongly recommend reading the following article either before or after this one:
    A Kafka-based Ordering Service for Fabric

    Startup

    func (chain *chainImpl) Start() {
        go startThread(chain)
    }

    func startThread(chain *chainImpl) {
        var err error // error handling is elided in this excerpt

        // Create the channel's topic if it does not exist yet
        err = setupTopicForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.consenter.topicDetail(), chain.channel)

        // Set up the producer
        chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)

        // Have the producer post the CONNECT message
        err = sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel)

        // Set up the parent consumer
        chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)

        // Set up the channel consumer
        chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)

        // Fetch the list of healthy replicas for the channel's partition
        chain.replicaIDs, err = getHealthyClusterReplicaInfo(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.channel)

        chain.doneProcessingMessagesToBlocks = make(chan struct{})

        chain.errorChan = make(chan struct{}) // Deliver requests will also go through
        close(chain.startChan)                // Broadcast requests will now go through

        chain.processMessagesToBlocks() // Keep up to date with the channel
    }
    

    First, in the Fabric world the channel is what connects everything, and treating a Kafka topic as an extension of the channel is a natural fit: the orderer publishes ordering messages to the topic, and every orderer consuming that topic writes the resulting blocks into its local ledger. There is an important detail inside setupTopicForChannel: each topic has exactly one partition, which is what guarantees strict message ordering. This matters a lot (see the sketch below).
    Create the message producer and the consumers according to the configuration.
    Have the producer send a KafkaMessage_Connect message; more on this later.
    close(chain.startChan) signals that the chain has finished starting up.
    Next, processMessagesToBlocks listens for and processes the incoming messages.
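
    To make the single-partition point concrete, here is a minimal sketch of creating such a topic with sarama's admin API. This is not the Fabric code: the broker address, topic name, and replication factor are placeholders, and the real setupTopicForChannel additionally looks up the controller broker and takes all of these values from the orderer configuration.

    package main

    import (
        "log"
        "time"

        "github.com/Shopify/sarama"
    )

    func main() {
        cfg := sarama.NewConfig()
        cfg.Version = sarama.V0_10_2_0 // CreateTopics needs Kafka >= 0.10.1.0

        // Placeholder address; topic creation must be sent to the controller broker.
        broker := sarama.NewBroker("localhost:9092")
        if err := broker.Open(cfg); err != nil {
            log.Fatal(err)
        }
        defer broker.Close()

        resp, err := broker.CreateTopics(&sarama.CreateTopicsRequest{
            TopicDetails: map[string]*sarama.TopicDetail{
                "mychannel": { // topic name mirrors the channel name (placeholder)
                    NumPartitions:     1, // a single partition => a single total order
                    ReplicationFactor: 3,
                },
            },
            Timeout: 3 * time.Second,
        })
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("topic errors: %v", resp.TopicErrors)
    }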

    Handling NORMAL messages

    func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
        ...
        case in, ok := <-chain.channelConsumer.Messages():
            if !ok {
                logger.Criticalf("[channel: %s] Kafka consumer closed.", chain.ChainID())
                return counts, nil
            }

            ...
            switch msg.Type.(type) {
            case *ab.KafkaMessage_Connect:
                _ = chain.processConnect(chain.ChainID())
                counts[indexProcessConnectPass]++
            case *ab.KafkaMessage_TimeToCut:
                if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
                    logger.Warningf("[channel: %s] %s", chain.ChainID(), err)
                    logger.Criticalf("[channel: %s] Consenter for channel exiting", chain.ChainID())
                    counts[indexProcessTimeToCutError]++
                    return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
                }
                counts[indexProcessTimeToCutPass]++
            case *ab.KafkaMessage_Regular:
                if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil {
                    logger.Warningf("[channel: %s] Error when processing incoming message of type REGULAR = %s", chain.ChainID(), err)
                    counts[indexProcessRegularError]++
                } else {
                    counts[indexProcessRegularPass]++
                }
            }
        ...
    }

    Each of the three message types gets its own handling here; as you can see, KafkaMessage_Regular is forwarded to chain.processRegular.

    A quick note on the KafkaMessage_Connect handling: it does nothing at all. Remember how, right after the producer was created during startup, a CONNECT message was posted immediately? The point of that step is threefold: it proves that the topic was created successfully, that messages can be produced successfully, and that messages can be consumed successfully.
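
    For reference, the CONNECT message itself is about as small as a message can get. The sketch below is simplified from the helpers in the kafka consenter package (names and exact shapes may differ between Fabric versions): an empty KafkaMessage_Connect is marshaled and wrapped into a sarama producer message addressed at the channel's topic and its single partition.

    // Simplified from the kafka consenter helpers; not verbatim Fabric code.
    func newConnectMessage() *ab.KafkaMessage {
        return &ab.KafkaMessage{
            Type: &ab.KafkaMessage_Connect{
                Connect: &ab.KafkaMessageConnect{Payload: nil}, // deliberately empty
            },
        }
    }

    func newProducerMessage(channel channel, pld []byte) *sarama.ProducerMessage {
        return &sarama.ProducerMessage{
            Topic: channel.topic(),
            // one partition per topic, so the key only needs to be stable
            Key:   sarama.StringEncoder(strconv.Itoa(int(channel.partition()))),
            Value: sarama.ByteEncoder(pld),
        }
    }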

    Now we get to the actual handling of NORMAL messages.

    case ab.KafkaMessageRegular_NORMAL:
       if regularMessage.OriginalOffset != 0 {
          // But we've reprocessed it already
          if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
             logger.Debugf(
                "[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessed(%d), message has been consumed already, discard",
                chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
             return nil
          }
    
       }
    
       // The config sequence has advanced
       if regularMessage.ConfigSeq < seq {
          logger.Debugf("[channel: %s] Config sequence has advanced since this normal message got validated, re-validating", chain.ChainID())
          configSeq, err := chain.ProcessNormalMsg(env)
          if err != nil {
             return fmt.Errorf("discarding bad normal message because = %s", err)
          }
    
          if err := chain.order(env, configSeq, receivedOffset); err != nil {
             return fmt.Errorf("error re-submitting normal message because = %s", err)
          }
    
          return nil
       }
    
       offset := regularMessage.OriginalOffset
       if offset == 0 {
          offset = chain.lastOriginalOffsetProcessed
       }
    
       // During consensus-type migration, drop normal messages that managed to sneak in past Order, possibly from other orderers
       if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() {
          return nil
       }
       commitNormalMsg(env, offset)
    

    1. Because a single Kafka partition guarantees ordering, the chain can keep a latest-offset watermark and use it to decide whether a newly arrived message has already been handled. There is a detail here I glossed over at first, taking it for mere defensive coding, when in fact there is a lot behind it: regularMessage.OriginalOffset != 0. Every message starts out with OriginalOffset set to 0, so when does it change? A non-zero value means the message was consumed before but, for one reason or another, was not processed; it was thrown back to Kafka and re-ordered (see the sketch after this list).
    2. regularMessage.ConfigSeq < seq should look familiar; it was covered in the solo article. The difference here is that if the config has changed between the time the message was submitted and the time it is processed, and the message still passes re-validation, then the message itself is fine; only the environment has moved on, so we try to re-order it.
    3. Finally, the real work happens in commitNormalMsg, shown right after the sketch below.
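
    Before moving on to commitNormalMsg, here is roughly what that re-submission path boils down to, simplified from the kafka consenter (not verbatim, details vary by version): order marshals the envelope and enqueues it back onto the topic as a new KafkaMessageRegular that records the offset at which it was originally consumed, which is exactly why OriginalOffset != 0 identifies a reprocessed message.

    // Simplified sketch of re-submission; not verbatim Fabric code.
    func (chain *chainImpl) order(env *cb.Envelope, configSeq uint64, originalOffset int64) error {
        marshaledEnv, err := utils.Marshal(env)
        if err != nil {
            return fmt.Errorf("cannot enqueue, unable to marshal envelope: %s", err)
        }
        if !chain.enqueue(newNormalMessage(marshaledEnv, configSeq, originalOffset)) {
            return fmt.Errorf("cannot enqueue")
        }
        return nil
    }

    func newNormalMessage(payload []byte, configSeq uint64, originalOffset int64) *ab.KafkaMessage {
        return &ab.KafkaMessage{
            Type: &ab.KafkaMessage_Regular{
                Regular: &ab.KafkaMessageRegular{
                    Payload:        payload,
                    ConfigSeq:      configSeq,
                    Class:          ab.KafkaMessageRegular_NORMAL,
                    OriginalOffset: originalOffset, // non-zero marks a re-submitted message
                },
            },
        }
    }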

    commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
       batches, pending := chain.BlockCutter().Ordered(message)
       logger.Debugf("[channel: %s] Ordering results: items in batch = %d, pending = %v", chain.ChainID(), len(batches), pending)
    
       switch {
       case chain.timer != nil && !pending:
          chain.timer = nil
       case chain.timer == nil && pending:
          chain.timer = time.After(chain.SharedConfig().BatchTimeout())
          logger.Debugf("[channel: %s] Just began %s batch timer", chain.ChainID(), chain.SharedConfig().BatchTimeout().String())
       default:
       }
    
       if len(batches) == 0 {
          // If no block is cut, we update the `lastOriginalOffsetProcessed`, start the timer if necessary and return
          chain.lastOriginalOffsetProcessed = newOffset
          return
       }
    
       offset := receivedOffset
       if pending || len(batches) == 2 {
          offset--
       } else {
          chain.lastOriginalOffsetProcessed = newOffset
       }
    
       // Commit the first block
       block := chain.CreateNextBlock(batches[0])
       metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
          LastOffsetPersisted:         offset,
          LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
          LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
       })
       chain.WriteBlock(block, metadata)
       chain.lastCutBlockNumber++
       logger.Debugf("[channel: %s] Batch filled, just cut block %d - last persisted offset is now %d", chain.ChainID(), chain.lastCutBlockNumber, offset)
    
       // Commit the second block if exists
       if len(batches) == 2 {
          chain.lastOriginalOffsetProcessed = newOffset
          offset++
    
          block := chain.CreateNextBlock(batches[1])
          metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
             LastOffsetPersisted:         offset,
             LastOriginalOffsetProcessed: newOffset,
             LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
          })
          chain.WriteBlock(block, metadata)
          chain.lastCutBlockNumber++
          logger.Debugf("[channel: %s] Batch filled, just cut block %d - last persisted offset is now %d", chain.ChainID(), chain.lastCutBlockNumber, offset)
       }
    }
    

    1. The first half is the same as solo: cut a block when the moment is right. One thing to note: the cutter returns two batches in exactly one case, namely when a single message is so large that it has to go into a batch of its own. In every other case it returns one.

    2. The check pending || len(batches) == 2 is the key. If it holds for the newly arrived message, what does that tell us? Either the message did not qualify for this commit at all (it is still pending), or the cut produced two batches and the message sits alone in the second one. In both cases the first block does not contain it, so offset is decremented first and only incremented back (++) when the message itself is actually committed.

    3. If the condition does not hold, the new message's timing was perfect: it not only triggered the cut, it also made it into this very commit.

    4. The offsets involved here are worth taking a moment to internalize; get them right and the implementation falls into place (a worked example follows this list).

    • receivedOffset: the consumer's position, i.e. the Kafka offset of the message that was just received.
    • newOffset: the original offset tracked for the incoming message; two cases, brand-new (keeps the previous lastOriginalOffsetProcessed) and re-ordered (the message's own OriginalOffset).
    • LastOffsetPersisted: the offset of the last message that has been committed to the ledger.
    • LastOriginalOffsetProcessed: the original offset of the newest re-submitted message that has been processed; this is what the deduplication check earlier compares against.
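
    To make the bookkeeping concrete, here is a small hypothetical helper (it does not exist in Fabric) that mirrors the branch above, plus a worked example with receivedOffset = 10.

    // Hypothetical illustration only: what LastOffsetPersisted is set to for the
    // first block cut in commitNormalMsg.
    func firstBlockLastOffsetPersisted(receivedOffset int64, pending bool, numBatches int) int64 {
        if pending || numBatches == 2 {
            // The just-received message is not part of batches[0], so the first
            // block only covers offsets up to receivedOffset-1.
            return receivedOffset - 1
        }
        // The new message itself made it into the single cut batch.
        return receivedOffset
    }

    // With receivedOffset = 10:
    //   pending=true,  numBatches=1 -> 9   (message 10 is still waiting in the cutter)
    //   pending=false, numBatches=2 -> 9, then offset++ back to 10 for the second block
    //   pending=false, numBatches=1 -> 10  (message 10 was committed in this block)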

    What follows is familiar by now: the generated blocks are written to the orderer's local ledger.

    At this point, does it feel like something is missing? If you have read the solo article you know the answer: timeout handling. It is back at the beginning of commitNormalMsg (easy to miss): that is exactly where a timeout timer gets started, whenever there are pending messages waiting to be cut into a block.

    Timeout handling

    case <-chain.timer:
       if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &chain.timer); err != nil {
          ...
       }
    

    As you can see, this is implemented by sending a KafkaMessage_TimeToCut. Why not simply handle the timeout locally? Keep in mind that the orderers run as a cluster, and the question is how to keep the members consistent. The precondition is that the decision is written in exactly one place and everyone else synchronizes to it; if every orderer simply acted on its own, there would be no consensus to speak of.
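
    For reference, the sending side is tiny. The sketch below is simplified from the kafka consenter helpers (not verbatim, details vary by version): the timer handler clears the local timer and publishes a KafkaMessageTimeToCut proposing to cut the next block, so the actual cut is driven by the totally ordered topic rather than by each orderer's local clock.

    // Simplified sketch; not verbatim Fabric code.
    func sendTimeToCut(producer sarama.SyncProducer, channel channel, blockNumber uint64, timer *<-chan time.Time) error {
        *timer = nil // the decision now travels through Kafka, not the local clock
        payload := utils.MarshalOrPanic(newTimeToCutMessage(blockNumber))
        _, _, err := producer.SendMessage(newProducerMessage(channel, payload))
        return err
    }

    func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage {
        return &ab.KafkaMessage{
            Type: &ab.KafkaMessage_TimeToCut{
                TimeToCut: &ab.KafkaMessageTimeToCut{BlockNumber: blockNumber},
            },
        }
    }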

    case *ab.KafkaMessage_TimeToCut:
       if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
          return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
       }
    
    func (chain *chainImpl) processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, receivedOffset int64) error {
       ttcNumber := ttcMessage.GetBlockNumber()
       if ttcNumber == chain.lastCutBlockNumber+1 {
          chain.timer = nil
          logger.Debugf("[channel: %s] Nil'd the timer", chain.ChainID())
          batch := chain.BlockCutter().Cut()
          if len(batch) == 0 {
             return fmt.Errorf("got right time-to-cut message (for block %d),"+
                " no pending requests though; this might indicate a bug", chain.lastCutBlockNumber+1)
          }
          block := chain.CreateNextBlock(batch)
          metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
             LastOffsetPersisted:         receivedOffset,
             LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
          })
          chain.WriteBlock(block, metadata)
          chain.lastCutBlockNumber++
          return nil
       } else if ttcNumber > chain.lastCutBlockNumber+1 {
          return fmt.Errorf("got larger time-to-cut message (%d) than allowed/expected (%d)"+
             " - this might indicate a bug", ttcNumber, chain.lastCutBlockNumber+1)
       }
       return nil
    }
    

    This follows the same pattern as before, so there is not much more to say.

    Here is a scenario worth imagining. The members of the orderer cluster receive and process messages at essentially the same time, so ideally their states are identical. If they all hit the timeout at the same moment and all publish a timeout message, does each of them then go off and create and commit its own block? Wouldn't that be chaos?

    Think it through and this turns out to be impossible. The first part, timing out simultaneously, is indeed possible: everyone is in the same state, so if one times out they all do. The key is the publishing part, and here Kafka is no pushover: it guarantees that those timeout messages are totally ordered, which in turn guarantees that only the first TimeToCut takes effect. Every orderer cuts exactly one block for it, and all the duplicates that follow fail the check (ttcNumber == chain.lastCutBlockNumber+1) and are simply ignored.
