我们了解一下星云链的数据结构:
采用一种类似MPT树来贮存数据、但是和以太坊的树有细微的差别。
星云链三种节点:
第一种:Branch Node: 16-elements array, value is [hash_0, hash_1, ..., hash_f, hash]
第二种:Extension Node: 3-elements array, value is [ext flag, prefix path, next hash]
第三种: Leaf Node: 3-elements array, value is [leaf flag, suffix path, value]
从代码上看
type node struct {
Hash []byte
Bytes []byte
Val [][]byte
}
其实很简单Hash []byte体现了merkel tree的特性
Val [][]byte体现了了patricia tree的特性
我们再看树的结构
type Trie struct {
rootHash []byte
storage storage.Storage
changelog []*Entry
needChangelog bool
}
存着一个根hash
这个hash有什么用呢,我们可以使用这个hash从关系型数据库(在星云链上使用的是rocksdb)拿到trie tree的一个节点node。或者从内存,或者从leveldb。星云链实现了三种接口。具体得到在哪个地方使用。
代码如下:
func (t *Trie) fetchNode(hash []byte) (*node, error) {
ir, err := t.storage.Get(hash)
if err != nil {
return nil, err
}
pb := new(triepb.Node)
if err := proto.Unmarshal(ir, pb); err != nil {
return nil, err
}
n := new(node)
if err := n.FromProto(pb); err != nil {
return nil, err
}
return n, nil
}
取数据(节点)在上边,下边一段代码是存数据:
func (t *Trie) commitNode(n *node) error {
pb, err := n.ToProto()
if err != nil {
return err
}
n.Bytes, err = proto.Marshal(pb)
if err != nil {
return err
}
n.Hash = hash.Sha3256(n.Bytes)
return t.storage.Put(n.Hash, n.Bytes)
}
关键就是把数据进行sha3256hash计算,然后使用put存进去。接口都在storage那里定义好了
当然put返回一个Nil或者err.代表成功或者失败
好,接下来我们使用这个结构查找一个数据。
先把代码贴出来吧:虽然很影响篇幅,查找自然是使用到patricia 结构啦
首先函数传入一个roothash 能够定位到节点,然后从该节点开始,使用route往下查找。
func (t *Trie) get(rootHash []byte, route []byte) ([]byte, error) {
//fmt.Println("enter get: rootHash = ", rootHash, ", route = ", route)
curRootHash := rootHash
curRoute := route
for len(curRoute) >= 0 {
rootNode, err := t.fetchNode(curRootHash)
if err != nil {
//fmt.Println("get: fetchNode error")
return nil, err
}
flag, err := rootNode.Type()
if err != nil {
//fmt.Println("get: assert Type error")
return nil, err
}
if len(curRoute) == 0 && flag != leaf {
//fmt.Println("get: too short")
return nil, errors.New("wrong key, too short")
}
switch flag {
case branch:
curRootHash = rootNode.Val[curRoute[0]]
curRoute = curRoute[1:]
//fmt.Println("in case branch: curRootHash = ", curRootHash, ", curRoute = ", curRoute)
break
case ext:
path := rootNode.Val[1]
next := rootNode.Val[2]
matchLen := prefixLen(path, curRoute)
if matchLen != len(path) {
//fmt.Println("get: not matchLen, path = ", path, ", curRoute = ", curRoute)
return nil, ErrNotFound
}
curRootHash = next
curRoute = curRoute[matchLen:]
//fmt.Println("in case ext : curRootHash = ", curRootHash, ", curRoute = ", curRoute)
break
case leaf:
path := rootNode.Val[1]
matchLen := prefixLen(path, curRoute)
if matchLen != len(path) || matchLen != len(curRoute) {
//fmt.Println("get:**************")
return nil, ErrNotFound
}
//fmt.Println("get:22222************")
return rootNode.Val[2], nil
default:
//fmt.Println("get:3333************")
return nil, errors.New("unknown node type")
}
}
return nil, ErrNotFound
}
这里稍等一下,node里面的【】byte代表什么呢。
原来是val经过proto.marshal后的数据。
可想而知在网络上传输的数据是byte[]的数据。然后到了对方的电脑上经过unmarshal一下就可以得到val了。然后再将byte[]sha3256一下就可以得到hash了。据说go proto比json和xml序列化好。是不是真的
这个函数里面使用一个for循环查找数据,地球人都知道
当然了这棵树和Merkel树是有很大差别。merkel是用两个子节点来计算父节点hash,而这棵树有自己的特征,直接是从一个子节点得到父节点hash.但是因为分支节点的存在,所以还是很方便遍历的。
接下来就是Put一个数据进到trie里面了。注意~~前面的那个put是调用内存接口,跟这个没关系。嘿嘿
继续贴代码:
func (t *Trie) Put(key []byte, val []byte) ([]byte, error) {
newHash, err := t.update(t.rootHash, keyToRoute(key), val)
if err != nil {
return nil, err
}
t.rootHash = newHash
if t.needChangelog {
entry := &Entry{Update, key, nil, val}
t.changelog = append(t.changelog, entry)
}
return newHash, nil
}
是不是很简单:继续往下贴update代码
不急不急,先搞定keyToRoute代码
func keyToRoute(key []byte) []byte {
l := len(key) * 2
var route = make([]byte, l)
for i, b := range key {
route[i*2] = b / 16
route[i*2+1] = b % 16
}
return route
}
这个看应该看得懂,就是key原本是16进制的数0~f。把每个十六进制的十六位和个位都拆开存在数组里。因为route是0~f的嘛。
可以继续分析update代码了
func (t *Trie) update(root []byte, route []byte, val []byte) ([]byte, error) {
if root == nil || len(root) == 0 {
// directly add leaf node
value := [][]byte{[]byte{byte(leaf)}, route, val}
node, err := t.createNode(value)
if err != nil {
return nil, err
}
return node.Hash, nil
}
rootNode, err := t.fetchNode(root)
if err != nil {
return nil, err
}
flag, err := rootNode.Type()
if err != nil {
return nil, err
}
switch flag {
case branch:
return t.updateWhenMeetBranch(rootNode, route, val)
case ext:
return t.updateWhenMeetExt(rootNode, route, val)
case leaf:
return t.updateWhenMeetLeaf(rootNode, route, val)
default:
return nil, errors.New("unknown node type")
}
}
简单看一下createnode
func (t *Trie) createNode(val [][]byte) (*node, error) {
n := &node{Val: val}
if err := t.commitNode(n); err != nil {
return nil, err
}
return n, nil
}
诶就是把你的数据变成一个node
接下来到了核心的:
switch flag {
case branch:
return t.updateWhenMeetBranch(rootNode, route, val)
case ext:
return t.updateWhenMeetExt(rootNode, route, val)
case leaf:
return t.updateWhenMeetLeaf(rootNode, route, val)
default:
return nil, errors.New("unknown node type")
}
这三个函数一个个分析就完了。
第一个:
func (t *Trie) updateWhenMeetBranch(rootNode *node, route []byte, val []byte) ([]byte, error) {
// update sub-trie
newHash, err := t.update(rootNode.Val[route[0]], route[1:], val)
if err != nil {
return nil, err
}
// update the branch hash
rootNode.Val[route[0]] = newHash
// save updated node to storage
t.commitNode(rootNode)
return rootNode.Hash, nil
}
当我们拿到的是Branch节点的时候
还记得branch 的route只有一个数嘛
然后拿传入的route的第一个往branch的那个hash往下找。当然这是一个递归过程
这个最简单了
接下来如果遇到ext节点:
func (t *Trie) updateWhenMeetExt(rootNode *node, route []byte, val []byte) ([]byte, error) {
var err error
path := rootNode.Val[1]
next := rootNode.Val[2]
if len(path) > len(route) {
return nil, errors.New("wrong key, too short")
}
matchLen := prefixLen(path, route)
// add new node to the ext node's sub-trie
if matchLen == len(path) {
newHash, err := t.update(next, route[matchLen:], val)
if err != nil {
return nil, err
}
// update the new hash
rootNode.Val[2] = newHash
// save updated node to storage
t.commitNode(rootNode)
return rootNode.Hash, nil
}
// create a new branch for the new node
brNode := emptyBranchNode()
// 4 situations
// 1. matchLen > 0 && matchLen == len(path), 24 meets 24... => ext - branch - ...
// 2. matchLen > 0 && matchLen + 1 < len(path), 23... meets 24... => ext - branch - ext ...
// 3. matchLen = 0 && len(path) = 1, 1 meets 2 => branch - ...
if matchLen > 0 || len(path) == 1 {
// a branch to hold the ext node's sub-trie
brNode.Val[path[matchLen]] = next
if matchLen > 0 && matchLen+1 < len(path) {
value := [][]byte{[]byte{byte(ext)}, path[matchLen+1:], next}
extNode, err := t.createNode(value)
if err != nil {
return nil, err
}
brNode.Val[path[matchLen]] = extNode.Hash
}
// a branch to hold the new node
if brNode.Val[route[matchLen]], err = t.update(nil, route[matchLen+1:], val); err != nil {
return nil, err
}
// save branch to the storage
if err := t.commitNode(brNode); err != nil {
return nil, err
}
// if no common prefix, replace the ext node with the new branch node
if matchLen == 0 {
return brNode.Hash, nil
}
// use the new branch node as the ext node's sub-trie
rootNode.Val[1] = path[0:matchLen]
rootNode.Val[2] = brNode.Hash
if err := t.commitNode(rootNode); err != nil {
return nil, err
}
return rootNode.Hash, nil
}
// 4. matchLen = 0 && len(path) > 1, 12... meets 23... => branch - ext - ...
rootNode.Val[1] = path[1:]
// save ext node to storage
if err := t.commitNode(rootNode); err != nil {
return nil, err
}
brNode.Val[path[matchLen]] = rootNode.Hash
// a branch to hold the new node
if brNode.Val[route[matchLen]], err = t.update(nil, route[matchLen+1:], val); err != nil {
return nil, err
}
// save branch to the storage
if err := t.commitNode(brNode); err != nil {
return nil, err
}
return brNode.Hash, nil
}
代码是不是有点长
这个节点之所以代码长的原因是有几种情况。
第一种是当我们传入的route已经不足以比配他的suffix path。这时候该怎么做呢。这是候应该返回错误。因为你的route是错的。为什么,代码上这么写的。
但是我可以猜想:所有key的长度都是一样的,所以到suffix 一半就结束的key是错的。正确的key必须得比配到leaf节点(或者自己创建的leaf节点)
但是代码中只有>没有=号,所以是不是代码写错了。如果他的长度等于suffix path也应该是错的。因为ext后边一定要有leaf或者branch节点的。但是我想代码在生成key那里也会检查吧,所以这里也不会很重要。
继续。匹配到一半,接下来就不同了。
把前面已经比配的变成ext然后创建一个Branch接下去两个分支,一个原来的,一个新的leaf。这里说得也不清楚稍微体味
第三种情况就是匹配的字符和path的长度是一样的。那么继续走下去了。
以上就是meetext的全部
接下来到meetleaf了,老规矩,贴代码
func (t *Trie) updateWhenMeetLeaf(rootNode *node, route []byte, val []byte) ([]byte, error) {
var err error
path := rootNode.Val[1]
leafVal := rootNode.Val[2]
if len(path) > len(route) {
return nil, errors.New("wrong key, too short")
}
matchLen := prefixLen(path, route)
// node exists, update its value
if matchLen == len(path) {
if len(route) > matchLen {
return nil, errors.New("wrong key, too long")
}
rootNode.Val[2] = val
// save updated node to storage
t.commitNode(rootNode)
return rootNode.Hash, nil
}
// create a new branch for the new node
brNode := emptyBranchNode()
// a branch to hold the leaf node
if brNode.Val[path[matchLen]], err = t.update(nil, path[matchLen+1:], leafVal); err != nil {
return nil, err
}
// a branch to hold the new node
if brNode.Val[route[matchLen]], err = t.update(nil, route[matchLen+1:], val); err != nil {
return nil, err
}
// save the new branch node to storage
if err := t.commitNode(brNode); err != nil {
return nil, err
}
// if no common prefix, replace the leaf node with the new branch node
if matchLen == 0 {
return brNode.Hash, nil
}
// create a new ext node, and use the new branch node as the new ext node's sub-trie
rootNode.Val[0] = []byte{byte(ext)}
rootNode.Val[1] = path[0:matchLen]
rootNode.Val[2] = brNode.Hash
if err := t.commitNode(rootNode); err != nil {
return nil, err
}
return rootNode.Hash, nil
}
饿了,吃早餐去
假如已经到叶子节点了。如果剩下的route没有叶子节点的suffix path那么长。那么还是和之前一样返回错误。而且应该必须等于叶子节点的suffix 的长度。之前已经说过,每个Key的长度都是一样的
叶子节点需要创建一个branch和一个ext节点和一个叶子节点拉哎添加到树中。
至此整个树的插入操作已经完成
是不是发现了一个问题——————————每一个数据插入的时候都得更新 从叶子节点到根节点的hash。这和Merkel原理是一样的。
删除就不说了。我列一个图:
// Del the node's value in trie

