路由表初始化
在 devp2p
中路由表被存储在 leveldb
中,启动时会加载到 Table
结构体中
- 代码位置
p2p/discover/table.go
p2p/server.go:Server很容易可以找到调用
p2p/Server.Start
方法的地方,假设启动前已经设置好bootnode
并且没有启用nodiscover
开关,那么会按照这个时序图标注的关键路径一直调用到Taboe.loop()
,整个填表的过程都在loop()
函数中,具体初始化行为在 Table.doRefres 函数进行初始化,其关键调用路径如下:
Table.doRefresh 函数
看个大概意思就可以,以太坊对此更新比较频繁,大体意思不变,不用每行都读,很可能下一个版本就对不上了
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
// 这里 seednodes 当第一次启动时会是 bootnodes,因为 db 里没有历史记录
tab.loadSeedNodes()
// 这个 lookup 如果不成功,就会阻塞在这里,除非 bootnode 死掉了,否则一定成功
// 查询自己的目的是在 tab 为空时把 bootnode 填进去
tab.lookup(tab.self.ID, false)
// 随机的再去 Query 几个节点,这是 KAD DHT 的经典做法
for i := 0; i < 3; i++ {
var target NodeID
crand.Read(target[:])
tab.lookup(target, false)
}
}
K桶 与 逻辑距离
计算节点逻辑距离来定位桶号,这几乎是 KAD DHT
唯一的精髓所在了,看看以太坊是怎么做的,以太坊只分了 17
个桶,每个桶有 16
个位置,也就是说以太坊的 DHT
中就只有17 * 16 = 272
个节点信息
- 代码位置 :
p2p/enode/node.go
// LogDist returns the logarithmic distance between a and b, log2(a ^ b).
func LogDist(a, b ID) int {
lz := 0
for i := range a {
x := a[i] ^ b[i]
if x == 0 {
lz += 8
} else {
lz += bits.LeadingZeros8(x)
break
}
}
return len(a)*8 - lz
}
这里的
x = a[i] ^ b[i]
, 为什么x==0
时计数器lz
要加8
呢?这是因为a[i]
和b[i]
的单位都是byte
,1byte == 8bit
,所以在这里lz
是一个bit
计数器,用异或计算逻辑距离就是计算前导0
的个数,单位是bit
,通常{byte | 0 <= byte <= 255 }
所以当x != 0
时,又用bits.LeadingZeros8(x)
取了x
的前导0
个数并累加,例如当x == 17
时byte(17) == bit[0,0,0,1,0,0,0,1]
,前导0
为3
所以此时else
会执行lz += 3
最后的len(a)*8 - lz
是把一样的bit
总数减掉,剩下的就是不同的bit
数量,演算一下结果约等于log2(a^b)
- 代码位置 :
p2p/discover/table.go
// bucket returns the bucket for the given node ID hash.
func (tab *Table) bucket(id enode.ID) *bucket {
d := enode.LogDist(tab.self().ID(), id)
if d <= bucketMinDistance {
return tab.buckets[0]
}
return tab.buckets[d-bucketMinDistance-1]
}
唯一用到了
LogDist
函数的地方是Table.bucket
函数,在这里ID
是32byte
桶最小距离bucketMinDistance == 239
,那么d <= 239
则返回buckets[0]
就代表着只要有从左到右16 bit
以上相同的就放到第一个桶里,因为(256-16)-239-1 = 0
,其余的最大也就是32 * 8 = 256
个不同的bit
,256 - 239 - 1 = 16
正好可以放到最后一个桶里,桶的尺寸是17
即[0-16]
,
这两个函数十分重要,在我们关注的这部分代码中,在 doRefresh
的 tab.loadSeedNodes()
中被使用,这里如果 db
中的数据非空,最多会拿出 30
个 种子节点,再加上全部的 bootnodes
节点,就组成了本次的种子节点集合
lookup
这里就不讲太具体的代码了,因为 v1.8.x
和 v1.9.x
甚至连接口都变了,不过大体逻辑还是差不太多的,挑一些关键代码和思路进行简单介绍
DistCmp 逻辑距离比较
这是其中一个关键方法,用来判断两个 ID
在桶里的顺序
func DistCmp(target, a, b ID) int {
for i := range target {
da := a[i] ^ target[i]
db := b[i] ^ target[i]
if da > db {
return 1
} else if da < db {
return -1
}
}
return 0
}
这里逻辑比较简单,
target
是目标,a
和b
是竞争关系,按照每个byte
与target
进行异或,最后谁的结果大,谁的距离就远,结果小的距离就近一些,因为结果越大说明前导0
越少,也就是逻辑距离越远了
closest
这个函数是在本地K桶中寻找距离目标节点逻辑距离最近的一组节点信息
- 完整的函数签名如下:
func (tab *Table) closest(target enode.ID, nresults int, checklive bool) *nodesByDistance
这里 diss 一下以太坊的处理逻辑,这个函数中会去遍历整个桶,然后用
DistCmp
去筛选出与target
逻辑距离较近的一组节点,这个地方没有libp2p
处理的优雅 😒
重要的函数是nodesByDistance.push
,closest
的结果就是在这个函数中产生的
func (h *nodesByDistance) push(n *node, maxElems int) {
ix := sort.Search(len(h.entries), func(i int) bool {
return enode.DistCmp(h.target, h.entries[i].ID(), n.ID()) > 0
})
if len(h.entries) < maxElems {
h.entries = append(h.entries, n)
}
if ix == len(h.entries) {
// farther away than all nodes we already have.
// if there was room for it, the node is now the last element.
} else {
copy(h.entries[ix+1:], h.entries[ix:])
h.entries[ix] = n
}
}
可以看到这里将
closest
中传递的target
放入了nodesByDistance
对象中,并作为目标传递给DisCmp
函数来比较已经生成的h.entries
集合中有没有比传入的n
离target
更远的节点,如果找到了就用n
去替换这个节点,最开始的逻辑是无条件先填满h.entries
集合,然后再去逐个替换,也就是说只要桶中的数据超过16
条那closest
方法的返回值永远都是16
条记录
lookupWorker
func (t *UDPv4) lookup(targetKey encPubkey) []*node {
......
result = t.tab.closest(target, bucketSize, false)
......
go t.lookupWorker(n, targetKey, reply)
......
return result.entries
}
接下来由 3 个协程对
closest
结果集中的前三个节点,也就是向本地与target
逻辑距离最近的三个节点发送findnode
请求并等待reply
,再将返回的结果插入到target
所在的桶中,至此即完成lookup
Table.loop 函数
可以看到 loop
一启动就用一个协程去执行了 doRefresh
函数,然后进入一个循环的 select{}
代码块,阻塞等待下一个任务,其中 refresh
这个 Ticker 为 30 分钟,所以每 30 分钟还会周期性的执行 doRefresh
函数,并且查询的目标为随机,还有一个触发条件就是主动产生 refreshReq
请求,这个请求会从 dialTask.Do
函数中调用 dailTask.resolve(srv)
中触发
// loop schedules runs of doRefresh, doRevalidate and copyLiveNodes.
func (tab *Table) loop() {
var (
refresh = time.NewTicker(refreshInterval)
......
// Start initial refresh.
go tab.doRefresh(refreshDone)
......
for {
select {
case <-refresh.C:
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case req := <-tab.refreshReq:
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
......
// 这个定时任务是 10秒 执行一次,用来判定节点状态
case <-revalidate.C:
revalidateDone = make(chan struct{})
go tab.doRevalidate(revalidateDone)
......
}
......
}
Table.doRevalidate
这个函数很重要,因为节点数据进桶之前并没有进行过状态交验
discoverTask
在启动节点后,每一个连接任务都会判断如果 peers 少于 25(默认25),就要增加一个 discoverTask
,用来执行 srv.ntab.LookupRandom()
,目的是随机 findnode 以便填充路由表
强行总结
相比于优雅的 libp2p 以太坊的 DHT 实在是简陋,甚至有点丑陋,无心再往下看了。
网友评论