美文网首页
Hyperledger-Fabric源码分析(加入通道)

Hyperledger-Fabric源码分析(加入通道)

作者: Pillar_Zhong | 来源:发表于2019-04-08 23:24 被阅读0次

    前一篇讲完通道创建,接下来马上趁热打铁来看下加入通道这部分的实现,先看下命令。

    peer channel join -b mychannel.block

    可以看到,这里用上了上一步所生成的通道的genesisblock。

    peer

    executeJoin

    func executeJoin(cf *ChannelCmdFactory) (err error) {
       spec, err := getJoinCCSpec()
       if err != nil {
          return err
       }
    
       // Build the ChaincodeInvocationSpec message
       invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec}
    
       creator, err := cf.Signer.Serialize()
       if err != nil {
          return fmt.Errorf("Error serializing identity for %s: %s", cf.Signer.GetIdentifier(), err)
       }
    
       var prop *pb.Proposal
       prop, _, err = putils.CreateProposalFromCIS(pcommon.HeaderType_CONFIG, "", invocation, creator)
       if err != nil {
          return fmt.Errorf("Error creating proposal for join %s", err)
       }
    
       var signedProp *pb.SignedProposal
       signedProp, err = putils.GetSignedProposal(prop, cf.Signer)
       if err != nil {
          return fmt.Errorf("Error creating signed proposal %s", err)
       }
    
       var proposalResp *pb.ProposalResponse
       proposalResp, err = cf.EndorserClient.ProcessProposal(context.Background(), signedProp)
       if err != nil {
          return ProposalFailedErr(err.Error())
       }
    
       if proposalResp == nil {
          return ProposalFailedErr("nil proposal response")
       }
    
       if proposalResp.Response.Status != 0 && proposalResp.Response.Status != 200 {
          return ProposalFailedErr(fmt.Sprintf("bad proposal response %d: %s", proposalResp.Response.Status, proposalResp.Response.Message))
       }
       logger.Info("Successfully submitted proposal to join channel")
       return nil
    }
    

    这里主要做几件事情

    • 生成cis,也就是ChaincodeInvocationSpec,这里跟chaincode invoke篇高度相似,这里就不再赘述。而这里cis的重点是里面最终调用的是什么地方。

      • 可以看到最终是调用cscc的JoinChain
    func getJoinCCSpec() (*pb.ChaincodeSpec, error) {
      if genesisBlockPath == common.UndefinedParamValue {
         return nil, errors.New("Must supply genesis block file")
      }
    
      gb, err := ioutil.ReadFile(genesisBlockPath)
      if err != nil {
         return nil, GBFileNotFoundErr(err.Error())
      }
      // Build the spec
      input := &pb.ChaincodeInput{Args: [][]byte{[]byte(cscc.JoinChain), gb}}
    
      spec := &pb.ChaincodeSpec{
         Type:        pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value["GOLANG"]),
         ChaincodeId: &pb.ChaincodeID{Name: "cscc"},
         Input:       input,
      }
    
      return spec, nil
    }
    
    • 最后包装成HeaderType_CONFIG的Proposal
    • 注意,这里会去生成数字签名,当然是用的signidentity,而签名身份是跟admin是一致的。
    • 接着就是开始处理proposal了

    cscc

    case JoinChain:
       if args[1] == nil {
          return shim.Error("Cannot join the channel <nil> configuration block provided")
       }
    
       block, err := utils.GetBlockFromBlockBytes(args[1])
       if err != nil {
          return shim.Error(fmt.Sprintf("Failed to reconstruct the genesis block, %s", err))
       }
    
       cid, err := utils.GetChainIDFromBlock(block)
       if err != nil {
          return shim.Error(fmt.Sprintf("\"JoinChain\" request failed to extract "+
             "channel id from the block due to [%s]", err))
       }
    
       if err := validateConfigBlock(block); err != nil {
          return shim.Error(fmt.Sprintf("\"JoinChain\" for chainID = %s failed because of validation "+
             "of configuration block, because of %s", cid, err))
       }
    
       // 2. check local MSP Admins policy
       // TODO: move to ACLProvider once it will support chainless ACLs
       if err = e.policyChecker.CheckPolicyNoChannel(mgmt.Admins, sp); err != nil {
          return shim.Error(fmt.Sprintf("access denied for [%s][%s]: [%s]", fname, cid, err))
       }
    
       // Initialize txsFilter if it does not yet exist. We can do this safely since
       // it's the genesis block anyway
       txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
       if len(txsFilter) == 0 {
          // add array of validation code hardcoded to valid
          txsFilter = util.NewTxValidationFlagsSetValue(len(block.Data.Data), pb.TxValidationCode_VALID)
          block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
       }
    
       return joinChain(cid, block, e.ccp, e.sccp)
    
    • 前面就是block的各种校验,跳过

    • CheckPolicyNoChannel(mgmt.Admins, sp),为什么前面要强调签名身份跟admin一致,说明这个proposal包是admin签署的。而这里就是校验规则是否是admin。截取片段证明我所言非虚。

      case m.MSPRole_ADMIN:
         mspLogger.Debugf("Checking if identity satisfies ADMIN role for %s", msp.name)
         // in the case of admin, we check that the
         // id is exactly one of our admins
         for _, admincert := range msp.admins {
            if bytes.Equal(id.(*identity).cert.Raw, admincert.(*identity).cert.Raw) {
               // we do not need to check whether the admin is a valid identity
               // according to this MSP, since we already check this at Setup time
               // if there is a match, we can just return
               return nil
            }
         }
      
      • 可以看到这里就是比对证书是否跟本地保存的该组织的admin是否一致。

    joinChain

    func joinChain(chainID string, block *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider) pb.Response {
       if err := peer.CreateChainFromBlock(block, ccp, sccp); err != nil {
          return shim.Error(err.Error())
       }
    
       peer.InitChain(chainID)
    
       return shim.Success(nil)
    }
    

    这里分两步,CreateChainFromBlock和InitChain下面会讲到。处理完后给peer返回success

    CreateChainFromBlock

    func CreateChainFromBlock(cb *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider) error {
       cid, err := utils.GetChainIDFromBlock(cb)
       if err != nil {
          return err
       }
    
       var l ledger.PeerLedger
       if l, err = ledgermgmt.CreateLedger(cb); err != nil {
          return errors.WithMessage(err, "cannot create ledger from genesis block")
       }
    
       return createChain(cid, l, cb, ccp, sccp, pluginMapper)
    }
    

    这里做两件事情

    • 创建本地帐本
    • createChain,回忆下上篇创建通道,那里会初始化orderer端的chain,并启动。而这里是peer端

    createChain

    func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider, pm txvalidator.PluginMapper) error {
       chanConf, err := retrievePersistedChannelConfig(ledger)
       if err != nil {
          return err
       }
    
       var bundle *channelconfig.Bundle
    
       if chanConf != nil {
          bundle, err = channelconfig.NewBundle(cid, chanConf)
          if err != nil {
             return err
          }
       } else {
          // Config was only stored in the statedb starting with v1.1 binaries
          // so if the config is not found there, extract it manually from the config block
          envelopeConfig, err := utils.ExtractEnvelope(cb, 0)
          if err != nil {
             return err
          }
    
          bundle, err = channelconfig.NewBundleFromEnvelope(envelopeConfig)
          if err != nil {
             return err
          }
       }
    
       capabilitiesSupportedOrPanic(bundle)
    
       channelconfig.LogSanityChecks(bundle)
    
       gossipEventer := service.GetGossipService().NewConfigEventer()
    
       gossipCallbackWrapper := func(bundle *channelconfig.Bundle) {
          ac, ok := bundle.ApplicationConfig()
          if !ok {
             // TODO, handle a missing ApplicationConfig more gracefully
             ac = nil
          }
          gossipEventer.ProcessConfigUpdate(&gossipSupport{
             Validator:   bundle.ConfigtxValidator(),
             Application: ac,
             Channel:     bundle.ChannelConfig(),
          })
          service.GetGossipService().SuspectPeers(func(identity api.PeerIdentityType) bool {
             // TODO: this is a place-holder that would somehow make the MSP layer suspect
             // that a given certificate is revoked, or its intermediate CA is revoked.
             // In the meantime, before we have such an ability, we return true in order
             // to suspect ALL identities in order to validate all of them.
             return true
          })
       }
    
       trustedRootsCallbackWrapper := func(bundle *channelconfig.Bundle) {
          updateTrustedRoots(bundle)
       }
    
       mspCallback := func(bundle *channelconfig.Bundle) {
          // TODO remove once all references to mspmgmt are gone from peer code
          mspmgmt.XXXSetMSPManager(cid, bundle.MSPManager())
       }
    
       ac, ok := bundle.ApplicationConfig()
       if !ok {
          ac = nil
       }
    
       cs := &chainSupport{
          Application: ac, // TODO, refactor as this is accessible through Manager
          ledger:      ledger,
       }
    
       peerSingletonCallback := func(bundle *channelconfig.Bundle) {
          ac, ok := bundle.ApplicationConfig()
          if !ok {
             ac = nil
          }
          cs.Application = ac
          cs.Resources = bundle
       }
    
       cs.bundleSource = channelconfig.NewBundleSource(
          bundle,
          gossipCallbackWrapper,
          trustedRootsCallbackWrapper,
          mspCallback,
          peerSingletonCallback,
       )
    
       vcs := struct {
          *chainSupport
          *semaphore.Weighted
       }{cs, validationWorkersSemaphore}
       validator := txvalidator.NewTxValidator(cid, vcs, sccp, pm)
       c := committer.NewLedgerCommitterReactive(ledger, func(block *common.Block) error {
          chainID, err := utils.GetChainIDFromBlock(block)
          if err != nil {
             return err
          }
          return SetCurrConfigBlock(block, chainID)
       })
    
       ordererAddresses := bundle.ChannelConfig().OrdererAddresses()
       if len(ordererAddresses) == 0 {
          return errors.New("no ordering service endpoint provided in configuration block")
       }
    
       // TODO: does someone need to call Close() on the transientStoreFactory at shutdown of the peer?
       store, err := TransientStoreFactory.OpenStore(bundle.ConfigtxValidator().ChainID())
       if err != nil {
          return errors.Wrapf(err, "[channel %s] failed opening transient store", bundle.ConfigtxValidator().ChainID())
       }
       csStoreSupport := &CollectionSupport{
          PeerLedger: ledger,
       }
       simpleCollectionStore := privdata.NewSimpleCollectionStore(csStoreSupport)
    
       service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), ordererAddresses, service.Support{
          Validator:            validator,
          Committer:            c,
          Store:                store,
          Cs:                   simpleCollectionStore,
          IdDeserializeFactory: csStoreSupport,
       })
    
       chains.Lock()
       defer chains.Unlock()
       chains.list[cid] = &chain{
          cs:        cs,
          cb:        cb,
          committer: c,
       }
    
       return nil
    }
    
    • 首先根据传入的genesisblock中拿到envelope,进而转换成bundle,又来了,看过上篇的,应该很熟悉了。

    • 拿到bundle后,第一时间就是组装chainsupport,可以看到后面的wrapper和callback都是用来处理当bundle更新的场景,也就是当配置变更的时候会回调这些逻辑,这里就不展开了。自行分析吧。

    cs := &chainSupport{
          Application: ac, // TODO, refactor as this is accessible through Manager
          ledger:      ledger,
    }
    
    cs.bundleSource = channelconfig.NewBundleSource(
      bundle,
      gossipCallbackWrapper,
      trustedRootsCallbackWrapper,
      mspCallback,
      peerSingletonCallback,
    )
    
    • 然后就是根据账本去初始化LedgerCommitter,这是账本提交相关的。

    • 最后最最重要的是初始化gossip服务,回忆下gossip会做什么?

      • 去orderer拉取block
      • peer节点间选举
      • peer成员间主动被动同步block
      • peer成员状态同步
    • 而这些都会在这里面去初始化,service.GetGossipService().InitializeChannel感兴趣的可以进去看看,可以找到前面所讲的deliveryService,leaderelection,GossipStateProvider

    InitChain

    peer.Initialize(func(cid string) {
       logger.Debugf("Deploying system CC, for channel <%s>", cid)
       sccp.DeploySysCCs(cid, ccp)
       sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {
          return peer.GetLedger(cid).NewQueryExecutor()
       }))
       if err != nil {
          logger.Panicf("Failed subscribing to chaincode lifecycle updates")
       }
       cceventmgmt.GetMgr().Register(cid, sub)
    }
    
    • 当peer node start的时候也会调用sccp.DeploySysCCs(cid, ccp),只不过cid为空,说明是系统级的scc,而这里部署的是通道相关的scc。也就是说以后在通道的基础上调用的scc就是该通道独有的,跟其他通道以及系统区分开。

    • 好奇,到底有什么区别,都用系统级的不好么?进去看看

    deploySysCC

    func deploySysCC(chainID string, ccprov ccprovider.ChaincodeProvider, syscc SelfDescribingSysCC) error {
       ...
    
       if chainID != "" {
          lgr := peer.GetLedger(chainID)
          if lgr == nil {
             panic(fmt.Sprintf("syschain %s start up failure - unexpected nil ledger for channel %s", syscc.Name(), chainID))
          }
    
          txsim, err := lgr.NewTxSimulator(txid)
          if err != nil {
             return err
          }
    
          txParams.TXSimulator = txsim
          defer txsim.Done()
       }
    
       ...
    }
    

    进来可以看到,如果绑定通道id的结果是生成独有的事件模拟器。

    小结

    至此,整个join处理完毕,很多部分前面已经讲过了,显得这里很粗糙,建议搞清楚了再来,抱歉。

    相关文章

      网友评论

          本文标题:Hyperledger-Fabric源码分析(加入通道)

          本文链接:https://www.haomeiwen.com/subject/ysomiqtx.html