在一个Fabric网络中,根据配置会生成一个创世块,Orderer启动时需要指定这个创世块,Orderer启动后,会处理client发出的请求,主要包括两种类型的服务。
1. 启动
Orderer启动时,需要编写一个配置文件,可以从$GOPATH/src/github.com/hyperledger/fabric/sampleconfigorderer.yaml
找到模板,在这个配置文件中,主要需要配置
General:
LedgerType: file #记账类型
ListenAddress: #服务监听地址
ListenPort: #服务监听地址
TLS: # TLS通信需要的相关证书和CA根证书,CA根证书用来校验client的证书
Keepalive: # 连接配置
GenesisProfile: 创世块的Profile
GenesisFile: 创世快文件
LocalMSPID: Orderer的MSP
BCCSP: # 配置加密服务提供者
FileLedger: # 文件账本相关配置
RAMLedger: # 内存账本的配置,选择内存记账才有效
Kafka: # Kafka配置,选择kafka排序后才有效
配置好orderer.yaml后,指定其所在的路径,就可以启动orderer服务了
orderer start
2. 程序实现
orderer的实现代码在orderer/main.go
中,主要流程是:
- 根据配置初始化日志的LogFormat和LogLevel。
- 根据LocalMSPDir,BCCSP和LocalMSPID读取,初始化Orderer节点的MSP服务,之后创建一个用于签名的对象signer
- 获取配置的安全和连接相关参数,保存在
ServerConfig
中,使用这些参数启动grpc服务。 - 根据配置的账本类型LedgerType,创建对应类型的LedgerFactory,然后初始化一个启动Channel。以file类型为例,如果启动时,orderer的账本目录没有账本,那么需要进行初始化,从GenesisFile中读取我们之前生产的创世块,获取创世块的ChannelId,做为区块链和账本的ID,将创世块添加到账本中;生成创世块时默认的ChannelId是
testchainid
。 - 根据配置的共识类型,初始化共识对象
consensus.Consenter
,将账本和共识对象,签名对象保存在Registrar
中,Registrar的目的是为了支持多Channel,保存了每个Channel需要的资源。 - 向grpcServer注册服务,之后启动服务器。注册的服务包括两个:Boardcast和Deliver
2.1 Consenter
consensus.Consenter
提供共识机制组件,目前有solo和kafka两种实现,其接口定义如下:
type Consenter interface {
HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}
HandleChain方法会创建并返回一个Chain
对象,Chain中包含了给定的资源集合ConsenterSupport
,ConsenterSupport对象提供了Consenter达成共识所需的资源。
对于solo
机制来说,定义在orderer/consensus/solo/consensus.go
中,kafa
的共识机制定义在orderer/consensus/kafka/consenter.go
中。
Chain
Chain定义了向其他组建提供服务的接口,下面的这些接口中,可以准备共识机制所需资源后启动服务,处理不同类型的消息,返回错误等
type Chain interface {
// Orderer处理普通消息和配置类型的消息,调用这个方法交给共识机制处理
Order(env *cb.Envelope, configSeq uint64) error
//Orderer配置更新消息
Configure(config *cb.Envelope, configSeq uint64) error
// 等待共识机制准备就绪,返回后才可以向其提交消息
WaitReady() error
// 返回一个chan,当错误发生时会写入这个chan
Errored() <-chan struct{}
// 分配共识机制需要的资源,启动共识机制
Start()
// 释放共识机制的资源
Halt()
}
ConsenterSupport
ConsenterSupport内部是共识机制的主要逻辑
type ConsenterSupport interface {
crypto.LocalSigner
msgprocessor.Processor
BlockCutter() blockcutter.Receiver
SharedConfig() channelconfig.Orderer
CreateNextBlock(messages []*cb.Envelope) *cb.Block
WriteBlock(block *cb.Block, encodedMetadataValue []byte)
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)
Sequence() uint64
ChainID() string
Height() uint64
}
其中的Processor用来对Orderer收到的消息进行分类和分别处理
type Processor interface {
ClassifyMsg(chdr *cb.ChannelHeader) Classification
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)
ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error)
ProcessConfigMsg(env *cb.Envelope) (*cb.Envelope, uint64, error)
}
BlockCutter
Orderer会不断的接受消息并处理,处理之后保存在BlockCutter中,如果超过了指定的BatchSize,会使用Cut方法来返回一个批次的消息,并启动一个新批次饭.具体实现在orderer/common/blockcutter/blockcutter.go
中.
type Receiver interface {
Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool)
Cut() []*cb.Envelope
}
在Ordered方法中,收到消息后先判断大小,超过了推荐的最大字节数,后会使用cut方法获取一个批次,将当前消息加入并返回二维数组。如果超过了BatchSize也需要cut,之后将消息加入pendingBatch中;Cut方法就是取正在处理的消息数组pendingBatch返回。
2.2 SOLO
对于solo
来说,获得的chain包含三个成员,传入的ConsenterSupport,用于处理传递message类型的消息,message封装了消息的seq,和Orderer收到的配置类型的消息和非配置类型消息。
solo使用的ConsenterSupport在orderer/common/multichannel/chainsupport.go
中
type consenter struct{}
type chain struct {
support consensus.ConsenterSupport
sendChan chan *message
exitChan chan struct{}
}
type message struct {
configSeq uint64
normalMsg *cb.Envelope
configMsg *cb.Envelope
}
在solo
的chain实现中,我们可以看到start时,使用go启动了一个goroutine,内部不断的从sendChan中获取消息;如果是普通消息的configSeq小于当前的seq,使用support的ProcessNormalMsg处理;之后,将其放入BlockCutter中,如果返回了一个Batch,就使用CreateNextBlock将这个Batch中的消息加入Block中。
如果是配置类消息,不会放入BlockCutter中,而是使用Cut将等待打包的消息放入一个Block中,然后将配置消息单独打包为一个Block。timer的目的是如果超过了指定的BatchTimeout,主动使用Cut获取消息打包成一个Block。
solo
的Order和Configure方法会向sendChan发送消息,由主循环去处理。
2.3 Processor
ConsenterSupport实现了msgprocessor.Processor
的接口,其用于对消息分类,处理不同类型的消息
type Processor interface {
ClassifyMsg(chdr *cb.ChannelHeader) Classification
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)
ProcessConfigUpdateMsg(env *cb.Envelope
ProcessConfigMsg(env *cb.Envelope) (*cb.Envelope, uint64, error)
}
cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs))
type StandardChannel struct {
support StandardChannelSupport
filters *RuleSet
}
创建ConsenterSupport后,也创建了Processor,使用的是msgprocessor.NewSystemChannel()
,SystemChannel内部包含一个StandardChannel,而且内部有包含StandardChannelSupport和RuleSet,RuleSet是一些了规则的过滤器,创建是包含如下四个过滤器
EmptyRejectRule // Payload非空判断
NewExpirationRejectRule(filterSupport), // 签名实体是否过期
NewSizeFilter(ordererConfig), // 大小过滤
NewSigFilter(policies.ChannelWriters, filterSupport), // /Channel/Writers策略过滤
- ClassifyMsg 根据ChannelHeader中的Type字段,对消息进行分类
- ProcessNormalMsg 使用规则的过滤器过滤消息
- ProcessConfigUpdateMsg 使用规则的过滤器过滤消息,之后调用ChannelSupport的ProposeConfigUpdate处理返回一个config,签名后组装为config再使用规则过滤。
**ProcessConfigMsg 将消息封装为ConfigUpdate处理
在创建创世块时,文件内容是一个包装了Config的Block,在创建Channel文件时,内部是一个封装为Envelope的ConfigUpdate,处理这两种消息外,Orderer还会对交易信息进行处理。
2.4 Registrar
Registrar内部包括ChainSupport,ledgerFactory和consenters
2.5 Orderer的服务
Orderer使用grpc提供了两类服务:Broadcast和Deliver,这两个服务都是双向流式的服务。Broadcast用来处理peer发来的消息;Deliver用来处理SeekInfo类型的消息,将指定的Block发送给Client。实现类是orderer/common/server/server.go
中的server
,
service AtomicBroadcast {
rpc Broadcast(stream common.Envelope) returns (stream BroadcastResponse) {}
rpc Deliver(stream common.Envelope) returns (stream DeliverResponse) {}
}
type server struct {
bh broadcast.Handler
dh deliver.Handler
debug *localconfig.Debug
*multichannel.Registrar
}
server中有两个变量,bh和dh分别用来负责具体的处理
BroadcastHandler
首先判断是否为ConfigUpdate类型的消息,然后调用WaitReady(共识机制决定)阻塞等待处理消息。
如果不是,调用ProcessNormalMsg使用规则过滤,然后使用ChainSupport的Order处理,solo
的ChainSupport的Order方法我们在上面描述过程,会发送到chain的sendChan中,交给chain处理。
如果是ConfigUpdate,会使用ProcessConfigUpdateMsg处理为一个Config类型,调用Configure交给chain处理。
DeliverHandler
Deliver收到的消息是SeekInfo类型的数据,SeekInfo指定了请求的Block区间和获取不到时候的行为
type SeekInfo struct {
Start *SeekPosition
Stop *SeekPosition
Behavior SeekInfo_SeekBehavior
}
因此,主要功能就是想client返回Block
网友评论