StateInfo是用来传播peer的状态信息给其他成员。
struct
type StateInfo struct {
Timestamp *PeerTime
PkiId []byte
// channel_MAC is an authentication code that proves
// that the peer that sent this message knows
// the name of the channel.
Channel_MAC []byte
Properties *Properties
}
type Properties struct {
LedgerHeight uint64
LeftChannel bool
Chaincodes []*Chaincode
}
初始化
stateInfMsg := &proto.StateInfo{
Channel_MAC: GenerateMAC(gc.pkiID, gc.chainID),
PkiId: gc.pkiID,
Timestamp: &proto.PeerTime{
IncNum: gc.incTime,
SeqNum: uint64(time.Now().UnixNano()),
},
Properties: &proto.Properties{
LeftChannel: leftChannel,
LedgerHeight: ledgerHeight,
Chaincodes: chaincodes,
},
}
m := &proto.GossipMessage{
Nonce: 0,
Tag: proto.GossipMessage_CHAN_OR_ORG,
Content: &proto.GossipMessage_StateInfo{
StateInfo: stateInfMsg,
},
}
可以看到stateinfo的组成
- pkiid,你可以理解为peer的标识id,内部其实就是mspid+cert算出来的一个摘要。
- Channel_MAC,pkiid+chainid生成MAC,背后就是sha256计算hash
- 时间戳
- 属性代表着三个触发stateinfo消息的地方
- 该节点退出通道
- 有新的block写入,更新peer的账本height
- chaincode更新
- 这个看起来不太好理解。我跟进了下,chaincode部署成功会触发这里。换句话说如果cc部署成功,是通过这个消息让成员知道的。
触发点
commitblock
func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData util.PvtDataCollections) error {
// Commit block with available private transactions
if err := s.ledger.StoreBlock(block, pvtData); err != nil {
logger.Errorf("Got error while committing(%+v)", errors.WithStack(err))
return err
}
// Update ledger height
s.mediator.UpdateLedgerHeight(block.Header.Number+1, common2.ChainID(s.chainID))
logger.Debugf("[%s] Committed block [%d] with %d transaction(s)",
s.chainID, block.Header.Number, len(block.Data.Data))
return nil
}
- 前面就是提交block到账本了
- 后来开始UpdateLedgerHeight,开始处理账本新的height
UpdateLedgerHeight
func (gc *gossipChannel) UpdateLedgerHeight(height uint64) {
gc.Lock()
defer gc.Unlock()
var chaincodes []*proto.Chaincode
var leftChannel bool
if prevMsg := gc.stateInfoMsg; prevMsg != nil {
leftChannel = prevMsg.GetStateInfo().Properties.LeftChannel
chaincodes = prevMsg.GetStateInfo().Properties.Chaincodes
}
gc.updateProperties(height, chaincodes, leftChannel)
}
- 因为只是更新height,所以其他的状态沿用之前的stateInfoMsg
- 下面开始要广播前的最后准备工作了
updateStateInfo
func (gc *gossipChannel) updateStateInfo(msg *proto.SignedGossipMessage) {
gc.stateInfoMsgStore.Add(msg)
gc.ledgerHeight = msg.GetStateInfo().Properties.LedgerHeight
gc.stateInfoMsg = msg
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}
- stateInfoMsgStore用来保存收到的成员发来的所有的stateinfo消息,包括自己的。
- 更新自己的height
- 每给其他成员分享一次stateinfo的时候,都会自己保留一份,以备不时之需。这种情况正好用上。
- 启动shouldGossipStateInfo开关
发送点
func (gc *gossipChannel) publishStateInfo() {
if atomic.LoadInt32(&gc.shouldGossipStateInfo) == int32(0) {
return
}
gc.RLock()
stateInfoMsg := gc.stateInfoMsg
gc.RUnlock()
gc.Gossip(stateInfoMsg)
if len(gc.GetMembership()) > 0 {
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(0))
}
}
- 可以看到最后会在这里将消息Gossip出去,里面是用emitter模块去处理。emitter你暂时不用关心,里面根据不同的消息类型来决定点对点发送还是群发,不过在这里你只用知道发出去就好了。有时间我会专门讲这个模块。
- 那么发送是怎么触发的呢?
时机
func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.MessageCryptoService,
chainID common.ChainID, adapter Adapter, joinMsg api.JoinChannelMessage) GossipChannel {
gc := &gossipChannel{
incTime: uint64(time.Now().UnixNano()),
selfOrg: org,
pkiID: pkiID,
mcs: mcs,
Adapter: adapter,
logger: util.GetLogger(util.ChannelLogger, adapter.GetConf().ID),
stopChan: make(chan struct{}, 1),
shouldGossipStateInfo: int32(0),
stateInfoPublishScheduler: time.NewTicker(adapter.GetConf().PublishStateInfoInterval),
stateInfoRequestScheduler: time.NewTicker(adapter.GetConf().RequestStateInfoInterval),
orgs: []api.OrgIdentityType{},
chainID: chainID,
}
...
// Periodically publish state info
go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C)
...
}
- 在初始化GossipChannel的时候,会定时来监听是否有新的stateinfo消息需要发布。
- GossipChannel看名字你也大概能猜到干嘛的,这是专门给同channel的成员进行gossip服务的。
接受点
- 中间省略了很多地方,这个消息的专题都是这种风格,尽量不要被其他的细节给干扰。总之,消息已经被peer收到,下面我们看下收到后,怎么处理。
GossipService
- GossipService是统管gossip服务的,所有的动作都由这里发起,消息处理也不例外
消息验证
func (g *gossipServiceImpl) validateStateInfoMsg(msg *proto.SignedGossipMessage) error {
verifier := func(identity []byte, signature, message []byte) error {
pkiID := g.idMapper.GetPKIidOfCert(api.PeerIdentityType(identity))
if pkiID == nil {
return errors.New("PKI-ID not found in identity mapper")
}
return g.idMapper.Verify(pkiID, signature, message)
}
identity, err := g.idMapper.Get(msg.GetStateInfo().PkiId)
if err != nil {
return errors.WithStack(err)
}
return msg.Verify(identity, verifier)
}
- 这里主要做两个事情
- 一,判断当前消息的Pkiid是否认识,这是消息接收的基础。因为Gossip有机制能同步成员列表,如果有不认识节点,那么就问题大了。
- 二,对方消息是私钥进行数字签名的,这里用本地的公钥进行签名校验。这也是安全的基础。
消息处理
func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) {
...
if msg.IsChannelRestricted() {
if gc := g.chanState.lookupChannelForMsg(m); gc == nil {
// If we're not in the channel, we should still forward to peers of our org
// in case it's a StateInfo message
if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetConnectionInfo().ID}) && msg.IsStateInfoMsg() {
if g.stateInfoMsgStore.Add(msg) {
g.emitter.Add(&emittedGossipMessage{
SignedGossipMessage: msg,
filter: m.GetConnectionInfo().ID.IsNotSameFilter,
})
}
}
if !g.toDie() {
g.logger.Debug("No such channel", msg.Channel, "discarding message", msg)
}
} else {
...
gc.HandleMessage(m)
}
return
}
...
}
- 前面校验部分过了后,基本上消息没有大的问题
- 下面开始正式处理了,这里需要一些背景知识。首先一个Peer加入一个channel, 就会有一个GossipChannel相伴。所以如果查不到这个gc,那么说明Peer不在这个channel里面。
- 这里是描述了两个场景
- 首先该Peer不在这个channel里面,但属于同一个Org,也就是组织,那么处于优化的目的,可以尽快将消息扩散出去,以尽快让同channel的节点处理。再次遇到emitter,忽略它。
- 如果同属一个channel,那么开始交给所属的GossipChannel来处理。
GossipChannel
消息校验
func (gc *gossipChannel) verifyMsg(msg proto.ReceivedMessage) bool {
...
if m.IsStateInfoMsg() {
si := m.GetStateInfo()
expectedMAC := GenerateMAC(si.PkiId, gc.chainID)
if !bytes.Equal(expectedMAC, si.Channel_MAC) {
gc.logger.Warning("Message contains wrong channel MAC(", si.Channel_MAC, "), expected", expectedMAC)
return false
}
return true
}
...
}
- 通用校验,我们这里就不费功夫了,主要看stateinfo的。
- 可以看到这里主要做MAC校验,这个校验感觉没什么用,并不能保证其完整性。
消息处理
if m.IsDataMsg() || m.IsStateInfoMsg() {
added := false
if m.IsDataMsg() {
...
} else { // StateInfoMsg verification should be handled in a layer above
// since we don't have access to the id mapper here
added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage())
}
if added {
// Forward the message
gc.Forward(msg)
// DeMultiplex to local subscribers
gc.DeMultiplex(m)
if m.IsDataMsg() {
gc.blocksPuller.Add(msg.GetGossipMessage())
}
}
- 当然了这里会直接加到gc的stateInfoMsgStore里面存起来。当然了Add也不是那么简单。简单看看。
func (cache *stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
if !cache.MessageStore.CheckValid(msg) {
return false
}
if !cache.verify(msg) {
return false
}
added := cache.MessageStore.Add(msg)
if added {
pkiID := msg.GetStateInfo().PkiId
cache.MembershipStore.Put(pkiID, msg)
}
return added
}
- CheckValid会将msg跟本地保存做比较,如果是全新的或比已有的新,会判定有效
- verify主要是校验消息发起人是否同属一个Channel,还有就是这个节点是否有读取成员状态的权力。
- 按pkiID的维度冗余一遍
- 如果Add成功,第一件事就是让其他人知道。通过emitter转发出去
- DeMultiplex是本地的一个多路分发的模块,里面可以增加订阅器,来订阅感兴趣的消息类型,进而处理。不过幸运的是,stateinfo没有人订阅,所以这里不扩散了。
使用
- 前面讲了这么多StateInfo消息,那么问题来了,这消息收下来到底干嘛用呢?举两个例子就清楚了。当然里面的Properties还有别的用处,这里先不展开。
func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool {
peerIdentity := gc.GetIdentityByPKIID(member.PKIid)
if len(peerIdentity) == 0 {
gc.logger.Warning("Identity for peer", member.PKIid, "doesn't exist")
return false
}
msg := gc.stateInfoMsgStore.MsgByID(member.PKIid)
if msg == nil {
return false
}
return true
}
- 这里如果能从stateInfoMsgStore里面找到,说明这个member同属一个通道。
// If we don't have a StateInfo message from the peer,
// no way of validating its eligibility in the channel.
if gc.stateInfoMsgStore.MsgByID(msg.GetConnectionInfo().ID) == nil {
gc.logger.Debug("Don't have StateInfo message of peer", msg.GetConnectionInfo())
return
}
if !gc.eligibleForChannelAndSameOrg(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
gc.logger.Warning(msg.GetConnectionInfo(), "isn't eligible for pulling blocks of", string(gc.chainID))
return
}
- 这里也是一样,上面判定同属一个通道,下面判定是否同属一个通道下的同一个组织。
总结
- StateInfo主要给别人分享是否有新的cc部署完成,是否退出通道,是否有新的block写入。
- 不管如何,第一时间转发消息给成员,利于消息扩散。
- 然后自循环,对外发布状态更新。
网友评论