美文网首页区块链-HyperLedger Fabric
HyperLedgerFabric源码解读(2)-chanSta

HyperLedgerFabric源码解读(2)-chanSta

作者: 神奇的考拉 | 来源:发表于2018-11-19 17:15 被阅读3次

主要是关于channel state相关内容

package gossip

type channelState struct {
    stopping int32                              // channel全部停止的标志值
    sync.RWMutex                                // 读写锁
    channels map[string]channel.GossipChannel  //  gossip channels: <channel-id, GossipChannel>
    g        *gossipServiceImpl                // gossip service
}
// 停止所有channel
// 1、判断stop标志是否满足
// 2、首先设置stop标志值 = 1: 使用atomic的storeInt32原子操作
//    接着迭代channels的map 逐一执行channel.stop
//    关闭所有channel操作在读写锁的环境内执行
func (cs *channelState) stop() {
    if cs.isStopping() {
        return
    }
    atomic.StoreInt32(&cs.stopping, int32(1))
    cs.Lock()
    defer cs.Unlock()
    for _, gc := range cs.channels {
        gc.Stop()
    }
}

// stopping 是否等于1 
func (cs *channelState) isStopping() bool {
    return atomic.LoadInt32(&cs.stopping) == int32(1)
}
// 根据gossip的receive message获取对应的channel
// 需要对receive message其中gossip message进行提取: 状态请求消息单独处理,主要由于该类消息内部提供了通道身份验证码和公钥ID
func (cs *channelState) lookupChannelForMsg(msg proto.ReceivedMessage) channel.GossipChannel {
    if msg.GetGossipMessage().IsStateInfoPullRequestMsg() {        // 状态信息获取请求消息
        sipr := msg.GetGossipMessage().GetStateInfoPullReq()
        mac := sipr.Channel_MAC                                    // channel_mac:身份验证码,代表发送消息的peer知道对应的channel名称
        pkiID := msg.GetConnectionInfo().ID                        // 根据消息获取对应的connection信息: connection_id
        return cs.getGossipChannelByMAC(mac, pkiID)               // 根据channel身份验证码 + 公钥ID 获取对应的gossip channel
    }
    return cs.lookupChannelForGossipMsg(msg.GetGossipMessage().GossipMessage) // 根据gossip消息获取对应的channel
}
// 获取gossip message对应的channel
// 主要关键点提取出Gossip Message里面的StateInfoMsg:该类信息内部包括了channel id,能够通过channel_id获取gossip channel
func (cs *channelState) lookupChannelForGossipMsg(msg *proto.GossipMessage) channel.GossipChannel {
    if !msg.IsStateInfoMsg() {  // 非状态信息消息
        // If we reached here then the message isn't:
        // 1) StateInfoPullRequest
        // 2) StateInfo
        // Hence, it was already sent to a peer (us) that has proved it knows the channel name, by
        // sending StateInfo messages in the past.
        // Therefore- we use the channel name from the message itself.
        // 由于已经发送信息给对等节点 那就说明通过已经发送的状态信息消息证明是知道channel名称的
        // 因此通过message本身就可以获取到对应的channel name
        return cs.getGossipChannelByChainID(msg.Channel)
    }

    // Else, it's a StateInfo message.
    stateInfMsg := msg.GetStateInfo() // 状态信息消息处理同状态请求消息
    return cs.getGossipChannelByMAC(stateInfMsg.Channel_MAC, stateInfMsg.PkiId)
}
// 遍历所有的channels 寻找对应的channel的MAC计算和消息的MAC相等
// 如果是的话,对等节点签名的消息里知道channel的名字 主要是因为在消息验证时 对应的公钥ID会被检查的
// 整个遍历过程至于读锁下 保证数据的不可变
func (cs *channelState) getGossipChannelByMAC(receivedMAC []byte, pkiID common.PKIidType) channel.GossipChannel {
    // Iterate over the channels, and try to find a channel that the computation
    // of the MAC is equal to the MAC on the message.
    // If it is, then the peer that signed the message knows the name of the channel
    // because its PKI-ID was checked when the message was verified.
    cs.RLock()
    defer cs.RUnlock()
    for chanName, gc := range cs.channels {                               // 遍历所有channel
        mac := channel.GenerateMAC(pkiID, common.ChainID(chanName))        // 根据通道channel的名称 + 公钥 生产对应的身份验证码
        if bytes.Equal(mac, receivedMAC) {                                 // 将新生成的通道验证码与消息本身的进行比较
            return gc
        }
    }
    return nil
}
// 根据channel_id来获取gossip channel
// 当前所有的channels是不是已停止
// 根据channel id获取对应的gossip channel: channelState中map[string]*GossipChannel: <channelID, GossipChannel>
func (cs *channelState) getGossipChannelByChainID(chainID common.ChainID) channel.GossipChannel {
    if cs.isStopping() {
        return nil
    }
    cs.RLock()
    defer cs.RUnlock()
    return cs.channels[string(chainID)]
}
// 加入channel
// 1、首先判断对应的channels是否全部关闭
// 2、首先判断对应的channel是否已存在:channels[string(chainID)]
// 3、若存在:则直接配置组织列表定义有资格进入channel的peer
//    若不存在:a、首先获取gossip service的connection公钥:PKIid
//              b、根据gossipServiceImpl + discovery构成gossipAdapterImpl
//              c、在新建gossipChannel:公钥PKIid、组织selfOrgnization、消息加密服务MCS、通道IDchanId、gossipAdapter、加入信息joinMsg,并加入到指定的channel的组织列表中
//              d、并将新建的gossipChannel添加到本地:channels
func (cs *channelState) joinChannel(joinMsg api.JoinChannelMessage,   // 加入channel消息
    chainID common.ChainID) {                                         // 指定的channel
    if cs.isStopping() {
        return
    }
    cs.Lock()
    defer cs.Unlock()
    if gc, exists := cs.channels[string(chainID)]; !exists {
        pkiID := cs.g.comm.GetPKIid()
        ga := &gossipAdapterImpl{gossipServiceImpl: cs.g, Discovery: cs.g.disc}
        gc := channel.NewGossipChannel(pkiID, cs.g.selfOrg, cs.g.mcs, chainID, ga, joinMsg)
        cs.channels[string(chainID)] = gc
    } else {
        gc.ConfigureChannel(joinMsg)
    }
}
// gossip适配器:具备gossip相关功能,以及节点发现
// 通过组合的方式使得当前定义的类型 能够完全具备内部引用类型的内容(filed和method)
// 例子:type TestA struct {  // 字段
//              name string
//              age int
//      }
//
//      func (a *TestA) testA(){  // 方法
//          fmt.Println("this is a TestA method!!!")
//      }
//
//      type combinationTestA struct { // 通过组合的方式 使得combinationTestA具备了TestA的内容
//          *TestA
//      }
//
// ================== 应用 =================
//      ta := TestA{
//          name: "Hello World",
//          age: 30,
//      }
//
//      cta := combinationTestA{&ta}
//      fmt.Println(cta.age)
//      fmt.Println(cta.name)
//      cta.testA()

