美文网首页
长安链源码学习--区块验证(BlockVerify)(六)

长安链源码学习--区块验证(BlockVerify)(六)

作者: 明神特烦恼 | 来源:发表于2021-06-15 13:29 被阅读0次

    作者:明神特烦恼
    公众号:明神特烦恼

    当节点接收到其他节点发送的提案信息 或者 接收到其他节点同步过来的区块,需要对区块进行验证,确保区块的有效性。

    待验证区块的数据结构:

    type Block struct {
        // header of the block
        Header *BlockHeader `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
        // execution sequence of intra block transactions is generated by proposer
        Dag *DAG           `protobuf:"bytes,2,opt,name=dag,proto3" json:"dag,omitempty"`
        Txs []*Transaction `protobuf:"bytes,3,rep,name=txs,proto3" json:"txs,omitempty"`
        // stores the voting information of the current block
        // not included in block hash value calculation
        AdditionalData *AdditionalData `protobuf:"bytes,4,opt,name=additional_data,json=additionalData,proto3" json:"additional_data,omitempty"`
    }
    

    区块验证内容如下:
    1)区块中交易总数 不能超过 配置的区块最大交易数限制
    2)区块体的交易总数与Header.TxCount相同
    3)本节点的区块高度 >= 验证区块的高度,且能否与前一区块的Hash一致
    4)提案人身份认证有效、权限检测通过(权限管理在后面章节单独介绍)
    5)区块内部txid无重复
    6)通过DAG生成读写集,每笔交易的读写集Hash与tx.Result.RwSetHash一致
    7)每笔交易执行结果与区块内tx.Result一致

    其中第6步与第7步都为多线程并发验证,这里体现出DAG的作用。

    type DAG struct {
        // sequence number of transaction topological sort
        //the sequence number of the transaction topological sort associated with the transaction
        Vertexes []*DAG_Neighbor `protobuf:"bytes,2,rep,name=vertexes,proto3" json:"vertexes,omitempty"`
    }
    

    之前的章节,我们分析过DAG的数据结构,Vertexes的每个索引表示具体的交易编号。这里处理的思路是,如果某个节点与其他节点不是邻居,代表没有关联交易,可以并发生成读写集。这样可以加快读写集的生成。具体逻辑在SimulateWithDag函数中。

    func (ts *TxSchedulerImpl) SimulateWithDag(block *commonpb.Block, snapshot protocol.Snapshot) (map[string]*commonpb.TxRWSet, map[string]*commonpb.Result, error) {
        txMapping := make(map[int]*commonpb.Transaction)
        for index, tx := range block.Txs {
            txMapping[index] = tx
        }
    
        // 交易间关联关系转化,记录每笔交易与哪些交易有关联关系,用true表示
        dag := block.Dag
        dagRemain := make(map[int]dagNeighbors)
        for txIndex, neighbors := range dag.Vertexes {
            dn := make(dagNeighbors)
            for _, neighbor := range neighbors.Neighbors {
                dn[int(neighbor)] = true
            }
            dagRemain[txIndex] = dn
        }
    
    
        txBatchSize := len(block.Dag.Vertexes)
        runningTxC := make(chan int, txBatchSize)
        doneTxC := make(chan int, txBatchSize)
    
        timeoutC := time.After(ScheduleWithDagTimeout * time.Second)
        finishC := make(chan bool)
        var goRoutinePool *ants.Pool
        var err error
        poolCapacity := runtime.NumCPU() * 4
        if goRoutinePool, err = ants.NewPool(poolCapacity, ants.WithPreAlloc(true)); err != nil {
            return nil, nil, err
        }
        defer goRoutinePool.Release()
     // 启动校验线程池,准备并发生成读写集
        go func() {
            for {
                select {
                case txIndex := <-runningTxC:
                    tx := txMapping[txIndex]
                    err := goRoutinePool.Submit(func() {
                        ts.log.Debugf("run vm with dag for tx id %s", tx.Header.GetTxId())
                        txSimContext := newTxSimContext(ts.VmManager, snapshot, tx)
                        runVmSuccess := true
                        var txResult *commonpb.Result
                        var err error
                        //调用合约模拟交易执行
                        if txResult, err = ts.runVM(tx, txSimContext); err != nil {
                            runVmSuccess = false
                            txSimContext.SetTxResult(txResult)
                            ts.log.Errorf("failed to run vm for tx id:%s during simulate with dag, tx result:%+v, error:%+v", tx.Header.GetTxId(), txResult, err)
                        } else {
                            //ts.log.Debugf("success to run vm for tx id:%s during simulate with dag, tx result:%+v", tx.Header.GetTxId(), txResult)
                            txSimContext.SetTxResult(txResult)
                        }
    
                        applyResult, applySize := snapshot.ApplyTxSimContext(txSimContext, runVmSuccess)
                        if !applyResult {
                            ts.log.Debugf("failed to apply according to dag with tx %s ", tx.Header.TxId)
                            runningTxC <- txIndex
                        } else {
                            ts.log.Debugf("apply to snapshot tx id:%s, result:%+v, apply count:%d", tx.Header.GetTxId(), txResult, applySize)
                            doneTxC <- txIndex //交易执行成功
                        }
                        // If all transactions in current batch have been successfully added to dag
                        if applySize >= txBatchSize {
                            finishC <- true
                        }
                    })
                    if err != nil {
                        ts.log.Warnf("failed to submit tx id %s during simulate with dag, %+v", tx.Header.GetTxId(), err)
                    }
                case doneTxIndex := <-doneTxC:
                    ts.shrinkDag(doneTxIndex, dagRemain) //踢掉依赖关系中第一层节点,那么依赖````doneTxIndex````的交易变成没有依赖的交易
                    //寻找没有依赖其他交易的交易
                    txIndexBatch := ts.popNextTxBatchFromDag(dagRemain) 
                    //ts.log.Debugf("pop next tx index batch %v", txIndexBatch)
                    for _, tx := range txIndexBatch {
                        runningTxC <- tx
                    }
                case <-finishC:
                    ts.log.Debugf("schedule with dag finish")
                    ts.scheduleFinishC <- true
                    return
                case <-timeoutC:
                    ts.log.Errorf("schedule with dag timeout")
                    ts.scheduleFinishC <- true
                    return
                }
            }
        }()
    
        //寻找没有依赖其他交易的交易
        txIndexBatch := ts.popNextTxBatchFromDag(dagRemain)
    
        go func() {
            for _, tx := range txIndexBatch {
                //没有依赖其他交易的交易可以并发生成读写集,而不会造成冲突
                runningTxC <- tx
            }
        }()
    
        <-ts.scheduleFinishC //等待检查完毕
        snapshot.Seal()
    
        ts.log.Infof("simulate with dag end, size %d, time used %+v", len(block.Txs), time.Since(startTime))
    
        // Return the read and write set after the scheduled execution
    
        for _, txRWSet := range snapshot.GetTxRWSetTable() {
            if txRWSet != nil {
                txRWSetMap[txRWSet.TxId] = txRWSet
            }
        }
        return txRWSetMap, snapshot.GetTxResultMap(), nil
    }
    

    相关文章

      网友评论

          本文标题:长安链源码学习--区块验证(BlockVerify)(六)

          本文链接:https://www.haomeiwen.com/subject/iuoaeltx.html