美文网首页
新建文章 : 批量更新

新建文章 : 批量更新

作者: nit小星星 | 来源:发表于2019-02-24 07:11 被阅读64次

我们了解一下星云链的数据结构:

采用一种类似MPT树来贮存数据、但是和以太坊的树有细微的差别。

星云链三种节点:

第一种:Branch Node: 16-elements array, value is [hash_0, hash_1, ..., hash_f, hash]

第二种:Extension Node: 3-elements array, value is [ext flag, prefix path, next hash]

第三种: Leaf Node: 3-elements array, value is [leaf flag, suffix path, value]

从代码上看

type node struct {

Hash  []byte

Bytes []byte

Val  [][]byte

}

其实很简单Hash  []byte体现了merkel tree的特性

Val  [][]byte体现了了patricia tree的特性

我们再看树的结构

type Trie struct {

rootHash      []byte

storage      storage.Storage

changelog    []*Entry

needChangelog bool

}

存着一个根hash

这个hash有什么用呢,我们可以使用这个hash从关系型数据库(在星云链上使用的是rocksdb)拿到trie tree的一个节点node。或者从内存,或者从leveldb。星云链实现了三种接口。具体得到在哪个地方使用。

代码如下:

func (t *Trie) fetchNode(hash []byte) (*node, error) {

ir, err := t.storage.Get(hash)

if err != nil {

return nil, err

}

pb := new(triepb.Node)

if err := proto.Unmarshal(ir, pb); err != nil {

return nil, err

}

n := new(node)

if err := n.FromProto(pb); err != nil {

return nil, err

}

return n, nil

}

取数据(节点)在上边,下边一段代码是存数据:

func (t *Trie) commitNode(n *node) error {

pb, err := n.ToProto()

if err != nil {

return err

}

n.Bytes, err = proto.Marshal(pb)

if err != nil {

return err

}

n.Hash = hash.Sha3256(n.Bytes)

return t.storage.Put(n.Hash, n.Bytes)

}

关键就是把数据进行sha3256hash计算,然后使用put存进去。接口都在storage那里定义好了

当然put返回一个Nil或者err.代表成功或者失败

好,接下来我们使用这个结构查找一个数据。

先把代码贴出来吧:虽然很影响篇幅,查找自然是使用到patricia 结构啦

首先函数传入一个roothash 能够定位到节点,然后从该节点开始,使用route往下查找。

func (t *Trie) get(rootHash []byte, route []byte) ([]byte, error) {

//fmt.Println("enter get: rootHash = ", rootHash, ", route = ", route)

curRootHash := rootHash

curRoute := route

for len(curRoute) >= 0 {

rootNode, err := t.fetchNode(curRootHash)

if err != nil {

//fmt.Println("get: fetchNode error")

return nil, err

}

flag, err := rootNode.Type()

if err != nil {

//fmt.Println("get: assert Type error")

return nil, err

}

if len(curRoute) == 0 && flag != leaf {

//fmt.Println("get: too short")

return nil, errors.New("wrong key, too short")

}

switch flag {

case branch:

curRootHash = rootNode.Val[curRoute[0]]

curRoute = curRoute[1:]

//fmt.Println("in case branch: curRootHash = ", curRootHash, ", curRoute = ", curRoute)

break

case ext:

path := rootNode.Val[1]

next := rootNode.Val[2]

matchLen := prefixLen(path, curRoute)

if matchLen != len(path) {

//fmt.Println("get: not matchLen, path = ", path, ", curRoute = ", curRoute)

return nil, ErrNotFound

}

curRootHash = next

curRoute = curRoute[matchLen:]

//fmt.Println("in case ext : curRootHash = ", curRootHash, ", curRoute = ", curRoute)

break

case leaf:

path := rootNode.Val[1]

matchLen := prefixLen(path, curRoute)

if matchLen != len(path) || matchLen != len(curRoute) {

//fmt.Println("get:**************")

return nil, ErrNotFound

}

//fmt.Println("get:22222************")

return rootNode.Val[2], nil

default:

//fmt.Println("get:3333************")

return nil, errors.New("unknown node type")

}

}

return nil, ErrNotFound

}

