美文网首页
ChainMaker 数据存储

ChainMaker 数据存储

作者: 冰冰大象 | 来源:发表于2021-08-19 13:32 被阅读0次

    带着问题看源码
    一:区块最终会存在在Block DB、State DB、History DB、Result DB 四个数据库中,怎么保证数据一致性
    二:群里有人问怎么查历史数据

    ChainMaker 采用类似数据库的预写式日志(Write-Ahead Logging (WAL))

    引用官方文档:如果区块正在提交过程中,节点因异常退出,节点在下次启动时存储模块会进入恢复流程:
    分别从Block binary log、Block DB、State DB、History DB、Result DB中获取最新的区块高度,以Block binary log中的区块高度作为基准高度,判断其他DB是否落后基准高度。
    如果有某个DB落后基准高度,则从Block bianry log中获取缺失的区块及读写集,依次提交到落后DB中。
    所有DB同步到基准高度后,存储模块启动完成,节点进入正常流程。

    先看下当共识完成后提交区块的代码

    #######位于module/core/common/block_helper.go
    func (chain *BlockCommitterImpl) AddBlock(block *commonpb.Block) (err error) {
        defer func() {
          ...
    //出错后,回滚
            if sqlErr := chain.storeHelper.RollBack(block, chain.blockchainStore); sqlErr != nil {
                chain.log.Errorf("block [%d] rollback sql failed: %s", block.Header.BlockHeight, sqlErr)
            }
        }()
        height := block.Header.BlockHeight
        lastProposed, rwSetMap, conEventMap := chain.proposalCache.GetProposedBlock(block)
        ...
    // 节点提交
        dbLasts, snapshotLasts, confLasts, otherLasts, pubEvent, blockInfo, err := chain.commonCommit.CommitBlock(
            lastProposed, rwSetMap, conEventMap)
        if err != nil {
            chain.log.Errorf("block common commit failed: %s, blockHeight: (%d)",
                err.Error(), lastProposed.Header.BlockHeight)
        }
        txRetry := chain.syncWithTxPool(lastProposed, height)
         // 清空交易队列
        chain.txPool.RetryAndRemoveTxs(txRetry, lastProposed.Txs)
         // 清空新高度的提案区块缓存
        chain.proposalCache.ClearProposedBlockAt(height)
        // synchronize new block height to consensus and sync module
        chain.msgBus.PublishSafe(msgbus.BlockInfo, blockInfo)
    ...
        return nil
    }
    
    #### module/core/common/committer.go
    func (cb *CommitBlock) CommitBlock(
        ...
          // 落块开始
        if err = cb.store.PutBlock(block, rwSet); err != nil {
            // if put db error, then panic
            cb.log.Error(err)
            panic(err)
        }
       ...
    }
    

    开始落块

    #### module/store/blockstore_impl.go
    // PutBlock commits the block and the corresponding rwsets in an atomic operation
    func (bs *BlockStoreImpl) PutBlock(block *commonPb.Block, txRWSets []*commonPb.TxRWSet) error {
        startPutBlock := utils.CurrentTimeMillisSeconds()
        //1. commit log
        blockWithRWSet := &storePb.BlockWithRWSet{
            Block:    block,
            TxRWSets: txRWSets,
        }
    // 序列化 读写集
        blockBytes, blockWithSerializedInfo, err := serialization.SerializeBlock(blockWithRWSet)
        if err != nil {
        ...
        }
        elapsedMarshalBlockAndRWSet := utils.CurrentTimeMillisSeconds() - startPutBlock
        startCommitLogDB := utils.CurrentTimeMillisSeconds()
        //这里开始写binlog了
        err = bs.writeLog(uint64(block.Header.BlockHeight), blockBytes)
        elapsedCommitlogDB := utils.CurrentTimeMillisSeconds() - startCommitLogDB
        if err != nil {
        ...
        }
    
        //commit db concurrently
        startCommitBlock := utils.CurrentTimeMillisSeconds()
        //the amount of commit db work
        numBatches := 5
        var batchWG sync.WaitGroup
        batchWG.Add(numBatches)
        errsChan := make(chan error, numBatches)
        // 开始存区块 
        // 2.commit blockDB
        go func() {
            defer batchWG.Done()
            bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.blockDB.CommitBlock)
        }()
    
        // 3.commit stateDB
        go func() {
            defer batchWG.Done()
            bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.stateDB.CommitBlock)
        }()
    // 这个历史数据库 一会去看下怎么存储的
        // 4.commit historyDB
        if !bs.storeConfig.DisableHistoryDB {
            go func() {
                defer batchWG.Done()
                bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.historyDB.CommitBlock)
            }()
        } else {
            batchWG.Done()
        }
        //5. result db
        if !bs.storeConfig.DisableResultDB {
            go func() {
                defer batchWG.Done()
                bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.resultDB.CommitBlock)
            }()
        } else {
            batchWG.Done()
        }
        //6.commit contractEventDB
        if !bs.storeConfig.DisableContractEventDB {
            go func() {
                defer batchWG.Done()
                bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.contractEventDB.CommitBlock)
            }()
        } else {
            batchWG.Done()
        }
    
        batchWG.Wait()
        if len(errsChan) > 0 {
            return <-errsChan
        }
        elapsedCommitBlock := utils.CurrentTimeMillisSeconds() - startCommitBlock
    
        //7. clean wal, delete block and rwset after commit
        go func() {
            err := bs.deleteBlockFromLog(uint64(block.Header.BlockHeight))
            if err != nil {
                bs.logger.Warnf("chain[%s]: failed to clean log, block[%d], err:%s",
                    block.Header.ChainId, block.Header.BlockHeight, err)
            }
        }()
        ...
        return nil
    }
    

    bs.writeLog(uint64(block.Header.BlockHeight), blockBytes) 写日志文件 看看都有些啥

    #### module/store/blockstore_impl.go
    func (bs *BlockStoreImpl) writeLog(blockHeight uint64, bytes []byte) error {
        // wal log, index increase from 1, while blockHeight increase form 0
        return bs.wal.Write(blockHeight+1, bytes)
    }
    

    看一下wal 是什么

    // BlockStoreImpl provides an implementation of `protocol.BlockchainStore`.
    type BlockStoreImpl struct {
        ...
        wal             binlog.BinLoger
        ...
    }
    ##### 看一下BinLoger 一个接口
    type BinLoger interface {
        // 关闭 
        Close() error
       // 清空index之前的数据 
        TruncateFront(index uint64) error
       // 读数据
        Read(index uint64) (data []byte, err error)
       // 获取最后一个标识
        LastIndex() (index uint64, err error)
      // 写数据
        Write(index uint64, data []byte) error
    }
    ##### 继续看谁实现了这个接口
    // NewBlockStoreImpl constructs new `BlockStoreImpl`
    func NewBlockStoreImpl(...) (*BlockStoreImpl, error) {
        walPath := filepath.Join(storeConfig.StorePath, chainId, logPath)
        writeAsync := storeConfig.LogDBWriteAsync
        walOpt := &wal.Options{
            NoSync: writeAsync,
        }
        if binLog == nil {
            writeLog, err := wal.Open(walPath, walOpt)
            if err != nil {
                panic(fmt.Sprintf("open wal failed, path:%s, error:%s", walPath, err))
            }
            binLog = writeLog
        }
          ...
        blockStore := &BlockStoreImpl{
            ...
            wal:              binLog,
            ...
        }
        //binlog 有SavePoint,不是空数据库,进行数据恢复
        if i, errbs := blockStore.getLastSavepoint(); errbs == nil && i > 0 {
            //check savepoint and recover 开始同步数据
            errbs = blockStore.recover()
            if errbs != nil {
                return nil, errbs
            }
        } else {
            logger.Info("binlog is empty, don't need recover")
        }
        return blockStore, nil
    }
    
    

    这里看到 是 wal.Open产生的binLog对象 实现BinLoger 接口,然后发现wal功能其实是使用了https://github.com/tidwall/wal 在看一下 同步方法blockStore.recover()

    // recover checks savepoint and recommit lost block
    func (bs *BlockStoreImpl) recover() error {
       ...
       // 读出日志最后高度
        if logSavepoint, err = bs.getLastSavepoint(); err != nil {
            return err
        }
      // 以下是分别读出各数据库的高度
        if blockSavepoint, err = bs.blockDB.GetLastSavepoint(); err != nil {
            return err
        }
        ...
        //recommit blockdb 将数据库中的高度与日志高度进行比较,并将缺失的进行补齐 以下都是一样的,
        if err := bs.recoverBlockDB(blockSavepoint, logSavepoint); err != nil {
            return err
        }
        ...
    }
    

    我们以一个同步数据方法recoverBlockDB为例,其他数据库逻辑是一样的

    func (bs *BlockStoreImpl) recoverBlockDB(currentHeight uint64, savePoint uint64) error {
    // 计算出缺失的高度
        height := bs.calculateRecoverHeight(currentHeight, savePoint)
        for ; height <= savePoint; height++ {
            bs.logger.Infof("[BlockDB] recommitting lost blocks, blockNum=%d, lastBlockNum=%d", height, savePoint)
        //  从日志里捞数据
            blockWithSerializedInfo, err := bs.getBlockFromLog(height)
            if err != nil {
                return err
            }
     //  补齐到缺失的数据库中
            err = bs.blockDB.CommitBlock(blockWithSerializedInfo)
            if err != nil {
                return err
            }
        }
        return nil
    }
    

    此时第一个问题 可以回答
    一:区块最终会存在在Block DB、State DB、History DB、Result DB 四个数据库中,怎么保证数据一致性
    答:采用wal 机制 现将数据落到日志,在通过日志同一落到各数据库中,当出现异常退出时,重启后 各数据库会向日志库同步数据 保证各数据库数据一致

    如何查看历史数据,在historyDB里发现一个接口

    // HistoryDB provides handle to rwSets instances
    type HistoryDB interface {
        InitGenesis(genesisBlock *serialization.BlockWithSerializedInfo) error
        // CommitBlock commits the block rwsets in an atomic operation
        CommitBlock(blockInfo *serialization.BlockWithSerializedInfo) error
    
        //GetHistoryForKey 获得Key的交易历史
        GetHistoryForKey(contractName string, key []byte) (HistoryIterator, error)
        GetAccountTxHistory(account []byte) (HistoryIterator, error)
        GetContractTxHistory(contractName string) (HistoryIterator, error)
        // GetLastSavepoint returns the last block height
        GetLastSavepoint() (uint64, error)
    
        // Close is used to close database
        Close()
    }
    

    但是在go sdk 中并没有发现相对应的方法,所以目前查询历史交易的方法并没有暴露出来。
    于是自己写一个访问历史数据库代码 引入chainmaker-go

    package main
    
    import (
        "chainmaker.org/chainmaker-go/localconf"
        "chainmaker.org/chainmaker-go/protocol/test"
        "chainmaker.org/chainmaker-go/store"
        "fmt"
        "os"
        "path/filepath"
        "time"
    )
    func getlvldbConfig(path string) *localconf.StorageConfig {
        conf := &localconf.StorageConfig{}
        if path == "" {
            path = filepath.Join(os.TempDir(), fmt.Sprintf("%d", time.Now().Nanosecond()))
        }
        conf.StorePath = path
    
        lvlConfig := &localconf.LevelDbConfig{
            StorePath: path,
        }
        dbConfig := &localconf.DbConfig{
            Provider:      "leveldb",
            LevelDbConfig: lvlConfig,
        }
        conf.BlockDbConfig = dbConfig
        conf.StateDbConfig = dbConfig
        conf.HistoryDbConfig = dbConfig
        conf.ResultDbConfig = dbConfig
        conf.DisableContractEventDB = true
        return conf
    }
    var log = &test.GoLogger{}
    func main()  {
        var factory store.Factory
        s, err := factory.NewStore("c1628498288425", getlvldbConfig("/Users/sunbo/Desktop/ChainMaker/src/baas-api/nodeconfigs/c1628498288425-wx-org1-chainmaker-org-node1/data/history"),  log)
        if err != nil {
            panic(err)
        }
        //"fact_json","name" 注意这里的Key是Key和前缀 采用#号连接  
        datas,err:= s.GetHistoryForKey("test",[]byte("factjson#name"))
        if err!=nil{
            panic(err)
        }
        for datas.Next(){
           keys,err:=datas.Value()
            if err!=nil{
                panic(err)
            }
            fmt.Println(keys)
        }
        fmt.Println(s)
    }
    

    二:群里有人问怎么查历史数据
    答:ChainMaker 本身支持一个HistoryDB 数据库来存储历史数据,但是目前版本不支持,后续版本官方会支持,不过 目前自己先做个版本支持一下

    添加历史查询功能 https://www.jianshu.com/p/1a37fa9cbcd2

    相关文章

      网友评论

          本文标题:ChainMaker 数据存储

          本文链接:https://www.haomeiwen.com/subject/ifddbltx.html