主要是关于channel state相关内容
package gossip
type channelState struct {
stopping int32 // channel全部停止的标志值
sync.RWMutex // 读写锁
channels map[string]channel.GossipChannel // gossip channels: <channel-id, GossipChannel>
g *gossipServiceImpl // gossip service
}
// 停止所有channel
// 1、判断stop标志是否满足
// 2、首先设置stop标志值 = 1: 使用atomic的storeInt32原子操作
// 接着迭代channels的map 逐一执行channel.stop
// 关闭所有channel操作在读写锁的环境内执行
func (cs *channelState) stop() {
if cs.isStopping() {
return
}
atomic.StoreInt32(&cs.stopping, int32(1))
cs.Lock()
defer cs.Unlock()
for _, gc := range cs.channels {
gc.Stop()
}
}
// stopping 是否等于1
func (cs *channelState) isStopping() bool {
return atomic.LoadInt32(&cs.stopping) == int32(1)
}
// 根据gossip的receive message获取对应的channel
// 需要对receive message其中gossip message进行提取: 状态请求消息单独处理,主要由于该类消息内部提供了通道身份验证码和公钥ID
func (cs *channelState) lookupChannelForMsg(msg proto.ReceivedMessage) channel.GossipChannel {
if msg.GetGossipMessage().IsStateInfoPullRequestMsg() { // 状态信息获取请求消息
sipr := msg.GetGossipMessage().GetStateInfoPullReq()
mac := sipr.Channel_MAC // channel_mac:身份验证码,代表发送消息的peer知道对应的channel名称
pkiID := msg.GetConnectionInfo().ID // 根据消息获取对应的connection信息: connection_id
return cs.getGossipChannelByMAC(mac, pkiID) // 根据channel身份验证码 + 公钥ID 获取对应的gossip channel
}
return cs.lookupChannelForGossipMsg(msg.GetGossipMessage().GossipMessage) // 根据gossip消息获取对应的channel
}
// 获取gossip message对应的channel
// 主要关键点提取出Gossip Message里面的StateInfoMsg:该类信息内部包括了channel id,能够通过channel_id获取gossip channel
func (cs *channelState) lookupChannelForGossipMsg(msg *proto.GossipMessage) channel.GossipChannel {
if !msg.IsStateInfoMsg() { // 非状态信息消息
// If we reached here then the message isn't:
// 1) StateInfoPullRequest
// 2) StateInfo
// Hence, it was already sent to a peer (us) that has proved it knows the channel name, by
// sending StateInfo messages in the past.
// Therefore- we use the channel name from the message itself.
// 由于已经发送信息给对等节点 那就说明通过已经发送的状态信息消息证明是知道channel名称的
// 因此通过message本身就可以获取到对应的channel name
return cs.getGossipChannelByChainID(msg.Channel)
}
// Else, it's a StateInfo message.
stateInfMsg := msg.GetStateInfo() // 状态信息消息处理同状态请求消息
return cs.getGossipChannelByMAC(stateInfMsg.Channel_MAC, stateInfMsg.PkiId)
}
// 遍历所有的channels 寻找对应的channel的MAC计算和消息的MAC相等
// 如果是的话,对等节点签名的消息里知道channel的名字 主要是因为在消息验证时 对应的公钥ID会被检查的
// 整个遍历过程至于读锁下 保证数据的不可变
func (cs *channelState) getGossipChannelByMAC(receivedMAC []byte, pkiID common.PKIidType) channel.GossipChannel {
// Iterate over the channels, and try to find a channel that the computation
// of the MAC is equal to the MAC on the message.
// If it is, then the peer that signed the message knows the name of the channel
// because its PKI-ID was checked when the message was verified.
cs.RLock()
defer cs.RUnlock()
for chanName, gc := range cs.channels { // 遍历所有channel
mac := channel.GenerateMAC(pkiID, common.ChainID(chanName)) // 根据通道channel的名称 + 公钥 生产对应的身份验证码
if bytes.Equal(mac, receivedMAC) { // 将新生成的通道验证码与消息本身的进行比较
return gc
}
}
return nil
}
// 根据channel_id来获取gossip channel
// 当前所有的channels是不是已停止
// 根据channel id获取对应的gossip channel: channelState中map[string]*GossipChannel: <channelID, GossipChannel>
func (cs *channelState) getGossipChannelByChainID(chainID common.ChainID) channel.GossipChannel {
if cs.isStopping() {
return nil
}
cs.RLock()
defer cs.RUnlock()
return cs.channels[string(chainID)]
}
// 加入channel
// 1、首先判断对应的channels是否全部关闭
// 2、首先判断对应的channel是否已存在:channels[string(chainID)]
// 3、若存在:则直接配置组织列表定义有资格进入channel的peer
// 若不存在:a、首先获取gossip service的connection公钥:PKIid
// b、根据gossipServiceImpl + discovery构成gossipAdapterImpl
// c、在新建gossipChannel:公钥PKIid、组织selfOrgnization、消息加密服务MCS、通道IDchanId、gossipAdapter、加入信息joinMsg,并加入到指定的channel的组织列表中
// d、并将新建的gossipChannel添加到本地:channels
func (cs *channelState) joinChannel(joinMsg api.JoinChannelMessage, // 加入channel消息
chainID common.ChainID) { // 指定的channel
if cs.isStopping() {
return
}
cs.Lock()
defer cs.Unlock()
if gc, exists := cs.channels[string(chainID)]; !exists {
pkiID := cs.g.comm.GetPKIid()
ga := &gossipAdapterImpl{gossipServiceImpl: cs.g, Discovery: cs.g.disc}
gc := channel.NewGossipChannel(pkiID, cs.g.selfOrg, cs.g.mcs, chainID, ga, joinMsg)
cs.channels[string(chainID)] = gc
} else {
gc.ConfigureChannel(joinMsg)
}
}
// gossip适配器:具备gossip相关功能,以及节点发现
// 通过组合的方式使得当前定义的类型 能够完全具备内部引用类型的内容(filed和method)
// 例子:type TestA struct { // 字段
// name string
// age int
// }
//
// func (a *TestA) testA(){ // 方法
// fmt.Println("this is a TestA method!!!")
// }
//
// type combinationTestA struct { // 通过组合的方式 使得combinationTestA具备了TestA的内容
// *TestA
// }
//
// ================== 应用 =================
// ta := TestA{
// name: "Hello World",
// age: 30,
// }
//
// cta := combinationTestA{&ta}
// fmt.Println(cta.age)
// fmt.Println(cta.name)
// cta.testA()
type gossipAdapterImpl struct {
*gossipServiceImpl
discovery.Discovery
}
// 获取channel配置
func (ga *gossipAdapterImpl) GetConf() channel.Config {
return channel.Config{
ID: ga.conf.ID, // channelID = gossipID
MaxBlockCountToStore: ga.conf.MaxBlockCountToStore, // 缓存区大小
PublishStateInfoInterval: ga.conf.PublishStateInfoInterval, // 状态消息推送间隔(秒)
PullInterval: ga.conf.PullInterval, // 消息获取间隔(秒)
PullPeerNum: ga.conf.PullPeerNum, // 消息获取节点数
RequestStateInfoInterval: ga.conf.RequestStateInfoInterval, // 状态消息获取间隔(秒)
BlockExpirationInterval: ga.conf.PullInterval * 100, // 获取有效区块间隔(毫秒)
StateInfoCacheSweepInterval: ga.conf.PullInterval * 5, // 状态信息缓冲清除间隔(毫秒)
}
}
// Gossip消息签名: GossipMessage => SignedGossipMessage
// 1、定义一个signer:实现消息签名功能
// 2、构建一个SignedGossipMessage执行sign操作完成信息的签名:等到envelope
// 3、输出完整的签名gossip message(SignedGossipMessage:envelope + gossipmessage)
func (ga *gossipAdapterImpl) Sign(msg *proto.GossipMessage) (*proto.SignedGossipMessage, error) {
signer := func(msg []byte) ([]byte, error) { // 签名消息 通过消息安全服务来完成
return ga.mcs.Sign(msg)
}
sMsg := &proto.SignedGossipMessage{ // 构建签名消息
GossipMessage: msg,
}
e, err := sMsg.Sign(signer) // 将SignedGossipMessage与签名函数绑定
if err != nil {
return nil, err
}
return &proto.SignedGossipMessage{
Envelope: e,
GossipMessage: msg,
}, nil
}
// gossip传播消息(签名后的消息)
// Gossip gossips a message
// 将签名后的消息绑定对应的过滤策略形成gossip传播消息emittedGossipMessage
// 接着将emittedGossipMessage添加emitter batch中等到emit
func (ga *gossipAdapterImpl) Gossip(msg *proto.SignedGossipMessage) {
ga.gossipServiceImpl.emitter.Add(&emittedGossipMessage{ // 将emitterGossipMessage添加到batch中
SignedGossipMessage: msg,
filter: func(_ common.PKIidType) bool { // 根据公钥进行消息过滤策略:此处不做处理 直接全部允许
return true
},
})
}
// 转发消息
// 向下一级发送消息:指定签名消息SignedGossipMessage以及过滤策略filter:connection_id不相同过滤
// Forward sends message to the next hops
func (ga *gossipAdapterImpl) Forward(msg proto.ReceivedMessage) {
ga.gossipServiceImpl.emitter.Add(&emittedGossipMessage{
SignedGossipMessage: msg.GetGossipMessage(),
filter: msg.GetConnectionInfo().ID.IsNotSameFilter,
})
}
// 给指定peers发送信息
// 参数:SignedGossipMessage 签名消息
// peers 接收消息的节点
func (ga *gossipAdapterImpl) Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) {
ga.gossipServiceImpl.comm.Send(msg, peers...)
}
// 验证状态消息StateInfoMessage: 若消息无效则返回error,否则返回nil
// ValidateStateInfoMessage returns error if a message isn't valid
// nil otherwise
func (ga *gossipAdapterImpl) ValidateStateInfoMessage(msg *proto.SignedGossipMessage) error {
return ga.gossipServiceImpl.validateStateInfoMsg(msg)
}
// 返回一个确定对等节点的组织ID 根据公钥ID:PKIid
// GetOrgOfPeer returns the organization identifier of a certain peer
func (ga *gossipAdapterImpl) GetOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType {
return ga.gossipServiceImpl.getOrgOfPeer(PKIID)
}
// 返回一个对等节点的ID 否则返回nil 代表没有发现
// GetIdentityByPKIID returns an identity of a peer with a certain
// pkiID, or nil if not found
func (ga *gossipAdapterImpl) GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType {
identity, err := ga.idMapper.Get(pkiID)
if err != nil {
return nil
}
return identity
}
网友评论