Author: 明神特烦恼
WeChat official account: 明神特烦恼
When a node receives a block proposal from another node, or a block synchronized from another node, it must validate the block to confirm that it is valid.
The data structure of the block to be validated:
type Block struct {
    // header of the block
    Header *BlockHeader `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
    // execution sequence of intra block transactions is generated by proposer
    Dag *DAG `protobuf:"bytes,2,opt,name=dag,proto3" json:"dag,omitempty"`
    Txs []*Transaction `protobuf:"bytes,3,rep,name=txs,proto3" json:"txs,omitempty"`
    // stores the voting information of the current block
    // not included in block hash value calculation
    AdditionalData *AdditionalData `protobuf:"bytes,4,opt,name=additional_data,json=additionalData,proto3" json:"additional_data,omitempty"`
}
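The comment on AdditionalData hints at how the block hash is treated: only the header participates, so the votes collected during consensus do not change the hash. As a rough, hypothetical sketch (the real chain code presumably clears the hash/signature fields of the header before hashing and may use a configurable hash algorithm; this only illustrates the idea using crypto/sha256 and protobuf serialization):

// illustrative only: hash the serialized header; AdditionalData (the votes) is deliberately excluded
func calcBlockHashSketch(header *commonpb.BlockHeader) ([]byte, error) {
    headerBytes, err := proto.Marshal(header)
    if err != nil {
        return nil, err
    }
    sum := sha256.Sum256(headerBytes)
    return sum[:], nil
}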
The block validation checks are as follows:
1) The total number of transactions in the block must not exceed the configured per-block transaction limit.
2) The transaction count of the block body must equal Header.TxCount.
3) The block's height is checked against the local node's current block height, and the block must be consistent with the hash of the previous block.
4) The proposer's identity is authenticated and its permissions are verified (permission management is covered separately in a later chapter).
5) There must be no duplicate txid inside the block.
6) Read/write sets are regenerated by executing the transactions according to the DAG, and each transaction's read/write-set hash must equal tx.Result.RwSetHash.
7) Each transaction's execution result must match the tx.Result stored in the block.
Steps 6 and 7 are both performed concurrently by multiple goroutines, which is where the DAG comes into play. Below is a rough sketch of the simpler stateless checks (1, 2 and 5); steps 6 and 7 are carried out by SimulateWithDag, analyzed later in this section.
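The sketch is hypothetical (the function name checkBlockBasics, the maxTxCount parameter and the error messages are mine, not the actual chain code); it assumes the same commonpb package alias as the excerpts in this article and that Header.TxCount is an int64:

func checkBlockBasics(block *commonpb.Block, maxTxCount int) error {
    // check 1: the transaction count must not exceed the configured limit
    if len(block.Txs) > maxTxCount {
        return fmt.Errorf("tx count %d exceeds the configured limit %d", len(block.Txs), maxTxCount)
    }
    // check 2: the transaction count of the block body must match Header.TxCount
    if int64(len(block.Txs)) != block.Header.TxCount {
        return fmt.Errorf("tx count %d does not match header tx count %d", len(block.Txs), block.Header.TxCount)
    }
    // check 5: no duplicate txid inside the block
    seen := make(map[string]bool, len(block.Txs))
    for _, tx := range block.Txs {
        txId := tx.Header.TxId
        if seen[txId] {
            return fmt.Errorf("duplicate txid %s", txId)
        }
        seen[txId] = true
    }
    return nil
}

The DAG referenced by steps 6 and 7 looks like this: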
type DAG struct {
    // the sequence number of the transaction topological sort associated with the transaction
    Vertexes []*DAG_Neighbor `protobuf:"bytes,2,rep,name=vertexes,proto3" json:"vertexes,omitempty"`
}
We analyzed the DAG data structure in an earlier chapter: each index of Vertexes corresponds to one transaction in the block. The idea here is that if a vertex is not a neighbor of another vertex, the two transactions are unrelated, so their read/write sets can be generated concurrently, which speeds up read/write-set generation. A toy example follows; the concrete logic lives in the SimulateWithDag function shown after it.
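As a hypothetical illustration (this three-transaction DAG is made up for this article and assumes Neighbors holds uint32 vertex indices): transactions 0 and 1 have no neighbors, so they are unrelated and can be simulated in parallel; transaction 2 lists both as neighbors, so it has to wait for them.

// tx0 and tx1 are independent; tx2 depends on keys written by tx0 and tx1
dag := &commonpb.DAG{
    Vertexes: []*commonpb.DAG_Neighbor{
        {Neighbors: []uint32{}},     // tx0: no dependencies
        {Neighbors: []uint32{}},     // tx1: no dependencies
        {Neighbors: []uint32{0, 1}}, // tx2: must run after tx0 and tx1
    },
}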
func (ts *TxSchedulerImpl) SimulateWithDag(block *commonpb.Block, snapshot protocol.Snapshot) (map[string]*commonpb.TxRWSet, map[string]*commonpb.Result, error) {
    startTime := time.Now()
    // read/write sets keyed by txid, filled in from the snapshot at the end
    txRWSetMap := make(map[string]*commonpb.TxRWSet)
    // index every transaction by its position in the block, which is also its vertex id in the DAG
    txMapping := make(map[int]*commonpb.Transaction)
    for index, tx := range block.Txs {
        txMapping[index] = tx
    }
    // Convert the inter-transaction relationships: for each transaction, record which transactions it depends on (marked true)
    dag := block.Dag
    dagRemain := make(map[int]dagNeighbors)
    for txIndex, neighbors := range dag.Vertexes {
        dn := make(dagNeighbors)
        for _, neighbor := range neighbors.Neighbors {
            dn[int(neighbor)] = true
        }
        dagRemain[txIndex] = dn
    }
    txBatchSize := len(block.Dag.Vertexes)
    runningTxC := make(chan int, txBatchSize)
    doneTxC := make(chan int, txBatchSize)
    timeoutC := time.After(ScheduleWithDagTimeout * time.Second)
    finishC := make(chan bool)
    var goRoutinePool *ants.Pool
    var err error
    poolCapacity := runtime.NumCPU() * 4
    if goRoutinePool, err = ants.NewPool(poolCapacity, ants.WithPreAlloc(true)); err != nil {
        return nil, nil, err
    }
    defer goRoutinePool.Release()
    // Start the verification goroutine pool, ready to generate read/write sets concurrently
    go func() {
        for {
            select {
            case txIndex := <-runningTxC:
                tx := txMapping[txIndex]
                err := goRoutinePool.Submit(func() {
                    ts.log.Debugf("run vm with dag for tx id %s", tx.Header.GetTxId())
                    txSimContext := newTxSimContext(ts.VmManager, snapshot, tx)
                    runVmSuccess := true
                    var txResult *commonpb.Result
                    var err error
                    // invoke the contract to simulate the transaction
                    if txResult, err = ts.runVM(tx, txSimContext); err != nil {
                        runVmSuccess = false
                        txSimContext.SetTxResult(txResult)
                        ts.log.Errorf("failed to run vm for tx id:%s during simulate with dag, tx result:%+v, error:%+v", tx.Header.GetTxId(), txResult, err)
                    } else {
                        //ts.log.Debugf("success to run vm for tx id:%s during simulate with dag, tx result:%+v", tx.Header.GetTxId(), txResult)
                        txSimContext.SetTxResult(txResult)
                    }
                    applyResult, applySize := snapshot.ApplyTxSimContext(txSimContext, runVmSuccess)
                    if !applyResult {
                        // the snapshot rejected the simulation result; requeue the transaction
                        ts.log.Debugf("failed to apply according to dag with tx %s ", tx.Header.TxId)
                        runningTxC <- txIndex
                    } else {
                        ts.log.Debugf("apply to snapshot tx id:%s, result:%+v, apply count:%d", tx.Header.GetTxId(), txResult, applySize)
                        doneTxC <- txIndex // transaction simulated and applied successfully
                    }
                    // If all transactions in the current batch have been successfully applied to the snapshot
                    if applySize >= txBatchSize {
                        finishC <- true
                    }
                })
                if err != nil {
                    ts.log.Warnf("failed to submit tx id %s during simulate with dag, %+v", tx.Header.GetTxId(), err)
                }
            case doneTxIndex := <-doneTxC:
                // remove doneTxIndex from the remaining dependencies; transactions that depended only on it now have no dependencies
                ts.shrinkDag(doneTxIndex, dagRemain)
                // find the transactions that no longer depend on any other transaction
                txIndexBatch := ts.popNextTxBatchFromDag(dagRemain)
                //ts.log.Debugf("pop next tx index batch %v", txIndexBatch)
                for _, tx := range txIndexBatch {
                    runningTxC <- tx
                }
            case <-finishC:
                ts.log.Debugf("schedule with dag finish")
                ts.scheduleFinishC <- true
                return
            case <-timeoutC:
                ts.log.Errorf("schedule with dag timeout")
                ts.scheduleFinishC <- true
                return
            }
        }
    }()
    // seed the scheduler with the transactions that do not depend on any other transaction
    txIndexBatch := ts.popNextTxBatchFromDag(dagRemain)
    go func() {
        for _, tx := range txIndexBatch {
            // independent transactions can generate their read/write sets concurrently without conflicts
            runningTxC <- tx
        }
    }()
    <-ts.scheduleFinishC // wait until the scheduling has finished
    snapshot.Seal()
    ts.log.Infof("simulate with dag end, size %d, time used %+v", len(block.Txs), time.Since(startTime))
    // Return the read and write set after the scheduled execution
    for _, txRWSet := range snapshot.GetTxRWSetTable() {
        if txRWSet != nil {
            txRWSetMap[txRWSet.TxId] = txRWSet
        }
    }
    return txRWSetMap, snapshot.GetTxResultMap(), nil
}
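shrinkDag and popNextTxBatchFromDag are not shown in the excerpt above. A minimal sketch consistent with how they are used (my reconstruction, assuming dagNeighbors is defined as map[int]bool) could look like this:

// shrinkDag removes a finished transaction from every remaining dependency set.
func (ts *TxSchedulerImpl) shrinkDag(txIndex int, dagRemain map[int]dagNeighbors) {
    for _, neighbors := range dagRemain {
        delete(neighbors, txIndex)
    }
}

// popNextTxBatchFromDag returns (and removes) every transaction whose dependency set is now empty;
// those transactions can be simulated concurrently without conflicting with each other.
func (ts *TxSchedulerImpl) popNextTxBatchFromDag(dagRemain map[int]dagNeighbors) []int {
    var txIndexBatch []int
    for checkIndex, neighbors := range dagRemain {
        if len(neighbors) == 0 {
            txIndexBatch = append(txIndexBatch, checkIndex)
            delete(dagRemain, checkIndex)
        }
    }
    return txIndexBatch
}

Together with the scheduling loop above, this amounts to an incremental topological sort: each time a transaction finishes, it is deleted from every remaining dependency set, and any transaction whose set becomes empty is pushed into runningTxC for concurrent simulation.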