大家自己看吧,免得影响篇幅
上面那个问题是。每次插入一个数据就得更新整个树。虽然是<lgn但是我们能不能让他先插入多个数据然后再一起更新呢
首先定义数据结构用来存node
type TNode struct {
Key []byte
Children []*TNode
Val []byte
}
每次接受到一个val就更新该结构
func UpdateTreeRecur(root *TNode, key []byte, value []byte) {
route := keyToRoute(key)
fmt.Println("route:", route)
updateTreeRecur(root, route, value)
}
func updateTreeRecur(root *TNode, route []byte, value []byte) {
if len(route) == 0 {
root.Val = value
return
}
var existed = false
var nextRoot *TNode
for _, elem := range root.Children {
if route[0] == elem.Key[0] {
existed = true
nextRoot = elem
break
}
}
if existed == false {
node := &TNode{Key: route[0:1], Children: nil, Val: nil}
root.Children = append(root.Children, node)
nextRoot = node
}
updateTreeRecur(nextRoot, route[1:], value)
}
我们的树最多有16个孩子,只有在叶子孩子才有数据。其他非叶子val的都设置为nil,而且每个非叶子节点route只有一位数
/***************************************Batch Operation**************************************************/
接下来是将这个数据结构作为个整体直接带进插入操作中。
func (t *Trie) BatchPut(tree *TNode) error {
var err error
t.rootHash, err = t.batchUpdate(t.rootHash, tree)
if err != nil {
return err
}
//fmt.Println("t.rootHash = ", t.rootHash)
return nil
}
func (t *Trie) batchUpdate(root []byte, tree *TNode) ([]byte, error) {
//fmt.Println("enter batchUpdate: root = ", root, ", tree = ", tree)
if len(tree.Children) == 0 {
return nil, errors.New("wrong happened: no records")
}
if root == nil || len(root) == 0 {
return t.updateNewTrie(tree)
}
rootNode, err := t.fetchNode(root)
if err != nil {
return nil, err
}
flag, err := rootNode.Type()
if err != nil {
return nil, err
}
switch flag {
case ext:
return t.updateTrieWhenMeetExt(rootNode, tree)
case branch:
return t.updateTrieWhenMeetBranch(rootNode, tree)
case leaf:
return t.updateTrieWhenMeetLeaf(rootNode, tree)
}
return nil, errors.New("batch update error.")
}
形式上还是抄了单个插入的写法。
首先如果传入的roothash是nil的话,那么
func (t *Trie) updateNewTrie(root *TNode) ([]byte, error){
numOfChildren := len(root.Children)
if numOfChildren > 1 {
return t.updateBranchNewTrie(root.Children)
} else if numOfChildren == 1 {
return t.updateSingleNewTrie(root.Children[0])
} else {
return nil, errors.New("number of children error!")
}
}
接下来两个函数这样写
func (t *Trie) updateBranchNewTrie(children []*TNode) ([]byte, error) {
brNode := emptyBranchNode()
for _, elem := range children {
if len(elem.Children) > 1 {
brNode.Val[elem.Key[0]], _ = t.updateBranchNewTrie(elem.Children)
} else {
brNode.Val[elem.Key[0]], _ = t.updateSingleNewTrie(elem.Children[0])
}
}
if err := t.commitNode(brNode); err != nil {
return nil, err
}
return brNode.Hash, nil
}
这是一个递归过程:关键在于如果一个TNode有很多子节点的话,那么先把他的子节点一个个更新,才更细brNode.hash。这样其实是减少了brnode.hash的更细次数
按照原来的如果有多个数那么brnode.hash是得更新多次的。
func (t *Trie) updateSingleNewTrie(root *TNode) ([]byte, error){
numOfChildren := len(root.Children)
//fmt.Println("key = ", root.Key, ", children = ", root.Children)
if numOfChildren == 0 {
fmt.Println("has arrived leaf: root.Key = ", root.Key)
value := [][]byte{[]byte{byte(leaf)}, root.Key, root.Val}
newNode, err := t.createNode(value)
if err != nil {
return nil, err
}
return newNode.Hash, nil
} else if numOfChildren == 1 {
root.Key = append(root.Key, root.Children[0].Key[0])
root.Val = root.Children[0].Val
root.Children = root.Children[0].Children
return t.updateSingleNewTrie(root)
} else if numOfChildren > 1 {
newHash, err := t.updateBranchNewTrie(root.Children)
if err != nil {
return nil, err
}
value := [][]byte{[]byte{byte(ext)}, root.Key, newHash}
newNode, err := t.createNode(value)
if err != nil {
return nil, err
}
return newNode.Hash, nil
} else {
return nil, errors.New("error happend!")
}
}
上边的代码就有到了叶子节点的情况了,也就是递归的尽头。
想想就知道了。updatebranchtrie是不可能到叶子的。因为他还有孩子节点。
========================================================================
以上是更新一个新的树,如果更新一个已经有节点的树呢,返回到
switch flag {
case ext:
return t.updateTrieWhenMeetExt(rootNode, tree)
case branch:
return t.updateTrieWhenMeetBranch(rootNode, tree)
case leaf:
return t.updateTrieWhenMeetLeaf(rootNode, tree)
}
return nil, errors.New("batch update error.")
代码上
一个个函数来写吧
func (t *Trie) updateTrieWhenMeetExt(rootNode *node, tree *TNode) ([]byte, error){
path := rootNode.Val[1]
next := rootNode.Val[2]
//fmt.Println("enter ext node: path = ", path, ", next = ", next)
var idx int
var e byte
current := tree
for idx, e = range path {
if len(current.Children) == 1 && e == current.Children[0].Key[0] {
current = current.Children[0]
//fmt.Println("****************")
} else {
//fmt.Println("----------------, idx = ", idx, ", e = ", e)
break
}
}
if idx > 0 && idx == len(path)-1 {
rootNode.Val[2], _ = t.batchUpdate(next, current)
} else if idx == 0 {
brNode := emptyBranchNode()
brNode.Val[path[0]] = next
for _, elem := range current.Children {
if elem.Key[0] == path[0] {
brNode.Val[path[0]], _ = t.batchUpdate(next, elem)
}
brNode.Val[elem.Key[0]], _ = t.updateNewTrie(elem)
}
if err := t.commitNode(brNode); err != nil {
return nil ,err
}
return brNode.Hash, nil
} else {
rootNode.Val[1] = path[:idx]
brNode := emptyBranchNode()
brNode.Val[path[idx]] = next
for _, elem := range current.Children {
//fmt.Println("2222222222222****************, idx = ", idx)
if elem.Key[0] == path[idx] {
//fmt.Println("333333333333****************")
brNode.Val[path[idx]], _ = t.batchUpdate(next, elem)
}
brNode.Val[elem.Key[0]], _ = t.updateNewTrie(elem)
}
if err := t.commitNode(brNode); err != nil {
return nil, err
}
rootNode.Val[2] = brNode.Hash
}
if err := t.commitNode(rootNode); err != nil {
return nil, err
}
//fmt.Println("4444444444***************")
return rootNode.Hash, nil
}
这里有三种情况,。、
假如只有一个孩子而且孩子的key和path一个个比配时候,就会一直走下去,直到Next节点。
还有两种情况。说起来好麻烦,自己看代码吧
func (t *Trie) updateTrieWhenMeetBranch(rootNode *node, tree *TNode) ([]byte, error){
//fmt.Println("enter branch node: rootNode = ", rootNode, ", tree = ", tree)
var err error
for _, elem := range tree.Children {
if rootNode.Val[elem.Key[0]] != nil {
rootNode.Val[elem.Key[0]], err = t.batchUpdate(rootNode.Val[elem.Key[0]], elem)
if err != nil {
return nil, err
}
} else {
rootNode.Val[elem.Key[0]], err = t.updateNewTrie(elem)
if err != nil {
return nil, err
}
}
}
if err := t.commitNode(rootNode); err != nil {
return nil, err
}
return rootNode.Hash, nil
}
func (t *Trie) updateTrieWhenMeetLeaf(rootNode *node, tree *TNode) ([]byte, error) {
path := rootNode.Val[1]
//fmt.Println("enter leaf node: root.path = ", path)
var idx int
var e byte
current := tree
for idx, e = range path {
if len(current.Children) > 1 || e != current.Children[0].Key[0]{
//fmt.Println("leaf:**************")
break
}
//fmt.Println("leaf:-------------------")
current = current.Children[0]
}
//fmt.Println("new ext include : ", path[:idx], ", idx = ", idx)
if idx == len(path)-1 {
//fmt.Println("leaf:2222**************")
rootNode.Val[2] = current.Val
if err := t.commitNode(rootNode); err != nil {
return nil, err
}
return rootNode.Hash, nil
} else if idx == 0 {
brNode := emptyBranchNode()
rootNode.Val[1] = path[1:]
if err := t.commitNode(rootNode); err != nil {
return nil, err
}
brNode.Val[e] = rootNode.Hash
for _, elem := range current.Children {
if elem.Key[0] == e {
brNode.Val[e], _ = t.batchUpdate(brNode.Val[e], elem)
} else {
brNode.Val[elem.Key[0]], _ = t.updateNewTrie(elem)
}
}
if err := t.commitNode(brNode); err != nil {
return nil, err
}
return brNode.Hash, nil
} else {
//fmt.Println("leaf:333**************: path = ", path[idx+1:])
rootNode.Val[1] = path[idx+1:]
if err := t.commitNode(rootNode); err != nil {
return nil, err
}
brNode := emptyBranchNode()
brNode.Val[path[idx]] = rootNode.Hash
for _, elem := range current.Children {
if elem.Key[0] == path[idx] {
brNode.Val[path[idx]], _ = t.batchUpdate(brNode.Val[path[idx]], elem)
}
brNode.Val[elem.Key[0]], _ = t.updateNewTrie(elem)
}
if err := t.commitNode(brNode); err != nil {
return nil, err
}
value := [][]byte{[]byte{byte(ext)}, path[:idx], brNode.Hash}
newNode, err := t.createNode(value)
if err != nil {
return nil, err
}
return newNode.Hash, nil
}
}
以上代码有一处bug,能看出不来就说明看懂了
end
网友评论