带着问题看源码
一:区块最终会存在在Block DB、State DB、History DB、Result DB 四个数据库中,怎么保证数据一致性
二:群里有人问怎么查历史数据
ChainMaker 采用类似数据库的预写式日志(Write-Ahead Logging (WAL))
引用官方文档:如果区块正在提交过程中,节点因异常退出,节点在下次启动时存储模块会进入恢复流程:
分别从Block binary log、Block DB、State DB、History DB、Result DB中获取最新的区块高度,以Block binary log中的区块高度作为基准高度,判断其他DB是否落后基准高度。
如果有某个DB落后基准高度,则从Block bianry log中获取缺失的区块及读写集,依次提交到落后DB中。
所有DB同步到基准高度后,存储模块启动完成,节点进入正常流程。
先看下当共识完成后提交区块的代码
#######位于module/core/common/block_helper.go
func (chain *BlockCommitterImpl) AddBlock(block *commonpb.Block) (err error) {
defer func() {
...
//出错后,回滚
if sqlErr := chain.storeHelper.RollBack(block, chain.blockchainStore); sqlErr != nil {
chain.log.Errorf("block [%d] rollback sql failed: %s", block.Header.BlockHeight, sqlErr)
}
}()
height := block.Header.BlockHeight
lastProposed, rwSetMap, conEventMap := chain.proposalCache.GetProposedBlock(block)
...
// 节点提交
dbLasts, snapshotLasts, confLasts, otherLasts, pubEvent, blockInfo, err := chain.commonCommit.CommitBlock(
lastProposed, rwSetMap, conEventMap)
if err != nil {
chain.log.Errorf("block common commit failed: %s, blockHeight: (%d)",
err.Error(), lastProposed.Header.BlockHeight)
}
txRetry := chain.syncWithTxPool(lastProposed, height)
// 清空交易队列
chain.txPool.RetryAndRemoveTxs(txRetry, lastProposed.Txs)
// 清空新高度的提案区块缓存
chain.proposalCache.ClearProposedBlockAt(height)
// synchronize new block height to consensus and sync module
chain.msgBus.PublishSafe(msgbus.BlockInfo, blockInfo)
...
return nil
}
#### module/core/common/committer.go
func (cb *CommitBlock) CommitBlock(
...
// 落块开始
if err = cb.store.PutBlock(block, rwSet); err != nil {
// if put db error, then panic
cb.log.Error(err)
panic(err)
}
...
}
开始落块
#### module/store/blockstore_impl.go
// PutBlock commits the block and the corresponding rwsets in an atomic operation
func (bs *BlockStoreImpl) PutBlock(block *commonPb.Block, txRWSets []*commonPb.TxRWSet) error {
startPutBlock := utils.CurrentTimeMillisSeconds()
//1. commit log
blockWithRWSet := &storePb.BlockWithRWSet{
Block: block,
TxRWSets: txRWSets,
}
// 序列化 读写集
blockBytes, blockWithSerializedInfo, err := serialization.SerializeBlock(blockWithRWSet)
if err != nil {
...
}
elapsedMarshalBlockAndRWSet := utils.CurrentTimeMillisSeconds() - startPutBlock
startCommitLogDB := utils.CurrentTimeMillisSeconds()
//这里开始写binlog了
err = bs.writeLog(uint64(block.Header.BlockHeight), blockBytes)
elapsedCommitlogDB := utils.CurrentTimeMillisSeconds() - startCommitLogDB
if err != nil {
...
}
//commit db concurrently
startCommitBlock := utils.CurrentTimeMillisSeconds()
//the amount of commit db work
numBatches := 5
var batchWG sync.WaitGroup
batchWG.Add(numBatches)
errsChan := make(chan error, numBatches)
// 开始存区块
// 2.commit blockDB
go func() {
defer batchWG.Done()
bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.blockDB.CommitBlock)
}()
// 3.commit stateDB
go func() {
defer batchWG.Done()
bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.stateDB.CommitBlock)
}()
// 这个历史数据库 一会去看下怎么存储的
// 4.commit historyDB
if !bs.storeConfig.DisableHistoryDB {
go func() {
defer batchWG.Done()
bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.historyDB.CommitBlock)
}()
} else {
batchWG.Done()
}
//5. result db
if !bs.storeConfig.DisableResultDB {
go func() {
defer batchWG.Done()
bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.resultDB.CommitBlock)
}()
} else {
batchWG.Done()
}
//6.commit contractEventDB
if !bs.storeConfig.DisableContractEventDB {
go func() {
defer batchWG.Done()
bs.putBlock2DB(blockWithSerializedInfo, errsChan, bs.contractEventDB.CommitBlock)
}()
} else {
batchWG.Done()
}
batchWG.Wait()
if len(errsChan) > 0 {
return <-errsChan
}
elapsedCommitBlock := utils.CurrentTimeMillisSeconds() - startCommitBlock
//7. clean wal, delete block and rwset after commit
go func() {
err := bs.deleteBlockFromLog(uint64(block.Header.BlockHeight))
if err != nil {
bs.logger.Warnf("chain[%s]: failed to clean log, block[%d], err:%s",
block.Header.ChainId, block.Header.BlockHeight, err)
}
}()
...
return nil
}
bs.writeLog(uint64(block.Header.BlockHeight), blockBytes)
写日志文件 看看都有些啥
#### module/store/blockstore_impl.go
func (bs *BlockStoreImpl) writeLog(blockHeight uint64, bytes []byte) error {
// wal log, index increase from 1, while blockHeight increase form 0
return bs.wal.Write(blockHeight+1, bytes)
}
看一下wal 是什么
// BlockStoreImpl provides an implementation of `protocol.BlockchainStore`.
type BlockStoreImpl struct {
...
wal binlog.BinLoger
...
}
##### 看一下BinLoger 一个接口
type BinLoger interface {
// 关闭
Close() error
// 清空index之前的数据
TruncateFront(index uint64) error
// 读数据
Read(index uint64) (data []byte, err error)
// 获取最后一个标识
LastIndex() (index uint64, err error)
// 写数据
Write(index uint64, data []byte) error
}
##### 继续看谁实现了这个接口
// NewBlockStoreImpl constructs new `BlockStoreImpl`
func NewBlockStoreImpl(...) (*BlockStoreImpl, error) {
walPath := filepath.Join(storeConfig.StorePath, chainId, logPath)
writeAsync := storeConfig.LogDBWriteAsync
walOpt := &wal.Options{
NoSync: writeAsync,
}
if binLog == nil {
writeLog, err := wal.Open(walPath, walOpt)
if err != nil {
panic(fmt.Sprintf("open wal failed, path:%s, error:%s", walPath, err))
}
binLog = writeLog
}
...
blockStore := &BlockStoreImpl{
...
wal: binLog,
...
}
//binlog 有SavePoint,不是空数据库,进行数据恢复
if i, errbs := blockStore.getLastSavepoint(); errbs == nil && i > 0 {
//check savepoint and recover 开始同步数据
errbs = blockStore.recover()
if errbs != nil {
return nil, errbs
}
} else {
logger.Info("binlog is empty, don't need recover")
}
return blockStore, nil
}
这里看到 是 wal.Open
产生的binLog对象 实现BinLoger 接口,然后发现wal功能其实是使用了https://github.com/tidwall/wal 在看一下 同步方法blockStore.recover()
// recover checks savepoint and recommit lost block
func (bs *BlockStoreImpl) recover() error {
...
// 读出日志最后高度
if logSavepoint, err = bs.getLastSavepoint(); err != nil {
return err
}
// 以下是分别读出各数据库的高度
if blockSavepoint, err = bs.blockDB.GetLastSavepoint(); err != nil {
return err
}
...
//recommit blockdb 将数据库中的高度与日志高度进行比较,并将缺失的进行补齐 以下都是一样的,
if err := bs.recoverBlockDB(blockSavepoint, logSavepoint); err != nil {
return err
}
...
}
我们以一个同步数据方法recoverBlockDB
为例,其他数据库逻辑是一样的
func (bs *BlockStoreImpl) recoverBlockDB(currentHeight uint64, savePoint uint64) error {
// 计算出缺失的高度
height := bs.calculateRecoverHeight(currentHeight, savePoint)
for ; height <= savePoint; height++ {
bs.logger.Infof("[BlockDB] recommitting lost blocks, blockNum=%d, lastBlockNum=%d", height, savePoint)
// 从日志里捞数据
blockWithSerializedInfo, err := bs.getBlockFromLog(height)
if err != nil {
return err
}
// 补齐到缺失的数据库中
err = bs.blockDB.CommitBlock(blockWithSerializedInfo)
if err != nil {
return err
}
}
return nil
}
此时第一个问题 可以回答
一:区块最终会存在在Block DB、State DB、History DB、Result DB 四个数据库中,怎么保证数据一致性
答:采用wal 机制 现将数据落到日志,在通过日志同一落到各数据库中,当出现异常退出时,重启后 各数据库会向日志库同步数据 保证各数据库数据一致
如何查看历史数据,在historyDB里发现一个接口
// HistoryDB provides handle to rwSets instances
type HistoryDB interface {
InitGenesis(genesisBlock *serialization.BlockWithSerializedInfo) error
// CommitBlock commits the block rwsets in an atomic operation
CommitBlock(blockInfo *serialization.BlockWithSerializedInfo) error
//GetHistoryForKey 获得Key的交易历史
GetHistoryForKey(contractName string, key []byte) (HistoryIterator, error)
GetAccountTxHistory(account []byte) (HistoryIterator, error)
GetContractTxHistory(contractName string) (HistoryIterator, error)
// GetLastSavepoint returns the last block height
GetLastSavepoint() (uint64, error)
// Close is used to close database
Close()
}
但是在go sdk 中并没有发现相对应的方法,所以目前查询历史交易的方法并没有暴露出来。
于是自己写一个访问历史数据库代码 引入chainmaker-go
package main
import (
"chainmaker.org/chainmaker-go/localconf"
"chainmaker.org/chainmaker-go/protocol/test"
"chainmaker.org/chainmaker-go/store"
"fmt"
"os"
"path/filepath"
"time"
)
func getlvldbConfig(path string) *localconf.StorageConfig {
conf := &localconf.StorageConfig{}
if path == "" {
path = filepath.Join(os.TempDir(), fmt.Sprintf("%d", time.Now().Nanosecond()))
}
conf.StorePath = path
lvlConfig := &localconf.LevelDbConfig{
StorePath: path,
}
dbConfig := &localconf.DbConfig{
Provider: "leveldb",
LevelDbConfig: lvlConfig,
}
conf.BlockDbConfig = dbConfig
conf.StateDbConfig = dbConfig
conf.HistoryDbConfig = dbConfig
conf.ResultDbConfig = dbConfig
conf.DisableContractEventDB = true
return conf
}
var log = &test.GoLogger{}
func main() {
var factory store.Factory
s, err := factory.NewStore("c1628498288425", getlvldbConfig("/Users/sunbo/Desktop/ChainMaker/src/baas-api/nodeconfigs/c1628498288425-wx-org1-chainmaker-org-node1/data/history"), log)
if err != nil {
panic(err)
}
//"fact_json","name" 注意这里的Key是Key和前缀 采用#号连接
datas,err:= s.GetHistoryForKey("test",[]byte("factjson#name"))
if err!=nil{
panic(err)
}
for datas.Next(){
keys,err:=datas.Value()
if err!=nil{
panic(err)
}
fmt.Println(keys)
}
fmt.Println(s)
}
二:群里有人问怎么查历史数据
答:ChainMaker 本身支持一个HistoryDB 数据库来存储历史数据,但是目前版本不支持,后续版本官方会支持,不过 目前自己先做个版本支持一下
网友评论