这里稍等一下,node里面的【】byte代表什么呢。

原来是val经过proto.marshal后的数据。

可想而知在网络上传输的数据是byte[]的数据。然后到了对方的电脑上经过unmarshal一下就可以得到val了。然后再将byte[]sha3256一下就可以得到hash了。据说go proto比json和xml序列化好。是不是真的

这个函数里面使用一个for循环查找数据,地球人都知道

当然了这棵树和Merkel树是有很大差别。merkel是用两个子节点来计算父节点hash,而这棵树有自己的特征,直接是从一个子节点得到父节点hash.但是因为分支节点的存在,所以还是很方便遍历的。

接下来就是Put一个数据进到trie里面了。注意~~前面的那个put是调用内存接口,跟这个没关系。嘿嘿

继续贴代码:

func (t *Trie) Put(key []byte, val []byte) ([]byte, error) {

newHash, err := t.update(t.rootHash, keyToRoute(key), val)

if err != nil {

return nil, err

}

t.rootHash = newHash

if t.needChangelog {

entry := &Entry{Update, key, nil, val}

t.changelog = append(t.changelog, entry)

}

return newHash, nil

}

是不是很简单:继续往下贴update代码

不急不急,先搞定keyToRoute代码

func keyToRoute(key []byte) []byte {

l := len(key) * 2

var route = make([]byte, l)

for i, b := range key {

route[i*2] = b / 16

route[i*2+1] = b % 16

}

return route

}

这个看应该看得懂,就是key原本是16进制的数0~f。把每个十六进制的十六位和个位都拆开存在数组里。因为route是0~f的嘛。

可以继续分析update代码了

func (t *Trie) update(root []byte, route []byte, val []byte) ([]byte, error) {

if root == nil || len(root) == 0 {

// directly add leaf node

value := [][]byte{[]byte{byte(leaf)}, route, val}

node, err := t.createNode(value)

if err != nil {

return nil, err

}

return node.Hash, nil

}

rootNode, err := t.fetchNode(root)

if err != nil {

return nil, err

}

flag, err := rootNode.Type()

if err != nil {

return nil, err

}

switch flag {

case branch:

return t.updateWhenMeetBranch(rootNode, route, val)

case ext:

return t.updateWhenMeetExt(rootNode, route, val)

case leaf:

return t.updateWhenMeetLeaf(rootNode, route, val)

default:

return nil, errors.New("unknown node type")

}

}

简单看一下createnode

func (t *Trie) createNode(val [][]byte) (*node, error) {

n := &node{Val: val}

if err := t.commitNode(n); err != nil {

return nil, err

}

return n, nil

}

诶就是把你的数据变成一个node

接下来到了核心的:

switch flag {

case branch:

return t.updateWhenMeetBranch(rootNode, route, val)

case ext:

return t.updateWhenMeetExt(rootNode, route, val)

case leaf:

return t.updateWhenMeetLeaf(rootNode, route, val)

default:

return nil, errors.New("unknown node type")

}

这三个函数一个个分析就完了。

第一个:

func (t *Trie) updateWhenMeetBranch(rootNode *node, route []byte, val []byte) ([]byte, error) {

// update sub-trie

newHash, err := t.update(rootNode.Val[route[0]], route[1:], val)

if err != nil {

return nil, err

}

// update the branch hash

rootNode.Val[route[0]] = newHash

// save updated node to storage

t.commitNode(rootNode)

return rootNode.Hash, nil

}

当我们拿到的是Branch节点的时候

还记得branch 的route只有一个数嘛

然后拿传入的route的第一个往branch的那个hash往下找。当然这是一个递归过程

这个最简单了

接下来如果遇到ext节点:

func (t *Trie) updateWhenMeetExt(rootNode *node, route []byte, val []byte) ([]byte, error) {

var err error

path := rootNode.Val[1]

next := rootNode.Val[2]

if len(path) > len(route) {

return nil, errors.New("wrong key, too short")

}

matchLen := prefixLen(path, route)

// add new node to the ext node's sub-trie

if matchLen == len(path) {

newHash, err := t.update(next, route[matchLen:], val)

if err != nil {

return nil, err

}

// update the new hash

rootNode.Val[2] = newHash

// save updated node to storage

t.commitNode(rootNode)

return rootNode.Hash, nil

}

// create a new branch for the new node

brNode := emptyBranchNode()

// 4 situations

// 1. matchLen > 0 && matchLen == len(path), 24 meets 24... => ext - branch - ...

// 2. matchLen > 0 && matchLen + 1 < len(path), 23... meets 24... => ext - branch - ext ...

// 3. matchLen = 0 && len(path) = 1, 1 meets 2 => branch - ...

if matchLen > 0 || len(path) == 1 {

// a branch to hold the ext node's sub-trie

brNode.Val[path[matchLen]] = next

if matchLen > 0 && matchLen+1 < len(path) {

value := [][]byte{[]byte{byte(ext)}, path[matchLen+1:], next}

extNode, err := t.createNode(value)

if err != nil {

return nil, err

}

brNode.Val[path[matchLen]] = extNode.Hash

}

// a branch to hold the new node

if brNode.Val[route[matchLen]], err = t.update(nil, route[matchLen+1:], val); err != nil {

return nil, err

}

// save branch to the storage

if err := t.commitNode(brNode); err != nil {

return nil, err

}

// if no common prefix, replace the ext node with the new branch node

if matchLen == 0 {

return brNode.Hash, nil

}

// use the new branch node as the ext node's sub-trie

rootNode.Val[1] = path[0:matchLen]

rootNode.Val[2] = brNode.Hash

if err := t.commitNode(rootNode); err != nil {

return nil, err

}

return rootNode.Hash, nil

}

// 4. matchLen = 0 && len(path) > 1, 12... meets 23... => branch - ext - ...

rootNode.Val[1] = path[1:]

// save ext node to storage

if err := t.commitNode(rootNode); err != nil {

return nil, err

}

brNode.Val[path[matchLen]] = rootNode.Hash

// a branch to hold the new node

if brNode.Val[route[matchLen]], err = t.update(nil, route[matchLen+1:], val); err != nil {

return nil, err

}

// save branch to the storage

if err := t.commitNode(brNode); err != nil {

return nil, err

}

return brNode.Hash, nil

}

代码是不是有点长

这个节点之所以代码长的原因是有几种情况。

第一种是当我们传入的route已经不足以比配他的suffix path。这时候该怎么做呢。这是候应该返回错误。因为你的route是错的。为什么,代码上这么写的。

但是我可以猜想:所有key的长度都是一样的,所以到suffix 一半就结束的key是错的。正确的key必须得比配到leaf节点(或者自己创建的leaf节点)

但是代码中只有>没有=号,所以是不是代码写错了。如果他的长度等于suffix path也应该是错的。因为ext后边一定要有leaf或者branch节点的。但是我想代码在生成key那里也会检查吧,所以这里也不会很重要。

继续。匹配到一半,接下来就不同了。

把前面已经比配的变成ext然后创建一个Branch接下去两个分支,一个原来的,一个新的leaf。这里说得也不清楚稍微体味

第三种情况就是匹配的字符和path的长度是一样的。那么继续走下去了。

以上就是meetext的全部

接下来到meetleaf了,老规矩,贴代码

