美文网首页Hyperledger Fabric源码分析
Fabric源码分析-Orderer启动

Fabric源码分析-Orderer启动

作者: 史圣杰 | 来源:发表于2018-12-26 16:48 被阅读0次

    在一个Fabric网络中,根据配置会生成一个创世块,Orderer启动时需要指定这个创世块,Orderer启动后,会处理client发出的请求,主要包括两种类型的服务。

    1. 启动

    Orderer启动时,需要编写一个配置文件,可以从$GOPATH/src/github.com/hyperledger/fabric/sampleconfigorderer.yaml找到模板,在这个配置文件中,主要需要配置

    General:
      LedgerType: file   #记账类型
      ListenAddress:     #服务监听地址
      ListenPort:     #服务监听地址
      TLS:   # TLS通信需要的相关证书和CA根证书,CA根证书用来校验client的证书
      Keepalive:  # 连接配置
      GenesisProfile:  创世块的Profile
      GenesisFile: 创世快文件
      LocalMSPID: Orderer的MSP
      BCCSP: # 配置加密服务提供者
    FileLedger:  # 文件账本相关配置
    RAMLedger: # 内存账本的配置,选择内存记账才有效
    Kafka: # Kafka配置,选择kafka排序后才有效
    

    配置好orderer.yaml后,指定其所在的路径,就可以启动orderer服务了

    orderer start
    

    2. 程序实现

    orderer的实现代码在orderer/main.go中,主要流程是:

    1. 根据配置初始化日志的LogFormat和LogLevel。
    2. 根据LocalMSPDir,BCCSP和LocalMSPID读取,初始化Orderer节点的MSP服务,之后创建一个用于签名的对象signer
    3. 获取配置的安全和连接相关参数,保存在ServerConfig中,使用这些参数启动grpc服务。
    4. 根据配置的账本类型LedgerType,创建对应类型的LedgerFactory,然后初始化一个启动Channel。以file类型为例,如果启动时,orderer的账本目录没有账本,那么需要进行初始化,从GenesisFile中读取我们之前生产的创世块,获取创世块的ChannelId,做为区块链和账本的ID,将创世块添加到账本中;生成创世块时默认的ChannelId是testchainid
    5. 根据配置的共识类型,初始化共识对象consensus.Consenter,将账本和共识对象,签名对象保存在Registrar中,Registrar的目的是为了支持多Channel,保存了每个Channel需要的资源。
    6. 向grpcServer注册服务,之后启动服务器。注册的服务包括两个:Boardcast和Deliver

    2.1 Consenter

    consensus.Consenter提供共识机制组件,目前有solo和kafka两种实现,其接口定义如下:

    type Consenter interface {
        HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
    }
    

    HandleChain方法会创建并返回一个Chain对象,Chain中包含了给定的资源集合ConsenterSupport,ConsenterSupport对象提供了Consenter达成共识所需的资源。

    对于solo机制来说,定义在orderer/consensus/solo/consensus.go中,kafa的共识机制定义在orderer/consensus/kafka/consenter.go中。

    Chain
    Chain定义了向其他组建提供服务的接口,下面的这些接口中,可以准备共识机制所需资源后启动服务,处理不同类型的消息,返回错误等

    type Chain interface {
        // Orderer处理普通消息和配置类型的消息,调用这个方法交给共识机制处理
        Order(env *cb.Envelope, configSeq uint64) error 
        //Orderer配置更新消息
        Configure(config *cb.Envelope, configSeq uint64) error  
         // 等待共识机制准备就绪,返回后才可以向其提交消息
        WaitReady() error
        // 返回一个chan,当错误发生时会写入这个chan 
        Errored() <-chan struct{}   
        // 分配共识机制需要的资源,启动共识机制
        Start() 
        // 释放共识机制的资源
        Halt()
    }
    

    ConsenterSupport
    ConsenterSupport内部是共识机制的主要逻辑

    type ConsenterSupport interface {
        crypto.LocalSigner
        msgprocessor.Processor  
        BlockCutter() blockcutter.Receiver  
        SharedConfig() channelconfig.Orderer    
        CreateNextBlock(messages []*cb.Envelope) *cb.Block  
        WriteBlock(block *cb.Block, encodedMetadataValue []byte)    
        WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)  
        Sequence() uint64   
        ChainID() string    
        Height() uint64
    }
    

    其中的Processor用来对Orderer收到的消息进行分类和分别处理

    type Processor interface {
        ClassifyMsg(chdr *cb.ChannelHeader) Classification
        ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)
        ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error)
        ProcessConfigMsg(env *cb.Envelope) (*cb.Envelope, uint64, error)
    }
    

    BlockCutter
    Orderer会不断的接受消息并处理,处理之后保存在BlockCutter中,如果超过了指定的BatchSize,会使用Cut方法来返回一个批次的消息,并启动一个新批次饭.具体实现在orderer/common/blockcutter/blockcutter.go中.

    type Receiver interface {
        Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool)
        Cut() []*cb.Envelope
    }
    

    在Ordered方法中,收到消息后先判断大小,超过了推荐的最大字节数,后会使用cut方法获取一个批次,将当前消息加入并返回二维数组。如果超过了BatchSize也需要cut,之后将消息加入pendingBatch中;Cut方法就是取正在处理的消息数组pendingBatch返回。

    2.2 SOLO

    对于solo来说,获得的chain包含三个成员,传入的ConsenterSupport,用于处理传递message类型的消息,message封装了消息的seq,和Orderer收到的配置类型的消息和非配置类型消息。
    solo使用的ConsenterSupport在orderer/common/multichannel/chainsupport.go

    type consenter struct{}
    type chain struct {
        support  consensus.ConsenterSupport
        sendChan chan *message
        exitChan chan struct{}
    }
    type message struct {
        configSeq uint64
        normalMsg *cb.Envelope
        configMsg *cb.Envelope
    }
    

    solo的chain实现中,我们可以看到start时,使用go启动了一个goroutine,内部不断的从sendChan中获取消息;如果是普通消息的configSeq小于当前的seq,使用support的ProcessNormalMsg处理;之后,将其放入BlockCutter中,如果返回了一个Batch,就使用CreateNextBlock将这个Batch中的消息加入Block中。
    如果是配置类消息,不会放入BlockCutter中,而是使用Cut将等待打包的消息放入一个Block中,然后将配置消息单独打包为一个Block。timer的目的是如果超过了指定的BatchTimeout,主动使用Cut获取消息打包成一个Block。

    solo的Order和Configure方法会向sendChan发送消息,由主循环去处理。

    2.3 Processor

    ConsenterSupport实现了msgprocessor.Processor的接口,其用于对消息分类,处理不同类型的消息

    type Processor interface {
        ClassifyMsg(chdr *cb.ChannelHeader) Classification
        ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)
        ProcessConfigUpdateMsg(env *cb.Envelope
        ProcessConfigMsg(env *cb.Envelope) (*cb.Envelope, uint64, error)
    }
    cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs))
    
    type StandardChannel struct {
        support StandardChannelSupport
        filters *RuleSet
    }
    

    创建ConsenterSupport后,也创建了Processor,使用的是msgprocessor.NewSystemChannel(),SystemChannel内部包含一个StandardChannel,而且内部有包含StandardChannelSupport和RuleSet,RuleSet是一些了规则的过滤器,创建是包含如下四个过滤器

    EmptyRejectRule   // Payload非空判断
    NewExpirationRejectRule(filterSupport),    // 签名实体是否过期
    NewSizeFilter(ordererConfig),  // 大小过滤
    NewSigFilter(policies.ChannelWriters, filterSupport), // /Channel/Writers策略过滤
    
    • ClassifyMsg 根据ChannelHeader中的Type字段,对消息进行分类
    • ProcessNormalMsg 使用规则的过滤器过滤消息
    • ProcessConfigUpdateMsg 使用规则的过滤器过滤消息,之后调用ChannelSupport的ProposeConfigUpdate处理返回一个config,签名后组装为config再使用规则过滤。
      **ProcessConfigMsg 将消息封装为ConfigUpdate处理

    在创建创世块时,文件内容是一个包装了Config的Block,在创建Channel文件时,内部是一个封装为Envelope的ConfigUpdate,处理这两种消息外,Orderer还会对交易信息进行处理。

    2.4 Registrar

    Registrar内部包括ChainSupport,ledgerFactory和consenters

    2.5 Orderer的服务

    Orderer使用grpc提供了两类服务:Broadcast和Deliver,这两个服务都是双向流式的服务。Broadcast用来处理peer发来的消息;Deliver用来处理SeekInfo类型的消息,将指定的Block发送给Client。实现类是orderer/common/server/server.go中的server

    service AtomicBroadcast {
        rpc Broadcast(stream common.Envelope) returns (stream BroadcastResponse) {}
        rpc Deliver(stream common.Envelope) returns (stream DeliverResponse) {}
    }
    type server struct {
        bh    broadcast.Handler
        dh    deliver.Handler
        debug *localconfig.Debug
        *multichannel.Registrar
    }
    

    server中有两个变量,bh和dh分别用来负责具体的处理

    BroadcastHandler

    首先判断是否为ConfigUpdate类型的消息,然后调用WaitReady(共识机制决定)阻塞等待处理消息。
    如果不是,调用ProcessNormalMsg使用规则过滤,然后使用ChainSupport的Order处理,solo的ChainSupport的Order方法我们在上面描述过程,会发送到chain的sendChan中,交给chain处理。
    如果是ConfigUpdate,会使用ProcessConfigUpdateMsg处理为一个Config类型,调用Configure交给chain处理。

    DeliverHandler

    Deliver收到的消息是SeekInfo类型的数据,SeekInfo指定了请求的Block区间和获取不到时候的行为

    type SeekInfo struct {
        Start    *SeekPosition         
        Stop     *SeekPosition         
        Behavior SeekInfo_SeekBehavior 
    }
    

    因此,主要功能就是想client返回Block

    相关文章

      网友评论

        本文标题:Fabric源码分析-Orderer启动

        本文链接:https://www.haomeiwen.com/subject/mnpflqtx.html