    基于Hyperledger Fabric 1.2的源码;



    1. peer在调用chaincode执行交易后,会依据collection配置在有权限的peer间散播私有数据,并存储私有数据到自己的transient store。
    2. 其他接收到散播私有数据的peer,会将私有数据存储到自己的transient store。


    1. peer最终接收到block后,会检查自己是否有权限获取私有数据,有权限的话从transient store中拿数据,如果没有,则去其他peer上获取,最终私有数据会存储到pvtdataStore中。


    A. 接收交易proposal


    if simResult.PvtSimulationResults != nil {
       if cid.Name == "lscc" {
          // TODO: remove once we can store collection configuration outside of LSCC
          return nil, nil, nil, nil, errors.New("Private data is forbidden to be used in instantiate")
       // 汇总私有数据collection配置和私有数据信息
       pvtDataWithConfig, err := e.AssemblePvtRWSet(simResult.PvtSimulationResults, txsim)
       // To read collection config need to read collection updates before
       // releasing the lock, hence txsim.Done()  moved down here
       if err != nil {
          return nil, nil, nil, nil, errors.WithMessage(err, "failed to obtain collections config")
       endorsedAt, err := e.s.GetLedgerHeight(chainID)
       if err != nil {
          return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprint("failed to obtain ledger height for channel", chainID))
       // Add ledger height at which transaction was endorsed,
       // `endorsedAt` is obtained from the block storage and at times this could be 'endorsement Height + 1'.
       // However, since we use this height only to select the configuration (3rd parameter in distributePrivateData) and
       // manage transient store purge for orphaned private writesets (4th parameter in distributePrivateData), this works for now.
       // Ideally, ledger should add support in the simulator as a first class function `GetHeight()`.
       pvtDataWithConfig.EndorsedAt = endorsedAt
       // 在chainID为名的channel中,依据collection配置,散播私有数据
       if err := e.distributePrivateData(chainID, txid, pvtDataWithConfig, endorsedAt); err != nil {
          return nil, nil, nil, nil, err


    1. 汇总私有数据collection配置和私有数据信息(core/endorser/pvtrwset_assembler.go的AssemblePvtRWSet函数):
    // AssemblePvtRWSet prepares TxPvtReadWriteSet for distribution,
    // augmenting it into TxPvtReadWriteSetWithConfigInfo by adding
    // information about the collection configs related
    // to the private read-write set.
    //
    // For each namespace in privData.NsPvtRwset, the collection configuration
    // is read once through txsim.GetState under the "lscc" namespace and stored
    // in the result's CollectionConfigs map, keyed by that namespace.
    //
    // An error is returned when the state read fails, when no configuration
    // bytes are found, or when the bytes cannot be unmarshalled into a
    // CollectionConfigPackage.
    //
    // NOTE(review): closing braces appear to be missing from this excerpt.
    func (as *rwSetAssembler) AssemblePvtRWSet(privData *rwset.TxPvtReadWriteSet, txsim CollectionConfigRetriever) (*transientstore.TxPvtReadWriteSetWithConfigInfo, error) {
       // Start with the private rwset as given and an empty config map.
       txPvtRwSetWithConfig := &transientstore.TxPvtReadWriteSetWithConfigInfo{
          PvtRwset:          privData,
          CollectionConfigs: make(map[string]*common.CollectionConfigPackage),
       for _, pvtRwset := range privData.NsPvtRwset {
          namespace := pvtRwset.Namespace
          // Fetch the config only for namespaces not already in the map.
          if _, found := txPvtRwSetWithConfig.CollectionConfigs[namespace]; !found {
             cb, err := txsim.GetState("lscc", privdata.BuildCollectionKVSKey(namespace))
             if err != nil {
                return nil, errors.WithMessage(err, fmt.Sprintf("error while retrieving collection config for chaincode %#v", namespace))
             // A nil value is treated as "no collection config" and is an error.
             if cb == nil {
                return nil, errors.New(fmt.Sprintf("no collection config for chaincode %#v", namespace))
             colCP := &common.CollectionConfigPackage{}
             err = proto.Unmarshal(cb, colCP)
             if err != nil {
                return nil, errors.Wrapf(err, "invalid configuration for collection criteria %#v", namespace)
             txPvtRwSetWithConfig.CollectionConfigs[namespace] = colCP
       return txPvtRwSetWithConfig, nil
    2. 在peer之间分发私有数据:
    // privDataDist forwards its arguments unchanged to the gossip service's
    // DistributePrivateData and returns that call's error.
    // NOTE(review): the closure's closing brace appears to be missing from this excerpt.
    privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
       return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
    // The endorser server is constructed with privDataDist and endorserSupport.
    serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport)


    // DistributePrivateData distributes the private read-write set inside the
    // channel based on the collection policies, then stores it through the
    // channel's coordinator.
    //
    // It returns an error when no private data handler is registered for
    // chainID, or when either the distribution or the store step fails; the
    // store step is attempted only after distribution succeeds.
    //
    // NOTE(review): closing braces appear to be missing from this excerpt.
    func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
       // Look up the private data handler by channel name.
       handler, exists := g.privateHandlers[chainID]
       if !exists {
          return errors.Errorf("No private data handler for %s", chainID)
       // Disseminate the private data according to the collection configuration.
       if err := handler.distributor.Distribute(txID, privData, blkHt); err != nil {
          logger.Error("Failed to distributed private collection, txID", txID, "channel", chainID, "due to", err)
          return err
       // Store the private data locally (the log message below refers to the transient store).
       if err := handler.coordinator.StorePvtData(txID, privData, blkHt); err != nil {
          logger.Error("Failed to store private data into transient store, txID",
             txID, "channel", chainID, "due to", err)
          return err
       return nil


    // Distribute reliably broadcasts the private data read-write set based on policies.
    //
    // It first computes a dissemination plan from txID, privData and blkHt, and
    // then hands that plan to d.disseminate, returning its result. If the plan
    // cannot be computed, the error is returned wrapped with a stack trace.
    //
    // NOTE(review): closing braces appear to be missing from this excerpt.
    func (d *distributorImpl) Distribute(txID string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
       disseminationPlan, err := d.computeDisseminationPlan(txID, privData, blkHt)
       if err != nil {
          return errors.WithStack(err)
       return d.disseminate(disseminationPlan)

    c)存储私有数据到Transient Store(core/transientstore/store.go的PersistWithConfig函数):创建compositeKey为键值(包括txid、uuid和blockHeight),存储私有数据,并创建两个用于后期清理的索引(compositeKeyPurgeIndexByHeight和compositeKeyPurgeIndexByTxid),数据最后存储在leveldb中,位于/var/hyperledger/production/transientStore路径下。

    B. 接收Block和私有数据


    // Listen for incoming communication
    go s.listen()
    // Deliver in order messages into the incoming channel
    go s.deliverPayloads()

    1. listen函数处理

    // listen loops over three channels: messages from s.gossipChan are handed
    // to queueNewMessage, messages from s.commChan are handed to dispatch
    // (each in its own goroutine), and a receive on s.stopCh is the stop signal.
    //
    // NOTE(review): closing braces, and possibly other short lines such as a
    // return in the stop branch, appear to be missing from this excerpt.
    func (s *GossipStateProviderImpl) listen() {
        // Mark this goroutine as finished on exit (s.done is presumably a sync.WaitGroup).
        defer s.done.Done()
        for {
            select {
            case msg := <-s.gossipChan:
                logger.Debug("Received new message via gossip channel")
                go s.queueNewMessage(msg)
            case msg := <-s.commChan:
                logger.Debug("Dispatching a message", msg)
                go s.dispatch(msg)
            case <-s.stopCh:
                // Put the stop signal back on the channel, presumably so that
                // another receiver on s.stopCh can observe it as well.
                s.stopCh <- struct{}{}
                logger.Debug("Stop listening for new messages")




    gossipChan, _ := services.Accept(func(message interface{}) bool {
       // Get only data messages
       return message.(*proto.GossipMessage).IsDataMsg() &&
          bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID))
    }, false)
    // queueNewMessage is the new message notification/handler.
    //
    // It logs a warning when msg.Channel differs from s.chainID. If the message
    // carries a data message, its payload is passed to s.addPayload in
    // nonBlocking mode; otherwise a debug line notes the unexpected message type.
    //
    // NOTE(review): closing braces, and possibly an early return after the
    // channel-mismatch warning, appear to be missing from this excerpt.
    func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) {
       // Messages for a different channel are reported as ignored.
       if !bytes.Equal(msg.Channel, []byte(s.chainID)) {
          logger.Warning("Received enqueue for channel",
             string(msg.Channel), "while expecting channel", s.chainID, "ignoring enqueue")
       dataMsg := msg.GetDataMsg()
       if dataMsg != nil {
          // A failure to add the payload is only logged, not propagated.
          if err := s.addPayload(dataMsg.GetPayload(), nonBlocking); err != nil {
             logger.Warning("Failed adding payload:", err)
          logger.Debugf("Received new payload with sequence number = [%d]", dataMsg.Payload.SeqNum)
       } else {
          logger.Debug("Gossip message received is not of data message type, usually this should not happen.")



    该通道的消息是经过remoteStateMsgFilter筛选的(主要检查消息内容,channel权限),协程调用dispatch函数(gossip/state/state.go的dispatch函数),分别处理state message和私有数据信息。这里主要关注私有数据,如果获得私有数据,则进行处理(gossip/state/state.go的privateDataMessage函数):
    c)写入Transient Store(最终还是调用core/transientstore/store.go的PersistWithConfig函数);

    // remoteStateMsgFilter reports whether a received message should be accepted.
    // A message passes only if all three checks hold:
    //   1. it is a remote state message or carries private data;
    //   2. its channel equals chainID;
    //   3. services.VerifyByChannel accepts the sender's identity, signature and
    //      signed data for that channel.
    //
    // NOTE(review): closing braces appear to be missing from this excerpt.
    remoteStateMsgFilter := func(message interface{}) bool {
       receivedMsg := message.(proto.ReceivedMessage)
       msg := receivedMsg.GetGossipMessage()
       // Reject anything that is neither a remote state message nor private data.
       if !(msg.IsRemoteStateMessage() || msg.GetPrivateData() != nil) {
          return false
       // Ensure we deal only with messages that belong to this channel
       if !bytes.Equal(msg.Channel, []byte(chainID)) {
          return false
       connInfo := receivedMsg.GetConnectionInfo()
       // Verify the sender's connection credentials against the channel.
       authErr := services.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
       if authErr != nil {
          logger.Warning("Got unauthorized request from", string(connInfo.Identity))
          return false
       return true
    // Filter message which are only relevant for nodeMetastate transfer
    _, commChan := services.Accept(remoteStateMsgFilter, true)
    // dispatch branches on the type of a received message: remote state
    // messages take the first branch, messages carrying private data the second.
    //
    // NOTE(review): only the debug logging of each branch is visible here; the
    // handler calls and closing braces appear to be missing from this excerpt.
    func (s *GossipStateProviderImpl) dispatch(msg proto.ReceivedMessage) {
       // Check type of the message
       if msg.GetGossipMessage().IsRemoteStateMessage() {
          logger.Debug("Handling direct state transfer message")
          // Got state transfer request response
       } else if msg.GetGossipMessage().GetPrivateData() != nil {
          logger.Debug("Handling private data collection message")
          // Handling private data replication message

    2. deliverPayloads函数处理

    // deliverPayloads waits until s.payloads signals that the next payload is
    // ready, then pops payloads one by one, decodes each into a block plus its
    // optional private data, and passes both to s.commitBlock. A receive on
    // s.stopCh is the stop signal.
    //
    // NOTE(review): closing braces, and possibly short statements such as
    // continue/return after the error logs, appear to be missing from this
    // excerpt, so the exact control flow after each error is not visible here.
    func (s *GossipStateProviderImpl) deliverPayloads() {
        // Mark this goroutine as finished on exit (s.done is presumably a sync.WaitGroup).
        defer s.done.Done()
        for {
            select {
            // Wait for notification that next seq has arrived
            case <-s.payloads.Ready():
                logger.Debugf("Ready to transfer payloads to the ledger, next sequence number is = [%d]", s.payloads.Next())
                // Collect all subsequent payloads
                for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
                    // Decode the raw block carried in the payload.
                    rawBlock := &common.Block{}
                    if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {
                        logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
                    // A block without header or data is reported as an error.
                    if rawBlock.Data == nil || rawBlock.Header == nil {
                        logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
                            payload.SeqNum, rawBlock.Header, rawBlock.Data)
                    logger.Debug("New block with claimed sequence number ", payload.SeqNum, " transactions num ", len(rawBlock.Data.Data))
                    // Read all private data into slice
                    var p util.PvtDataCollections
                    if payload.PrivateData != nil {
                        err := p.Unmarshal(payload.PrivateData)
                        if err != nil {
                            logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
                    // Commit the block with its private data; a VSCC execution
                    // failure is logged as an error, other failures panic.
                    if err := s.commitBlock(rawBlock, p); err != nil {
                        if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr {
                            logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr)
                        logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
            case <-s.stopCh:
                // Put the stop signal back on the channel, presumably so that
                // another receiver on s.stopCh can observe it as well.
                s.stopCh <- struct{}{}
                logger.Debug("State provider has been stopped, finishing to push new blocks.")



    // Validate performs the validation of a block. The validation
    // of each transaction in the block is performed in parallel.
    // The approach is as follows: the committer thread starts the
    // tx validation function in a goroutine (using a semaphore to cap
    // the number of concurrent validating goroutines). The committer
    // thread then reads results of validation (in order of completion
    // of the goroutines) from the results channel. The goroutines
    // perform the validation of the txs in the block and enqueue the
    // validation result in the results channel. A few note-worthy facts:
    // 1) to keep the approach simple, the committer thread enqueues
    //    all transactions in the block and then moves on to reading the
    //    results.
    // 2) for parallel validation to work, it is important that the
    //    validation function does not change the state of the system.
    //    Otherwise the order in which validation is performed matters
    //    and we have to resort to sequential validation (or some locking).
    //    This is currently true, because the only function that affects
    //    state is when a config transaction is received, but they are
    //    guaranteed to be alone in the block. If/when this assumption
    //    is violated, this code must be changed.
    func (v *TxValidator) Validate(block *common.Block) error {

    4)检查缺少的私有数据,从自己的transient store中获取(gossip/privdata/coordinator.go的listMissingPrivateData函数):
      b)从Transient Store中获取缺少的私有数据(gossip/privdata/coordinator.go的fetchMissingFromTransientStore函数—>fetchFromTransientStore函数);
    5)在指定的时间(peer.gossip.pvtData.pullRetryThreshold)内,从其他peer获取缺少的私有数据(gossip/privdata/coordinator.go的fetchFromPeers函数),存入Transient Store中;


    // CommitWithPvtData commits blocks atomically with private data.
    //
    // It runs lc.preCommit on the block first and then delegates to
    // PeerLedgerSupport.CommitWithPvtData; an error from either step is
    // returned unchanged, otherwise nil.
    //
    // NOTE(review): closing braces appear to be missing from this excerpt.
    func (lc *LedgerCommitter) CommitWithPvtData(blockAndPvtData *ledger.BlockAndPvtData) error {
       // Do validation and whatever needed before
       // committing new block
       if err := lc.preCommit(blockAndPvtData.Block); err != nil {
          return err
       // Committing new block
       if err := lc.PeerLedgerSupport.CommitWithPvtData(blockAndPvtData); err != nil {
          return err
       // post commit actions, such as event publishing
       return nil

      a)如果是config block,调用系统合约cscc升级配置(core/committer/committer_impl.go的preCommit函数);

    // CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation.
    //
    // Visible steps, in order: validate and prepare through the transaction
    // manager, commit block and private data to the block store, write a config
    // block to file when applicable, commit to the state database, and, if
    // enabled, commit to the history database. Errors from validation and from
    // the block store are returned; errors from the state and history commits
    // cause a panic.
    //
    // NOTE(review): closing braces appear to be missing from this excerpt, and
    // the Lock() call matching the deferred Unlock() is not visible here.
    func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) error {
       var err error
       block := pvtdataAndBlock.Block
       blockNo := pvtdataAndBlock.Block.Header.Number
       logger.Debugf("Channel [%s]: Validating state for block [%d]", l.ledgerID, blockNo)
       // Validation and preparation.
       err = l.txtmgmt.ValidateAndPrepare(pvtdataAndBlock, true)
       if err != nil {
          return err
       logger.Debugf("Channel [%s]: Committing block [%d] to storage", l.ledgerID, blockNo)
       // Release the block API lock when this function returns.
       defer l.blockAPIsRWLock.Unlock()
       // Commit the block and the private data.
       if err = l.blockStore.CommitWithPvtData(pvtdataAndBlock); err != nil {
          return err
       logger.Infof("Channel [%s]: Committed block [%d] with %d transaction(s)", l.ledgerID, block.Header.Number, len(block.Data.Data))
       // For config blocks, also write the block to a file; a failure is only logged.
       if utils.IsConfigBlock(block) {
          if err := l.WriteConfigBlockToSpecFile(block); err != nil {
             logger.Errorf("Failed to write config block to file for %s", err)
       logger.Debugf("Channel [%s]: Committing block [%d] transactions to state database", l.ledgerID, blockNo)
       // Commit to the state DB.
       if err = l.txtmgmt.Commit(); err != nil {
          panic(fmt.Errorf(`Error during commit to txmgr:%s`, err))
       // History database could be written in parallel with state and/or async as a future optimization
       if ledgerconfig.IsHistoryDBEnabled() {
          logger.Debugf("Channel [%s]: Committing block [%d] transactions to history database", l.ledgerID, blockNo)
          // Commit to the history DB.
          if err := l.historyDB.Commit(block); err != nil {
             panic(fmt.Errorf(`Error during commit to history db:%s`, err))
       return nil
    // validateAndPreparePvtBatch pulls out the private write-set for the transactions that are marked as valid
    // by the internal public data validator. Finally, it validates (if not already self-endorsed) the pvt rwset against the
    // corresponding hash present in the public rwset
    func validateAndPreparePvtBatch(block *valinternal.Block, pvtdata map[uint64]*ledger.TxPvtData) (*privacyenabledstate.PvtUpdateBatch, error) {

        i)检查和准备工作(core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go的ValidateAndPrepare函数):等待pvtdata清理完毕(core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr.go的WaitForPrepareToFinish函数),检查和准备batch(core/ledger/kvledger/txmgmt/validator/valimpl/default_impl.go的ValidateAndPrepareBatch函数),主要关注私密数据的检查,排除没有权限的数据,验证hash正确性;开启state change监听;