func (t *Trie) updateWhenMeetLeaf(rootNode *node, route []byte, val []byte) ([]byte, error) {

var err error

path := rootNode.Val[1]

leafVal := rootNode.Val[2]

if len(path) > len(route) {

return nil, errors.New("wrong key, too short")

}

matchLen := prefixLen(path, route)

// node exists, update its value

if matchLen == len(path) {

if len(route) > matchLen {

return nil, errors.New("wrong key, too long")

}

rootNode.Val[2] = val

// save updated node to storage

t.commitNode(rootNode)

return rootNode.Hash, nil

}

// create a new branch for the new node

brNode := emptyBranchNode()

// a branch to hold the leaf node

if brNode.Val[path[matchLen]], err = t.update(nil, path[matchLen+1:], leafVal); err != nil {

return nil, err

}

// a branch to hold the new node

if brNode.Val[route[matchLen]], err = t.update(nil, route[matchLen+1:], val); err != nil {

return nil, err

}

// save the new branch node to storage

if err := t.commitNode(brNode); err != nil {

return nil, err

}

// if no common prefix, replace the leaf node with the new branch node

if matchLen == 0 {

return brNode.Hash, nil

}

// create a new ext node, and use the new branch node as the new ext node's sub-trie

rootNode.Val[0] = []byte{byte(ext)}

rootNode.Val[1] = path[0:matchLen]

rootNode.Val[2] = brNode.Hash

if err := t.commitNode(rootNode); err != nil {

return nil, err

}

return rootNode.Hash, nil

}

饿了,吃早餐去

假如已经到叶子节点了。如果剩下的route没有叶子节点的suffix path那么长。那么还是和之前一样返回错误。而且应该必须等于叶子节点的suffix 的长度。之前已经说过,每个Key的长度都是一样的

叶子节点需要创建一个branch和一个ext节点和一个叶子节点拉哎添加到树中。

至此整个树的插入操作已经完成

是不是发现了一个问题——————————每一个数据插入的时候都得更新 从叶子节点到根节点的hash。这和Merkel原理是一样的。

删除就不说了。我列一个图:

// Del the node's value in trie

大家自己看吧,免得影响篇幅

上面那个问题是。每次插入一个数据就得更新整个树。虽然是<lgn但是我们能不能让他先插入多个数据然后再一起更新呢

首先定义数据结构用来存node

type TNode struct {

Key []byte

Children []*TNode

Val []byte

}

每次接受到一个val就更新该结构

func UpdateTreeRecur(root *TNode, key []byte, value []byte) {

route := keyToRoute(key)

fmt.Println("route:", route)

updateTreeRecur(root, route, value)

}

func updateTreeRecur(root *TNode, route []byte, value []byte) {

if len(route) == 0 {

root.Val = value

return

}

var existed = false

var nextRoot *TNode

for _, elem := range root.Children {

if route[0] == elem.Key[0] {

existed = true

nextRoot = elem

break

}

}

if existed == false {

node := &TNode{Key: route[0:1], Children: nil, Val: nil}

root.Children = append(root.Children, node)

nextRoot = node

}

updateTreeRecur(nextRoot, route[1:], value)

}

我们的树最多有16个孩子,只有在叶子孩子才有数据。其他非叶子val的都设置为nil,而且每个非叶子节点route只有一位数

/***************************************Batch Operation**************************************************/

接下来是将这个数据结构作为个整体直接带进插入操作中。

func (t *Trie) BatchPut(tree *TNode) error {

var err error

t.rootHash, err = t.batchUpdate(t.rootHash, tree)

if err != nil {

return err

}

//fmt.Println("t.rootHash = ", t.rootHash)

return nil

}

func (t *Trie) batchUpdate(root []byte, tree *TNode) ([]byte, error) {

//fmt.Println("enter batchUpdate: root = ", root, ", tree = ", tree)

if len(tree.Children) == 0 {

return nil, errors.New("wrong happened: no records")

}

if root == nil || len(root) == 0 {

return t.updateNewTrie(tree)

        }

rootNode, err := t.fetchNode(root)

if err != nil {

return nil, err

}

flag, err := rootNode.Type()

if err != nil {

return nil, err

}

switch flag {

case ext:

return t.updateTrieWhenMeetExt(rootNode, tree)

case branch:

return t.updateTrieWhenMeetBranch(rootNode, tree)

case leaf:

return t.updateTrieWhenMeetLeaf(rootNode, tree)

}

return nil, errors.New("batch update error.")

}

形式上还是抄了单个插入的写法。

首先如果传入的roothash是nil的话,那么

