美文网首页以太坊(ethereum)实现研究
ethereum p2p Kademlia的实现之三

ethereum p2p Kademlia的实现之三

作者: 古则 | 来源:发表于2018-04-18 14:41 被阅读16次

    ethereum p2p Kademlia的实现之一
    ethereum p2p Kademlia的实现之二

    1.初始化,seednode的添加

    //p2p/discover/table.go
    func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) 
    =>
    tab.loadSeedNodes(false)
    
    func (tab *Table) loadSeedNodes(bond bool) {
        seeds := tab.db.querySeeds(seedCount, seedMaxAge)
        seeds = append(seeds, tab.nursery...)
        //bond为false
        if bond {
            seeds = tab.bondall(seeds)
        }
        for i := range seeds {
            seed := seeds[i]
            age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.bondTime(seed.ID)) }}
            log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
            tab.add(seed)
        }
    }
    

    先从数据库中查得符合条件的节点,将bootsnode(nursery)一起添加到k桶中

    // 首先获得需要将node放入k桶的哪一行,如果改行还有剩余空间,放入
    // 如果没有剩余空间,从这一行的replacements中选出中选出活跃时间最早(最小)的一个节点,替换掉
    func (tab *Table) add(new *Node) {
        tab.mutex.Lock()
        defer tab.mutex.Unlock()
    
        b := tab.bucket(new.sha)
        if !tab.bumpOrAdd(b, new) {
            // Node is not in table. Add it to the replacement list.
            tab.addReplacement(b, new)
        }
    }
    
    // 该方法用于确定将node放入k桶的哪一行
    // bucket returns the bucket for the given node ID hash.
    func (tab *Table) bucket(sha common.Hash) *bucket {
        d := logdist(tab.self.sha, sha)
        if d <= bucketMinDistance {
            return tab.buckets[0]
        }
        return tab.buckets[d-bucketMinDistance-1]
    }
    

    可见每一行中replacements的作用

    2.K桶的维护(检查,刷新等操作)

    调用过程

    //p2p/discover/table.go
    func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) 
    =>
    func (tab *Table) loop()
    

    下面是对loop()方法的分析

    // loop schedules refresh, revalidate runs and coordinates shutdown.
    func (tab *Table) loop() {
        var (
            revalidate     = time.NewTimer(tab.nextRevalidateTime())
            refresh        = time.NewTicker(refreshInterval)
            copyNodes      = time.NewTicker(copyNodesInterval)
            revalidateDone = make(chan struct{})
            refreshDone    = make(chan struct{})           // where doRefresh reports completion
            waiting        = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
        )
        defer refresh.Stop()
        defer revalidate.Stop()
        defer copyNodes.Stop()
    
        // Start initial refresh.
        go tab.doRefresh(refreshDone)
    
    loop:
        for {
            select {
            case <-refresh.C:
                tab.seedRand()
                if refreshDone == nil {
                    refreshDone = make(chan struct{})
                    go tab.doRefresh(refreshDone)
                }
            case req := <-tab.refreshReq:
                waiting = append(waiting, req)
                if refreshDone == nil {
                    refreshDone = make(chan struct{})
                    go tab.doRefresh(refreshDone)
                }
            case <-refreshDone:
                for _, ch := range waiting {
                    close(ch)
                }
                waiting, refreshDone = nil, nil
            case <-revalidate.C:
                go tab.doRevalidate(revalidateDone)
            case <-revalidateDone:
                revalidate.Reset(tab.nextRevalidateTime())
            case <-copyNodes.C:
                go tab.copyBondedNodes()
            case <-tab.closeReq:
                break loop
            }
        }
    
        if tab.net != nil {
            tab.net.close()
        }
        if refreshDone != nil {
            <-refreshDone
        }
        for _, ch := range waiting {
            close(ch)
        }
        tab.db.close()
        close(tab.closed)
    }
    

    这个函数主要包含三个定时器

    • revalidate = time.NewTimer(tab.nextRevalidateTime())
    • refresh = time.NewTicker(refreshInterval)
    • copyNodes = time.NewTicker(copyNodesInterval)

    分别定时执行doRefresh,doRevalidate,copyBondedNodes等三个函数

    2.1doRefresh

    // doRefresh performs a lookup for a random target to keep buckets
    // full. seed nodes are inserted if the table is empty (initial
    // bootstrap or discarded faulty peers).
    func (tab *Table) doRefresh(done chan struct{}) {
        defer close(done)
        // Load nodes from the database and insert
        // them. This should yield a few previously seen nodes that are
        // (hopefully) still alive.
        tab.loadSeedNodes(true)
        // Run self lookup to discover new neighbor nodes.
        tab.lookup(tab.self.ID, false)
        // The Kademlia paper specifies that the bucket refresh should
        // perform a lookup in the least recently used bucket. We cannot
        // adhere to this because the findnode target is a 512bit value
        // (not hash-sized) and it is not easily possible to generate a
        // sha3 preimage that falls into a chosen bucket.
        // We perform a few lookups with a random target instead.
        for i := 0; i < 3; i++ {
            var target NodeID
            crand.Read(target[:])
            tab.lookup(target, false)
        }
    }
    

    主要调用三个方法,其中tab.loadSeedNodes,在之前已经分析,两外都调用了lookup方法,只是参数不同

    // Run self lookup to discover new neighbor nodes.
        tab.lookup(tab.self.ID, false)
    
    var target NodeID
            crand.Read(target[:])
            tab.lookup(target, false)
    

    下面分析lookup方法

    • lookup方法
    //loopup方法的目的是找到接近targetID的节点
    //参数targetID不一定是一个真实存在的节点id
    func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
        var (
            target         = crypto.Keccak256Hash(targetID[:])
            asked          = make(map[NodeID]bool)
            seen           = make(map[NodeID]bool)
            reply          = make(chan []*Node, alpha)
            pendingQueries = 0
            result         *nodesByDistance
        )
        // don't query further if we hit ourself.
        // unlikely to happen often in practice.
        asked[tab.self.ID] = true
    
        for {
            tab.mutex.Lock()
            // generate initial result set
    ####
    从ntab中获得接近target的节点,存入result中,最多bucketSize个
    ####
            result = tab.closest(target, bucketSize)
            tab.mutex.Unlock()
            if len(result.entries) > 0 || !refreshIfEmpty {
                break
            }
            // The result set is empty, all nodes were dropped, refresh.
            // We actually wait for the refresh to complete here. The very
            // first query will hit this case and run the bootstrapping
            // logic.
            <-tab.refresh()
            refreshIfEmpty = false
        }
    ####
    向results节点(接近target的节点)发出findnode消息
    对返回的节点进行bond(ping pong)
    ####
        for {
            // ask the alpha closest nodes that we haven't asked yet
            for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
                n := result.entries[i]
                if !asked[n.ID] {
                    asked[n.ID] = true
                    pendingQueries++
                    go func() {
                        // Find potential neighbors to bond with
                        r, err := tab.net.findnode(n.ID, n.addr(), targetID)
                        if err != nil {
                            // Bump the failure counter to detect and evacuate non-bonded entries
                            fails := tab.db.findFails(n.ID) + 1
                            tab.db.updateFindFails(n.ID, fails)
                            log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)
    
                            if fails >= maxFindnodeFailures {
                                log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
                                tab.delete(n)
                            }
                        }
                        reply <- tab.bondall(r)
                    }()
                }
            }
            if pendingQueries == 0 {
                // we have asked all closest nodes, stop the search
                break
            }
            // wait for the next reply
            for _, n := range <-reply {
                if n != nil && !seen[n.ID] {
                    seen[n.ID] = true
                    result.push(n, bucketSize)
                }
            }
            pendingQueries--
        }
        return result.entries
    }
    
    
    ####
    从ntab中获得接近target的节点,最多bucketSize个
    ####
    func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
        // This is a very wasteful way to find the closest nodes but
        // obviously correct. I believe that tree-based buckets would make
        // this easier to implement efficiently.
        close := &nodesByDistance{target: target}
        for _, b := range tab.buckets {
            for _, n := range b.entries {
                close.push(n, nresults)
            }
        }
        return close
    }
    
    
    // nodesByDistance is a list of nodes, ordered by
    // distance to target.
    type nodesByDistance struct {
        entries []*Node
        target  common.Hash
    }
    
    // push adds the given node to the list, keeping the total size below maxElems.
    func (h *nodesByDistance) push(n *Node, maxElems int) {
        ix := sort.Search(len(h.entries), func(i int) bool {
            return distcmp(h.target, h.entries[i].sha, n.sha) > 0
        })
        if len(h.entries) < maxElems {
            h.entries = append(h.entries, n)
        }
        if ix == len(h.entries) {
            // farther away than all nodes we already have.
            // if there was room for it, the node is now the last element.
        } else {
            // slide existing entries down to make room
            // this will overwrite the entry we just appended.
            copy(h.entries[ix+1:], h.entries[ix:])
            h.entries[ix] = n
        }
    }
    

    可知lookup的作用如下

    • 在ntab中找到接近target的节点
    • 像这些节点发出findnode消息
    • 更新ntab

    然后可知doRefresh作用如下

    • 在ntab中找出接近本地节点的节点
    • 对接近的节点发出findnode命令,对返回的节点完成udp握手后,放入ntab中
    • 把距离范围限定在一个随机范围内

    相关文章

      网友评论

        本文标题:ethereum p2p Kademlia的实现之三

        本文链接:https://www.haomeiwen.com/subject/nivvkftx.html