type gossipAdapterImpl struct {
    *gossipServiceImpl
    discovery.Discovery
}
// 获取channel配置
func (ga *gossipAdapterImpl) GetConf() channel.Config {
    return channel.Config{
        ID:                          ga.conf.ID,                         // channelID = gossipID
        MaxBlockCountToStore:        ga.conf.MaxBlockCountToStore,       // 缓存区大小
        PublishStateInfoInterval:    ga.conf.PublishStateInfoInterval,   // 状态消息推送间隔(秒)
        PullInterval:                ga.conf.PullInterval,               // 消息获取间隔(秒)
        PullPeerNum:                 ga.conf.PullPeerNum,                // 消息获取节点数
        RequestStateInfoInterval:    ga.conf.RequestStateInfoInterval,   // 状态消息获取间隔(秒)
        BlockExpirationInterval:     ga.conf.PullInterval * 100,         // 获取有效区块间隔(毫秒)
        StateInfoCacheSweepInterval: ga.conf.PullInterval * 5,           // 状态信息缓冲清除间隔(毫秒)
    }
}
// Gossip消息签名: GossipMessage => SignedGossipMessage
// 1、定义一个signer:实现消息签名功能
// 2、构建一个SignedGossipMessage执行sign操作完成信息的签名:等到envelope
// 3、输出完整的签名gossip message(SignedGossipMessage:envelope + gossipmessage)
func (ga *gossipAdapterImpl) Sign(msg *proto.GossipMessage) (*proto.SignedGossipMessage, error) {
    signer := func(msg []byte) ([]byte, error) {   // 签名消息 通过消息安全服务来完成
        return ga.mcs.Sign(msg)
    }
    sMsg := &proto.SignedGossipMessage{            // 构建签名消息
        GossipMessage: msg,
    }
    e, err := sMsg.Sign(signer)                    // 将SignedGossipMessage与签名函数绑定
    if err != nil {
        return nil, err
    }
    return &proto.SignedGossipMessage{
        Envelope:      e,
        GossipMessage: msg,
    }, nil
}
// gossip传播消息(签名后的消息)
// Gossip gossips a message
// 将签名后的消息绑定对应的过滤策略形成gossip传播消息emittedGossipMessage
// 接着将emittedGossipMessage添加emitter batch中等到emit
func (ga *gossipAdapterImpl) Gossip(msg *proto.SignedGossipMessage) {
    ga.gossipServiceImpl.emitter.Add(&emittedGossipMessage{  // 将emitterGossipMessage添加到batch中
        SignedGossipMessage: msg,
        filter: func(_ common.PKIidType) bool {  // 根据公钥进行消息过滤策略:此处不做处理 直接全部允许
            return true
        },
    })
}
// 转发消息
// 向下一级发送消息:指定签名消息SignedGossipMessage以及过滤策略filter:connection_id不相同过滤
// Forward sends message to the next hops
func (ga *gossipAdapterImpl) Forward(msg proto.ReceivedMessage) {
    ga.gossipServiceImpl.emitter.Add(&emittedGossipMessage{
        SignedGossipMessage: msg.GetGossipMessage(),
        filter:              msg.GetConnectionInfo().ID.IsNotSameFilter,
    })
}
// 给指定peers发送信息
// 参数:SignedGossipMessage 签名消息
//       peers 接收消息的节点
func (ga *gossipAdapterImpl) Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) {
    ga.gossipServiceImpl.comm.Send(msg, peers...)
}
// 验证状态消息StateInfoMessage: 若消息无效则返回error,否则返回nil
// ValidateStateInfoMessage returns error if a message isn't valid
// nil otherwise
func (ga *gossipAdapterImpl) ValidateStateInfoMessage(msg *proto.SignedGossipMessage) error {
    return ga.gossipServiceImpl.validateStateInfoMsg(msg)
}
// 返回一个确定对等节点的组织ID 根据公钥ID:PKIid
// GetOrgOfPeer returns the organization identifier of a certain peer
func (ga *gossipAdapterImpl) GetOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType {
    return ga.gossipServiceImpl.getOrgOfPeer(PKIID)
}
// 返回一个对等节点的ID  否则返回nil 代表没有发现
// GetIdentityByPKIID returns an identity of a peer with a certain
// pkiID, or nil if not found
func (ga *gossipAdapterImpl) GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType {
    identity, err := ga.idMapper.Get(pkiID)
    if err != nil {
        return nil
    }
    return identity
}

相关文章

网友评论

    本文标题:HyperLedgerFabric源码解读(2)-chanSta

    本文链接:https://www.haomeiwen.com/subject/deyafqtx.html