以Kademlia为例实战DHT(一)
DHT的代码实战,基本的原理可以查看我的博客:
当然还有这个博客将DHT以Kademlia为例讲得很清晰:
聊聊分布式散列表(DHT)的原理——以 Kademlia(Kad) 和 Chord 为例
作为一篇实战的博客,将不再讲解基本原理部分,直接进入主题:
当然首先要感谢nictuku同志,这是他的代码仓库地址:
https://github.com/nictuku/dht
KAD网络是非常复杂的分布式网络编程,KAD网络大概由下面几部分组成:
- 节点内部数据存储 RoutingTable
- 节点之间通信 KRPC
- DHT网络 DHT
数据存储
我们重点分析项目的两个脚本:
- store.go
- peer_store.go
store.go
将dht的路由表保存到硬盘,先看一下dhtStore的结构体:
type dhtStore struct {
Id []byte
Port int
Remotes map[string][]byte // Key:IP,Value:node ID
path string // Empty if the store is disabled
}
包括了节点的ID,还有端口,节点路由表中保存的远程节点,其结构也很简单,Key是远程节点IP,值是其NodeID。
这个脚本有三个基本方法:
- func mkdirStore() string
- 创建一个保存的目录,并赋予权限,返回表示目录的字符串
- func openStore(port int, enabled bool) (cfg *dhtStore)
- 输入端口,是否能打开的参数,将保存的目录打开
- func saveStore(s dhtStore)
- 将dhtStore写入到硬盘
peer_store.go
将对等点peer保存到硬盘上。先分析一下结构体:
type peerContactsSet struct {
set map[string]bool
// 需要确保不同的peers在不同时间返回
ring *ring.Ring
}
// ring包是Go标准包Container中的几个常用容器类型之一:container/list,container/heap,container/ring
// ring包提供了环形链表的操作,仅导出一个类型:Ring
// type Ring struct {
// Value interface{} // Value类型是interface{},接受任意类型
// }
// 其中有一系列方法:
// func New(n int) *Ring 创建一个长度为n的环形链表
// func (r *Ring) Do(f func(interface{})) 对环形链表的每一个元素进行f(x)操作
// func (r *Ring) Len() int 获取环形链表长度
// func (r *Ring) Link(s *Ring) *Ring r和s同一环形链表中,删除r和s之间的元素,被删除的元素组成一个新的环形链表,返回值为该环形链表的指针;如果r和s不同环形链表,将s插入到r后面
// func (r *Ring) Move(n int) *Ring 移动n%r.Len()个位置,n正负均可
// func (r *Ring) Next() *Ring
// func (r *Ring) Prev() *Ring
// func (r *Ring) Unlink(n int) *Ring 删除r后面的n%r.Len()个元素
type peerStore struct {
//为infohash缓存对等点。每一个键都是一个infohash,而值是peerContactsSet。
infoHashPeers *lru.Cache
localActiveDownloads map[InfoHash]bool
maxInfoHashes int
maxInfoHashPeers int
}
要理解这两个脚本的全部含义,需要对Go的标准包Ring进行知识扫盲,可以查看下面的博客:
peerStore有四个基本属性,infoHashPeers,localActiveDownloads,maxInfoHashes,maxInfoHashPeers。infoHashPeers是LRU(Least recently used)算法的缓存,Key是infohash,而Value是peerContactsSet结构体。
缓存是比较昂贵的,通常最近被访问的数据,后面继续被访问的概率高,将所有数据按访问时间进行排序,并按驱逐出旧数据,那缓存的数据就为热点数据。LRU算法是基于这种假设的一直缓存置换算法。
这个脚本的方法:
- func (p *peerContactsSet) next() []string
- 从peerContactsSet中返回8个节点的联系节点id列表
- func (p *peerContactsSet) put(peerContact string) bool
- 把一个peerContact添加到infohash联系节点集合。
- func (p *peerContactsSet) drop(peerContact string) string
- 把一个peerContact从infohash联系节点集合汇总删除。
- func (p *peerContactsSet) dropDead() string
- 把peerContactsSet中死掉的节点删除。
- func (p *peerContactsSet) kill(peerContact string)
- func (p *peerContactsSet) Size() int
- func (p *peerContactsSet) Alive() int
- func newPeerStore(maxInfoHashes,maxInfoHashPeers int) *peerStore
- func (h *peerStore) get(ih InfoHash) *peerContactsSet
- func (h *peerStore) count(ih InfoHash) int
- func (h *peerStore) alive(ih InfoHash) int
- func (h *peerStore) peerContacts(ih InfoHash) []string
- 会调用上面的next()方法获得8个节点的联系节点id列表
- func (h *peerStore) addContact(ih InfoHash, peerContact string) bool
- func (h *peerStore) killContact(peerContact string)
- func (h *peerStore) addLocalDownload(ih InfoHash)
- 将peerStore中的localActiveDownloads列表中Key为infohash修改成true
- func (h *peerStore) hasLocalDownload(ih InfoHash) bool
- 询问Key为infohash的在peerStore中的localActiveDownloads是否为true
更多代码注解参考我的代码仓库代码:
网友评论