func (t *Trie) updateNewTrie(root *TNode) ([]byte, error){

numOfChildren := len(root.Children)

if numOfChildren > 1 {

return t.updateBranchNewTrie(root.Children)

} else if numOfChildren == 1 {

return t.updateSingleNewTrie(root.Children[0])

} else {

return nil, errors.New("number of children error!")

}

}

接下来两个函数这样写

func (t *Trie) updateBranchNewTrie(children []*TNode) ([]byte, error) {

brNode := emptyBranchNode()

for _, elem := range children {

if len(elem.Children) > 1 {

brNode.Val[elem.Key[0]], _ = t.updateBranchNewTrie(elem.Children)

} else {

brNode.Val[elem.Key[0]], _ = t.updateSingleNewTrie(elem.Children[0])

}

}

if err := t.commitNode(brNode); err != nil {

return nil, err

}

return brNode.Hash, nil

}

这是一个递归过程:关键在于如果一个TNode有很多子节点的话,那么先把他的子节点一个个更新,才更细brNode.hash。这样其实是减少了brnode.hash的更细次数

按照原来的如果有多个数那么brnode.hash是得更新多次的。

func (t *Trie) updateSingleNewTrie(root *TNode) ([]byte, error){

numOfChildren := len(root.Children)

//fmt.Println("key = ", root.Key, ", children = ", root.Children)

if numOfChildren == 0 {

fmt.Println("has arrived leaf: root.Key = ", root.Key)

value := [][]byte{[]byte{byte(leaf)}, root.Key, root.Val}

                newNode, err := t.createNode(value)

                if err != nil {

                        return nil, err

                }

                return newNode.Hash, nil

} else if numOfChildren == 1 {

root.Key = append(root.Key, root.Children[0].Key[0])

root.Val = root.Children[0].Val

root.Children = root.Children[0].Children

return t.updateSingleNewTrie(root)

} else if numOfChildren > 1 {

newHash, err := t.updateBranchNewTrie(root.Children)

if err != nil {

return nil, err

}

value := [][]byte{[]byte{byte(ext)}, root.Key, newHash}

newNode, err := t.createNode(value)

if err != nil {

return nil, err

}

return newNode.Hash, nil

} else {

return nil, errors.New("error happend!")

}

}

上边的代码就有到了叶子节点的情况了,也就是递归的尽头。

想想就知道了。updatebranchtrie是不可能到叶子的。因为他还有孩子节点。

========================================================================

以上是更新一个新的树,如果更新一个已经有节点的树呢,返回到

switch flag {

case ext:

return t.updateTrieWhenMeetExt(rootNode, tree)

case branch:

return t.updateTrieWhenMeetBranch(rootNode, tree)

case leaf:

return t.updateTrieWhenMeetLeaf(rootNode, tree)

}

return nil, errors.New("batch update error.")

代码上

一个个函数来写吧

func (t *Trie) updateTrieWhenMeetExt(rootNode *node, tree *TNode) ([]byte, error){

path := rootNode.Val[1]

next := rootNode.Val[2]

//fmt.Println("enter ext node: path = ", path, ", next = ", next)

var idx int

var e byte

current := tree

for idx, e = range path {

if len(current.Children) == 1 && e == current.Children[0].Key[0] {

current = current.Children[0]

//fmt.Println("****************")

} else {

//fmt.Println("----------------, idx = ", idx, ", e = ", e)

break

}

}

if idx > 0 && idx == len(path)-1 {

rootNode.Val[2], _  = t.batchUpdate(next, current)

} else if idx == 0 {

brNode := emptyBranchNode()

brNode.Val[path[0]] = next

for _, elem := range current.Children {

if elem.Key[0] == path[0] {

brNode.Val[path[0]], _ = t.batchUpdate(next, elem)

}

brNode.Val[elem.Key[0]], _ = t.updateNewTrie(elem)

}

if err := t.commitNode(brNode); err != nil {

return nil ,err

}

return brNode.Hash, nil

} else {

rootNode.Val[1] = path[:idx]

brNode := emptyBranchNode()

brNode.Val[path[idx]] = next

for _, elem := range current.Children {

//fmt.Println("2222222222222****************, idx = ", idx)

if elem.Key[0] == path[idx] {

//fmt.Println("333333333333****************")

brNode.Val[path[idx]], _ = t.batchUpdate(next, elem)

}

brNode.Val[elem.Key[0]], _ = t.updateNewTrie(elem)

}

if err := t.commitNode(brNode); err != nil {

return nil, err

}

rootNode.Val[2] = brNode.Hash

}

if err := t.commitNode(rootNode); err != nil {

return nil, err

}

//fmt.Println("4444444444***************")

return rootNode.Hash, nil

}

