美文网首页
长安链源码学习--区块验证(BlockVerify)(六)

长安链源码学习--区块验证(BlockVerify)(六)

作者: 明神特烦恼 | 来源:发表于2021-06-15 13:29 被阅读0次

作者:明神特烦恼
公众号:明神特烦恼

当节点接收到其他节点发送的提案信息 或者 接收到其他节点同步过来的区块,需要对区块进行验证,确保区块的有效性。

待验证区块的数据结构:

type Block struct {
    // header of the block
    Header *BlockHeader `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
    // execution sequence of intra block transactions is generated by proposer
    Dag *DAG           `protobuf:"bytes,2,opt,name=dag,proto3" json:"dag,omitempty"`
    Txs []*Transaction `protobuf:"bytes,3,rep,name=txs,proto3" json:"txs,omitempty"`
    // stores the voting information of the current block
    // not included in block hash value calculation
    AdditionalData *AdditionalData `protobuf:"bytes,4,opt,name=additional_data,json=additionalData,proto3" json:"additional_data,omitempty"`
}

区块验证内容如下:
1)区块中交易总数 不能超过 配置的区块最大交易数限制
2)区块体的交易总数与Header.TxCount相同
3)本节点的区块高度 >= 验证区块的高度,且能否与前一区块的Hash一致
4)提案人身份认证有效、权限检测通过(权限管理在后面章节单独介绍)
5)区块内部txid无重复
6)通过DAG生成读写集,每笔交易的读写集Hash与tx.Result.RwSetHash一致
7)每笔交易执行结果与区块内tx.Result一致

其中第6步与第7步都为多线程并发验证,这里体现出DAG的作用。

type DAG struct {
    // sequence number of transaction topological sort
    //the sequence number of the transaction topological sort associated with the transaction
    Vertexes []*DAG_Neighbor `protobuf:"bytes,2,rep,name=vertexes,proto3" json:"vertexes,omitempty"`
}

之前的章节,我们分析过DAG的数据结构,Vertexes的每个索引表示具体的交易编号。这里处理的思路是,如果某个节点与其他节点不是邻居,代表没有关联交易,可以并发生成读写集。这样可以加快读写集的生成。具体逻辑在SimulateWithDag函数中。

func (ts *TxSchedulerImpl) SimulateWithDag(block *commonpb.Block, snapshot protocol.Snapshot) (map[string]*commonpb.TxRWSet, map[string]*commonpb.Result, error) {
    txMapping := make(map[int]*commonpb.Transaction)
    for index, tx := range block.Txs {
        txMapping[index] = tx
    }

    // 交易间关联关系转化,记录每笔交易与哪些交易有关联关系,用true表示
    dag := block.Dag
    dagRemain := make(map[int]dagNeighbors)
    for txIndex, neighbors := range dag.Vertexes {
        dn := make(dagNeighbors)
        for _, neighbor := range neighbors.Neighbors {
            dn[int(neighbor)] = true
        }
        dagRemain[txIndex] = dn
    }


    txBatchSize := len(block.Dag.Vertexes)
    runningTxC := make(chan int, txBatchSize)
    doneTxC := make(chan int, txBatchSize)

    timeoutC := time.After(ScheduleWithDagTimeout * time.Second)
    finishC := make(chan bool)
    var goRoutinePool *ants.Pool
    var err error
    poolCapacity := runtime.NumCPU() * 4
    if goRoutinePool, err = ants.NewPool(poolCapacity, ants.WithPreAlloc(true)); err != nil {
        return nil, nil, err
    }
    defer goRoutinePool.Release()
 // 启动校验线程池,准备并发生成读写集
    go func() {
        for {
            select {
            case txIndex := <-runningTxC:
                tx := txMapping[txIndex]
                err := goRoutinePool.Submit(func() {
                    ts.log.Debugf("run vm with dag for tx id %s", tx.Header.GetTxId())
                    txSimContext := newTxSimContext(ts.VmManager, snapshot, tx)
                    runVmSuccess := true
                    var txResult *commonpb.Result
                    var err error
                    //调用合约模拟交易执行
                    if txResult, err = ts.runVM(tx, txSimContext); err != nil {
                        runVmSuccess = false
                        txSimContext.SetTxResult(txResult)
                        ts.log.Errorf("failed to run vm for tx id:%s during simulate with dag, tx result:%+v, error:%+v", tx.Header.GetTxId(), txResult, err)
                    } else {
                        //ts.log.Debugf("success to run vm for tx id:%s during simulate with dag, tx result:%+v", tx.Header.GetTxId(), txResult)
                        txSimContext.SetTxResult(txResult)
                    }

                    applyResult, applySize := snapshot.ApplyTxSimContext(txSimContext, runVmSuccess)
                    if !applyResult {
                        ts.log.Debugf("failed to apply according to dag with tx %s ", tx.Header.TxId)
                        runningTxC <- txIndex
                    } else {
                        ts.log.Debugf("apply to snapshot tx id:%s, result:%+v, apply count:%d", tx.Header.GetTxId(), txResult, applySize)
                        doneTxC <- txIndex //交易执行成功
                    }
                    // If all transactions in current batch have been successfully added to dag
                    if applySize >= txBatchSize {
                        finishC <- true
                    }
                })
                if err != nil {
                    ts.log.Warnf("failed to submit tx id %s during simulate with dag, %+v", tx.Header.GetTxId(), err)
                }
            case doneTxIndex := <-doneTxC:
                ts.shrinkDag(doneTxIndex, dagRemain) //踢掉依赖关系中第一层节点,那么依赖````doneTxIndex````的交易变成没有依赖的交易
                //寻找没有依赖其他交易的交易
                txIndexBatch := ts.popNextTxBatchFromDag(dagRemain) 
                //ts.log.Debugf("pop next tx index batch %v", txIndexBatch)
                for _, tx := range txIndexBatch {
                    runningTxC <- tx
                }
            case <-finishC:
                ts.log.Debugf("schedule with dag finish")
                ts.scheduleFinishC <- true
                return
            case <-timeoutC:
                ts.log.Errorf("schedule with dag timeout")
                ts.scheduleFinishC <- true
                return
            }
        }
    }()

    //寻找没有依赖其他交易的交易
    txIndexBatch := ts.popNextTxBatchFromDag(dagRemain)

    go func() {
        for _, tx := range txIndexBatch {
            //没有依赖其他交易的交易可以并发生成读写集,而不会造成冲突
            runningTxC <- tx
        }
    }()

    <-ts.scheduleFinishC //等待检查完毕
    snapshot.Seal()

    ts.log.Infof("simulate with dag end, size %d, time used %+v", len(block.Txs), time.Since(startTime))

    // Return the read and write set after the scheduled execution

    for _, txRWSet := range snapshot.GetTxRWSetTable() {
        if txRWSet != nil {
            txRWSetMap[txRWSet.TxId] = txRWSet
        }
    }
    return txRWSetMap, snapshot.GetTxResultMap(), nil
}

相关文章

网友评论

      本文标题:长安链源码学习--区块验证(BlockVerify)(六)

      本文链接:https://www.haomeiwen.com/subject/iuoaeltx.html