美文网首页超级账本HyperLeder
Hyperledger-Fabric源码分析(Gossip-St

Hyperledger-Fabric源码分析(Gossip-St

作者: 小蜗牛爬楼梯 | 来源:发表于2020-04-09 18:54 被阅读0次

    前面我们说过了StateInfo消息, 这个主要是自发热给别人,有点细水长流的意思。下面我们要讲的是Gossip里面怎么去主动拉取成员的StateInfo列表。

    struct

    type StateInfoPullRequest struct {
        // channel_MAC is an authentication code that proves
        // that the peer that sent this message knows
        // the name of the channel.
        Channel_MAC          []byte   
    }
    
    type StateInfoSnapshot struct {
        Elements             []*Envelope 
    }
    
    type Envelope struct {
        Payload              []byte         
        Signature            []byte         
        SecretEnvelope       *SecretEnvelope
    }
    
    type SecretEnvelope struct {
        Payload              []byte 
        Signature            []byte   
    }
    
    • 这里要注意的是,这两个消息是成对的,StateInfoPullRequest是获取的请求,StateInfoSnapshot是返回的结果。

    GossipMessage_StateInfoPullReq

    初始化

    func (gc *gossipChannel) createStateInfoRequest() (*proto.SignedGossipMessage, error) {
       return (&proto.GossipMessage{
          Tag:   proto.GossipMessage_CHAN_OR_ORG,
          Nonce: 0,
          Content: &proto.GossipMessage_StateInfoPullReq{
             StateInfoPullReq: &proto.StateInfoPullRequest{
                Channel_MAC: GenerateMAC(gc.pkiID, gc.chainID),
             },
          },
       }).NoopSign()
    }
    
    • 这个请求没什么好讲的,基本上就是生成MAC

    发起点

    func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.MessageCryptoService,
        chainID common.ChainID, adapter Adapter, joinMsg api.JoinChannelMessage) GossipChannel {
        ...
    
        // Periodically publish state info
        go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C)
        // Periodically request state info
        go gc.periodicalInvocation(gc.requestStateInfo, gc.stateInfoRequestScheduler.C)
        ...
    
        go gc.membershipTracker.trackMembershipChanges()
        return gc
    }
    
    • 又到了这里,gc初始化的时候就会自循环发起StateInfoPullReq,真是不甘寂寞。边吃边拉。
    func (gc *gossipChannel) requestStateInfo() {
       req, err := gc.createStateInfoRequest()
       if err != nil {
          gc.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
          return
       }
       endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan)
       gc.Send(req, endpoints...)
    }
    
    • 这里不深入了,组装消息前面已经说了
    • 随机取PullPeerNum个节点,Send。这里注意Gossip中有两种发送方式,一种是Send,一种是Gossip。有区别,虽然都是通过底层comm来send,不过emitter会做一层buff来缓冲,对于不是那么急的,可以累计多个消息,一次发出去。
    • 显然这里的场景是要立即,马上发送出去。

    接受点

    func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
       ...
    
       if m.IsStateInfoPullRequestMsg() {
          msg.Respond(gc.createStateInfoSnapshot(orgID))
          return
       }
    
       ...
    }
    
    • 消息校验的部分,之前讲过了,这里不再赘述
    • 可以看到收到request后,会createStateInfoSnapshot
    • 下面进到GossipMessage_StateSnapshot的部分

    GossipMessage_StateSnapshot

    初始化

    func (gc *gossipChannel) createStateInfoSnapshot(requestersOrg api.OrgIdentityType) *proto.GossipMessage {
       sameOrg := bytes.Equal(gc.selfOrg, requestersOrg)
       rawElements := gc.stateInfoMsgStore.Get()
       elements := []*proto.Envelope{}
       for _, rawEl := range rawElements {
          msg := rawEl.(*proto.SignedGossipMessage)
          orgOfCurrentMsg := gc.GetOrgOfPeer(msg.GetStateInfo().PkiId)
          // If we're in the same org as the requester, or the message belongs to a foreign org
          // don't do any filtering
          if sameOrg || !bytes.Equal(orgOfCurrentMsg, gc.selfOrg) {
             elements = append(elements, msg.Envelope)
             continue
          }
          // Else, the requester is in a different org, so disclose only StateInfo messages that their
          // corresponding AliveMessages have external endpoints
          if netMember := gc.Lookup(msg.GetStateInfo().PkiId); netMember == nil || netMember.Endpoint == "" {
             continue
          }
          elements = append(elements, msg.Envelope)
       }
    
       return &proto.GossipMessage{
          Channel: gc.chainID,
          Tag:     proto.GossipMessage_CHAN_OR_ORG,
          Nonce:   0,
          Content: &proto.GossipMessage_StateSnapshot{
             StateSnapshot: &proto.StateInfoSnapshot{
                Elements: elements,
             },
          },
       }
    }
    
    • 首先判定同属一个Org
    • 获取stateInfoMsgStore保存的所有数据集,这里需要注意的是,这个store当初创建的时候是指定了过期策>- 略的,也就是说,里面的状态会定时清理过期的无效状态。还记得StateInfo消息是有时间戳的么。
    • 当然了,如果请求人跟本地同属一个Org,消息全部推送过去没有话讲。
    • 如果不属于一个Org,难道就不推送么?这里处于有利于消息扩散的原则,当然也要尽可能的去做
    • 如果store里面有消息跟本地不是属于同一个组织,那么推送出去也没什么不好,反正自己也不知道是谁发来的,有人要就给他好了。
    • 如果store里面有消息跟本地属于同一个组织,那么问题来了,现在来了个陌生人来请求这个组织内的状态,显然不能给他,这里就跳过了。

    发送

    func (m *ReceivedMessageImpl) Respond(msg *proto.GossipMessage) {
       sMsg, err := msg.NoopSign()
       if err != nil {
          err = errors.WithStack(err)
          m.conn.logger.Errorf("Failed creating SignedGossipMessage: %+v", err)
          return
       }
       m.conn.send(sMsg, func(e error) {}, blockingSend)
    }
    
    • 因为是Response,当然要点对点send。

    接受点

    func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
        ...
    
        if m.IsStateInfoSnapshot() {
            gc.handleStateInfSnapshot(m.GossipMessage, msg.GetConnectionInfo().ID)
            return
        }
        ...
    }
    
    • 又来到这里,看来这个系列,这段代码会复制粘贴N次
    • 好了gc收到后,开始处理

    处理

    func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender common.PKIidType) {
       chanName := string(gc.chainID)
       for _, envelope := range m.GetStateSnapshot().Elements {
          stateInf, err := envelope.ToGossipMessage()
          if err != nil {
             gc.logger.Warningf("Channel %s : StateInfo snapshot contains an invalid message: %+v", chanName, errors.WithStack(err))
             return
          }
          if !stateInf.IsStateInfoMsg() {
             gc.logger.Warning("Channel", chanName, ": Element of StateInfoSnapshot isn't a StateInfoMessage:",
                stateInf, "message sent from", sender)
             return
          }
          si := stateInf.GetStateInfo()
          orgID := gc.GetOrgOfPeer(si.PkiId)
          if orgID == nil {
             gc.logger.Debug("Channel", chanName, ": Couldn't find org identity of peer",
                string(si.PkiId), "message sent from", string(sender))
             return
          }
    
          if !gc.IsOrgInChannel(orgID) {
             gc.logger.Warning("Channel", chanName, ": Peer", stateInf.GetStateInfo().PkiId,
                "is not in an eligible org, can't process a stateInfo from it, sent from", sender)
             return
          }
    
          expectedMAC := GenerateMAC(si.PkiId, gc.chainID)
          if !bytes.Equal(si.Channel_MAC, expectedMAC) {
             gc.logger.Warning("Channel", chanName, ": StateInfo message", stateInf,
                ", has an invalid MAC. Expected", expectedMAC, ", got", si.Channel_MAC, ", sent from", sender)
             return
          }
          err = gc.ValidateStateInfoMessage(stateInf)
          if err != nil {
             gc.logger.Warningf("Channel %s: Failed validating state info message: %v sent from %v : %+v", chanName, stateInf, sender, errors.WithStack(err))
             return
          }
    
          if gc.Lookup(si.PkiId) == nil {
             // Skip StateInfo messages that belong to peers
             // that have been expired
             continue
          }
    
          gc.stateInfoMsgStore.Add(stateInf)
       }
    }
    
    • 遍历收到的StateInfo消息集
    • 如果消息发起节点的组织不属于这个channel,丢弃。

    校验MAC

    • 校验消息,StateInfo篇已经讲过了。
    • 消息发起节点是否alive,如果本地都Lookup不到,也选择丢弃。
    • 下面就是久违的加到stateInfoMsgStore了。

    总结

    • 可以看到,整个机制还是比较简单,唯一要注意是,StateInfo消息保存下来的必要条件是所属的Org要同属一个Channel。

    相关文章

      网友评论

        本文标题:Hyperledger-Fabric源码分析(Gossip-St

        本文链接:https://www.haomeiwen.com/subject/yohhmhtx.html