这里有三种情况,。、

假如只有一个孩子而且孩子的key和path一个个比配时候,就会一直走下去,直到Next节点。

还有两种情况。说起来好麻烦,自己看代码吧

func (t *Trie) updateTrieWhenMeetBranch(rootNode *node, tree *TNode) ([]byte, error){

//fmt.Println("enter branch node: rootNode = ", rootNode, ", tree = ", tree)

var err error

for _, elem := range tree.Children {

if rootNode.Val[elem.Key[0]] != nil {

rootNode.Val[elem.Key[0]], err = t.batchUpdate(rootNode.Val[elem.Key[0]], elem)

if err != nil {

return nil, err

}

} else {

rootNode.Val[elem.Key[0]], err = t.updateNewTrie(elem)

if err != nil {

return nil, err

}

}

}

if err := t.commitNode(rootNode); err != nil {

return nil, err

}

return rootNode.Hash, nil

}

func (t *Trie) updateTrieWhenMeetLeaf(rootNode *node, tree *TNode) ([]byte, error) {

path := rootNode.Val[1]

//fmt.Println("enter leaf node: root.path = ", path)

var idx int

var e byte

current := tree

for idx, e = range path {

if len(current.Children) > 1 || e != current.Children[0].Key[0]{

//fmt.Println("leaf:**************")

break

}

//fmt.Println("leaf:-------------------")

current = current.Children[0]

}

//fmt.Println("new ext include : ", path[:idx], ", idx = ", idx)

if idx == len(path)-1 {

//fmt.Println("leaf:2222**************")

rootNode.Val[2] = current.Val

if err := t.commitNode(rootNode); err != nil {

return nil, err

}

return rootNode.Hash, nil

} else if idx == 0 {

brNode := emptyBranchNode()

rootNode.Val[1] = path[1:]

if err := t.commitNode(rootNode); err != nil {

return nil, err

}

brNode.Val[e] = rootNode.Hash

for _, elem := range current.Children {

if elem.Key[0] == e {

brNode.Val[e], _ = t.batchUpdate(brNode.Val[e], elem)

} else {

brNode.Val[elem.Key[0]], _ = t.updateNewTrie(elem)

}

}

if err := t.commitNode(brNode); err != nil {

return nil, err

}

return brNode.Hash, nil

} else {

//fmt.Println("leaf:333**************: path = ", path[idx+1:])

rootNode.Val[1] = path[idx+1:]

if err := t.commitNode(rootNode); err != nil {

return nil, err

}

brNode := emptyBranchNode()

brNode.Val[path[idx]] = rootNode.Hash

for _, elem := range current.Children {

if elem.Key[0] == path[idx] {

brNode.Val[path[idx]], _ = t.batchUpdate(brNode.Val[path[idx]], elem)

}

brNode.Val[elem.Key[0]], _ = t.updateNewTrie(elem)

}

if err := t.commitNode(brNode); err != nil {

return nil, err

}

value := [][]byte{[]byte{byte(ext)}, path[:idx], brNode.Hash}

newNode, err := t.createNode(value)

if err != nil {

return nil, err

}

return newNode.Hash, nil

}

}

以上代码有一处bug,能看出不来就说明看懂了

end

相关文章

网友评论

      本文标题:新建文章 : 批量更新

      本文链接:https://www.haomeiwen.com/subject/pladuftx.html