美文网首页Golang与区块链
go-libp2p-kad-dht bootstrap 源码分析

go-libp2p-kad-dht bootstrap 源码分析

作者: cc14514 | 来源:发表于2018-10-15 17:35 被阅读171次

Bootstrap 逻辑

入口代码:go-libp2p-kad-dht/dht_bootstrap.go

入口方法:
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
最终这个方法会去执行 runBootstrap 方法,真正的操作是从这里开始的

  • func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error

原理大概是,生成一个随机的目标 randomID ,然后在网络中 find 这个 peer ,因为肯定是找不到的,所以每个 peer 都会返回给你离这个 randomID 最近的 peers ,也就是 CloserPeers 来填充你本地的 k 桶,这样的动作默认是 5 分钟执行一次的,所以 k 桶很快就会被填满。

  • func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error)

这个方法会现在本地找:dht.FindLocal(id)
本地没有就在路由表里找

peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
这个方法的逻辑是,AlphaValue = 3 ,表示最低要确保发送到 3 个 peers 上
目标 id 会被 xor 出一个桶,这个桶如果是空的,或者桶里不足 3 个 peer
就会从临近的桶里返回 peer ,如果桶里 peer 超过 AlphaValue 则按桶里的总数来
奇怪的是最后又只节选了 3 个距离最近的 peer ,为什么呢?

func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
    cpl := commonPrefixLen(id, rt.local)

    rt.tabLock.RLock()

    // Get bucket at cpl index or last bucket
    var bucket *Bucket
    if cpl >= len(rt.Buckets) {
        cpl = len(rt.Buckets) - 1
    }
    bucket = rt.Buckets[cpl]

    peerArr := make(peerSorterArr, 0, count)
    //TODO 如果这里的数据比 peerArr 多,会重新 make peerArr
    peerArr = copyPeersFromList(id, peerArr, bucket.list)
    if len(peerArr) < count {
        // In the case of an unusual split, one bucket may be short or empty.
        // if this happens, search both surrounding buckets for nearby peers
        if cpl > 0 {
            plist := rt.Buckets[cpl-1].list
            peerArr = copyPeersFromList(id, peerArr, plist)
        }

        if cpl < len(rt.Buckets)-1 {
            plist := rt.Buckets[cpl+1].list
            peerArr = copyPeersFromList(id, peerArr, plist)
        }
    }
    rt.tabLock.RUnlock()

    // Sort by distance to local peer
    sort.Sort(peerArr)
    // TODO 这里为什么又截断了呢?只取了3个离的最近的,什么目的?
    if count < len(peerArr) {
        peerArr = peerArr[:count]
    }

    out := make([]peer.ID, 0, len(peerArr))
    for _, p := range peerArr {
        out = append(out, p.p)
    }

    return out
}

在 query 的同时会刷新每个有响应的 peer 的 ttl ,但是至多只向 3 个 peer 发送 query 请求

请求的返回的 closer 貌似没处理,为什么?

因为需要耐心寻找,看了一圈终于在
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID)
中找到了答案,这个实在是太隐蔽了,对代码的作者表示谴责

以下是 FindPeer 中的代码片段,

    parent := ctx
    query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
        notif.PublishQueryEvent(parent, &notif.QueryEvent{
            Type: notif.SendingQuery,
            ID:   p,
        })

        pmes, err := dht.findPeerSingle(ctx, p, id)
        if err != nil {
            return nil, err
        }

        closer := pmes.GetCloserPeers()
        clpeerInfos := pb.PBPeersToPeerInfos(closer)

        // see if we got the peer here
        for _, npi := range clpeerInfos {
            if npi.ID == id {
                // add by liangc ---->
                notif.PublishQueryEvent(parent, &notif.QueryEvent{
                    Type: notif.PeerFindby,
                    ID:   p,
                })
                // add by liangc <----
                return &dhtQueryResult{
                    peer:    npi,
                    success: true,
                }, nil
            }
        }

        notif.PublishQueryEvent(parent, &notif.QueryEvent{
            Type:      notif.PeerResponse,
            ID:        p,
            Responses: clpeerInfos,
        })

        return &dhtQueryResult{closerPeers: clpeerInfos}, nil
    })

    // run it!
    result, err := query.Run(ctx, peers)

在上面的最后一句 query.Run 中,包含了如下调用轨迹

  • func (q dhtQuery) Run(ctx context.Context, peers []peer.ID) (dhtQueryResult, error)
    • func (r dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (dhtQueryResult, error)
      • func (r *dhtQueryRunner) spawnWorkers(proc process.Process)
        • func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID)

在 dhtQueryRunner.queryPeer中处理了closerPeers

    } else if len(res.closerPeers) > 0 {
        log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
        for _, next := range res.closerPeers {
            if next.ID == r.query.dht.self { // don't add self.
                log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
                continue
            }

            // add their addresses to the dialer's peerstore
            r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
            r.addPeerToQuery(next.ID)
            log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
        }
    } else {
        log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
    }

可以看到,每个 closer 都被放到了 peerstore 中,同时也放到了 peersSeen 中,
其实这里就是在迭代 peersSeen 来执行 run 方法,peersSeen 是线程安全的 set 类型
这就保证虽然所有的 run 都是在线程中执行的,但是主程序轮训时并不会错过 set 中的新成员
同时 set 还可以保证重复的 peer 不会被重复添加,这就可以断定 set 不会无休止增加

其实 bootstrap 到此就没什么可看的了

相关文章

网友评论

    本文标题:go-libp2p-kad-dht bootstrap 源码分析

    本文链接:https://www.haomeiwen.com/subject/budfzftx.html