美文网首页
以Kademlia为例实战DHT(四)

以Kademlia为例实战DHT(四)

作者: 建怀 | 来源:发表于2018-09-27 11:46 被阅读0次

    DHT

    DHT的消息类型有query,response和error。其RPCs有四种:

    • ping:查看节点能否ping通,如果能,就将其保存到路由表中。
    • find_node:查找节点,确保DHT路由表能够正常使用。
    • get_peers:节点反复询问DHT节点获取数据。
    • announce_peer:对外宣布,与某个节点连接并正在下载torrent。

    首先查看一下dht.go里面有的结构体:

    type Config struct {
        Address string                  // 监听的IP address,如果留下空白,会自动选择一个。
        Port int                        // DHT节点会监听的UDP端口,如果是0,将会挑选一个随机端口。
        NumTargetPeers int              // DHT将尝试为每个被搜索的infohash寻找的对等点。这可能会被转移到per-infohash选项。默认值:5。
        DHTRouters string               // 用于引导网络的DHT路由器的分离列表。
        MaxNodes int                    // 在路由表中存储的最大节点数。默认值:100。
        CleanupPeriod time.Duration     // 在网络中ping节点的频率,以确定它们是否可到达。默认值:15分钟。
        SaveRoutingTable bool           // 如果True,节点将在启动时从磁盘读取路由表,并每隔几分钟保存磁盘上的路由表快照。默认值:True。
        SavePeriod time.Duration        // 将路由表保存到磁盘的频率。默认值:5分钟。
        RateLimit int64                 // 每秒处理的最大数据包数量。如果是负数就取消。默认值:100。
        MaxInfoHashes int               // MaxInfoHashes是我们应该保留一个对等列表的信息的数量的限制。
        // 如果这个和MaxInfoHashPeers没有改变,它应该消耗大约25 MB的RAM。更大的值有助于保持DHT网络的健康。默认值:2048。
        MaxInfoHashPeers int            // MaxInfoHashPeers是每个infohash跟踪的对等点的数量限制。一个单独的对等接触通常会消耗6个字节。默认值:256。
        ClientPerMinuteLimit int        //  ClientPerMinuteLimit 通过对抗垃圾客户端来进行保护。如果超过每分钟的数据包数量,请忽略它们的请求。默认值:50。
        ThrottlerTrackedClients int64   // ThrottlerTrackedClients是客户端节流器所记得的主机的数量。LRU是用来跟踪最有趣的。默认值:1000。
        UDPProto string                 // UDP连接的协议,udp4 = IPv4  udp6 = IPv6
    }
    
    // DHT 应该用New()来创建,能给torrent客户端提供一些DHT特征,例如发现新的对等节点让
    // torrent下载,而不需要一个tracker。
    type DHT struct {
        nodeId  string
        config  Config
        routingTable *routingTable
        peerStore   *peerStore
        conn    *net.UDPConn
        Logger  Logger
        exploredNeighborhood    bool
        remoteNodeAcquaintance  chan string
        peersRequest    chan ihReq
        nodesRequest    chan ihReq
        pingRequest chan *remoteNode
        portRequest chan int
        stop    chan bool
        wg  sync.WaitGroup
        clientThrottle  *nettools.ClientThrottle
        store   *dhtStore
        tokenSecrets    []string
        // Public channels:
        PeersRequestResults chan map[InfoHash][]string  // key = infohash , value = slice of peers
    }
    
    type ihReq struct {
        ih  InfoHash
        announce bool
    }
    

    围绕着DHT有如下方法:

    • func NewConfig() *Config
      • 把Config填充上默认值
      • var DefaultConfig = NewConfig()
      • 将默认配置申明到一个变量
    • func RegisterFlags(c *Config)
      • 用户输入的配置参数匹配到Config变量c中
    • func New(config *Config) (node *DHT,err error)
      • 创建一个DHT node,如果config是nil,使用DefaultConfig填充,一旦创建DHT node之后,配置config不能再更改。
      • 创建一个node,然后保存。
      • 然后开一个go routine来新增对等节点。
    • func (dht *DHT) AddNode(addr string)
      • 将一个新节点添加到它的路由表中。addr是一个包含目标节点的 "host:port"UDP地址的字符串。
      • 实际上将addr新增到DHT的remoteNodeAcquaintance这个chan中。
    • func randNodeId() []byte
      • 给节点随机生成一个20字节的NodeId。
    • func newTokenSecret() string
      • 给DHT随机生成一个tokenSecret,然后添加到DHT的tokenSecrets的列表中。
    • func (d *DHT) PeersRequest(ih string,announce bool)
      • PeersRequest要求DHT为infoHash提供更多的对等点。
      • 将infoHash和announce添加到DHT的peersRequest chan中。
    • func (d *DHT) Stop()
      • 将DHT关闭。
    • func (d *DHT) Port() int
      • 返回给DHT的端口号,在初始化时端口为0表示自动端口分配,以便检索所使用的实际端口号。
    • func (d *DHT) getPeers(infoHash InfoHash)
      • 从一个infoHash请求更多的peers
      • 从这个DHT的路由表中找到最近的远程节点列表closest。
      • 如果closest为空,就从配置文件中的种子节点上开始查找。
      • 遍历closest,对其中每个远程节点进行查找。
        • d.getPeersFrom(r,infoHash)
    • func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash)
      • 从远程节点上根据infoHash进行查找,更新DHT。
      • 发送“get_peers”的命令到远程节点的pendingQueries队列中。
      • 构造query命令,发送命令。
        • sendMsg(d.conn, r.address, query)
    • func (d *DHT) findNode(id string)
      • 通过id从DHT中查找这个节点。
      • 将id转成infoHash,然后通过DHT的路由表查找这个infoHash的最近远程节点列表closest。
      • 如果列表为空,从配置文件中的种子节点上开始查找。
      • 遍历closest,对其中每个远程节点进行查找。
    • func (d *DHT) Start() (err error)
      • 启动DHT节点,在想要的地址上启动一个监听器,然后在一个单独的go routine中运行主循环。
      • 对DHT初始化socket,开启新的协程运行d.loop()
    • func (d *DHT) initSocket() (err error)
      • 初始化udp的socket,监听进来的dht请求
    • func (d *DHT) loop()
      • loop()是DHT的主要工作部分。监听进来的udp请求,直到一个新的go routine调用d.Stop()方法。
      • 构造一个数据包的arena的切片,构造一个socket通道。开辟新协程调用readFromSocket(d.conn,socketChan,bytesArena,d.stop)
      • 调用DHT的bootstrap()方法,对远程节点初始化连接。
      • 设置一个路由表清理时间闹钟,一个secretRotate的时间闹钟,如果DHT的store不为空,设置一个保存的时间闹钟。
      • for循环,直到收到通道中d.stop的信息,进行停止。
      • 如果DHT的相识远程节点地址addr匹配上,就进行打招呼。d.helloFromPeer(addr)
      • 如果DHT的节点请求req匹配上,然后将信息构造成一个map
        • m := map[InfoHash]bool{req.ih: req.announce}
        • 然后遍历req将内容填充到上面的map中。
        • 然后遍历map,如果announce为true,d.peerStore.addLocalDownload(ih)
        • 然后从这个来请求的infoHash那里获取对等节点。
      • 如果DHT的节点请求req匹配上,然后将信息构造成一个map
        • m := map[InfoHash]bool{req.ih: true}
        • 然后遍历req将内容填充到上面的map中。
        • 遍历map,从这个请求的infoHash那里获取节点。
      • 如果DHT的socket通道p匹配上,然后对p进行处理,d.processPacket(p),bytesArena.Push(p.b)
      • 如果fillTokenBucket限流匹配上,判断还没到速率极值,给tokenBucket加上极值的十分之一。
      • 如果路由表的清理时间闹钟匹配上,清理路由表,清理后,查看远程节点够不够,不够的话,d.bootstrap()。
      • 如果DHT的ping请求匹配上,执行d.pingNode(node)。
      • 如果secretRotate时间闹钟匹配上,执行d.tokenSecrets = []string{newTokenSecret(),d.tokenSecrets[0]}。
      • 如果DHT的端口请求匹配上,继续。
      • 如果保存时间闹钟匹配上,将可触达的路由表中的节点收拢起来,数量大于5,进行保存。
    • func readFromSocket(socket *net.UDPConn,conChan chan packetType,bytesArena arena,stop chan bool)
      • 从UDP socket中读取,然后将字节切片写入packetType的通道。
      • 进行for循环,将bytesArena切片中的元素b一个一个弹出来,然后通过b从socket读取获得n,addr。
      • 检验后,将读取的addr和元素b构造packetType,然后对packetType判断,如果不是stop,就放到conChan。
      • 对整个协程进行通道判断,如果stop就跳出协程。
    • func (d *DHT) bootstrap()
      • 对DHT进行远程节点连接初始化。
      • 遍历DHT配置文件中的种子节点,查找远程节点。
    • func (d *DHT) helloFromPeer(addr string)
      • 对相识的远程节点打招呼。
      • 对远程节点在自己DHT的路由表中进行查找,如果存在,返回。
      • 如果不存在,且DHT的路由表中元素数量小于MaxNodes,进行ping()操作。
    • func (d *DHT) ping(address string)
      • 根据远程节点地址进行getOrCreateNode()操作,获取其remoteNode。
      • 然后调用DHT的pingNode()方法。
    • func (d *DHT) pingNode(r *remoteNode)
      • 构造一个ping的请求信息。
      • 发送请求信息。
    • func (d *DHT) processPacket(p packetType)
      • 检查DHT的客户端主机是否blocked掉消息包里面的IP地址。
      • 检查消息包p的协议是否是一些不支持的扩展协议。
      • r,err := readResponse(p),从p获取r
      • 对r进行匹配,r.Y如果匹配上"r":
        • 对获取的结果远程node id进行检验。
        • 查看回复r消息R是否来自自己。
        • 在路由表中对消息包p地址raddr进行查找,
          • 如果不存在,路由表还没满,就ping一下这个地址。
          • 如果存在,但node id为空,需要修改路由表。
          • 如果存在,但node id不等于回复r消息R的id,表明Node改变了ID。
        • 如果回复r的事务ID在node的pendingQueries为true:
          • 如果之前状态是不可达,改成可达。
          • 上一次响应时间修改成当前时间。
          • 过去查询列表新增这次事务。
          • 根据这个远程节点node进行路由表的更新。
          • 如果DHT需要更多节点,就去查找更多节点。
          • 将DHT的exploredNeighborhood修改成true。
          • 对pendingQueries中的query进行处理。
            • 匹配ping,get_peers,find_node,announce_peer进行处理。
          • 最后删除掉远程节点上pendingQueries中的r.T ,delete(node.pendingQueries,r.T)。
      • 对r进行匹配,r.Y如果匹配上"q":
        • 查看query是否来自自己。
        • 在路由表中对消息包p地址raddr进行查找,
          • 如果不存在,路由表还没满,就ping一下这个地址。
        • 匹配r.Q,匹配ping,get_peers,find_node,announce_peer进行处理。
    • func (d *DHT) needMoreNodes() bool
      • 路由表中的节点数小于minNodes或者2倍的节点数还是小于MaxNodes。
    • func (d *DHT) getMorePeers(r *remoteNode)
      • 根据远程节点从已有的路由表中获取更多对等点。
      • 对DHT中peerStore中localActiveDownloads进行遍历。
      • 如果其中infoHash判断需要更多对等点。
        • 如果远程节点为空,直接根据infoHash获取对等点。
        • 如果远程节点不为空,根据远程节点r和infoHash获取对等点。
    • func (d *DHT) findNodeFrom(r *remoteNode, id string)
      • 根据某远程节点r和id在DHT中查找节点
      • 构建find_node的请求信息query。
      • 发送信息,sendMsg(d.conn,r.address,query)。
    • func (d *DHT) needMorePeers(ih InfoHash) bool
      • DHT将尝试为每个被搜索的infohash寻找的对等点,判断是否达到目标数量。
    • func (d *DHT) replyAnnouncePeer(addr net.UDPAddr,node *remoteNode,r responseType)
      • 给远程节点回复信息。
    • func (d *DHT) replyGetPeers(addr net.UDPAddr,r responseType)
      • 回复获取peers的请求。
    • func (d *DHT) replyFindNode(addr net.UDPAddr,r responseType)
      • 回复查找节点node的请求。
    • func (d *DHT) replyPing(addr net.UDPAddr,response responseType)
      • 回复ping的请求。
    • func (d *DHT) processFindNodeResults(node *remoteNode,resp responseType)
      • 处理另外一个节点对get_peers请求的响应。
    • func (d *DHT) announcePeer(address net.UDPAddr, ih InfoHash, token string)
      • 使用token进行认证,然后向目标地址发送一条信息,宣传我们这个节点是这个infoHash的对等点。
    • func (d *DHT) checkToken(addr net.UDPAddr, token string) bool
      • 对另外一个节点地址和token进行检查。
    • func (d *DHT) hostToken(addr net.UDPAddr, secret string) string
      • 将另外一个节点地址和密文进行加密处理。
    • func (d *DHT) peersForInfoHash(ih InfoHash) []string
      • 返回DHT中跟infoHash联系的peers。
    • func (d *DHT) nodesForInfoHash(ih InfoHash) string
      • 返回DHT中跟infoHash联系的node。
    • func (d *DHT) processGetPeerResults(node *remoteNode, resp responseType)
      • 处理从远程节点获取peer的结果信息。

    相关文章

      网友评论

          本文标题:以Kademlia为例实战DHT(四)

          本文链接:https://www.haomeiwen.com/subject/koceoftx.html