美文网首页以太坊(ethereum)实现研究
ethereum p2p Kademlia的实现之五

ethereum p2p Kademlia的实现之五

作者: 古则 | 来源:发表于2018-04-19 20:25 被阅读17次

    这篇文章主要分析两种TCP连接的建立方式:

    • 主动dial目标节点
    • 接受其他节点的连接

    先给出重要结论:
    无论是被动接受连接还是主动dial,最终都会调用srv.SetupConn;srv.SetupConn经过checkpoint把连接发送到srv.addpeer通道(srv.addpeer <- c),srv.run收到该连接后再进入srv.runPeer(p)

    1.主动dial

    1.1 dial任务入口

    先看server.run中的如下代码片段

    ...
    // removes t from runningTasks
        // Only the first task equal to t (interface ==) is removed; the order of
        // the remaining running tasks is preserved.
        delTask := func(t task) {
            for i := range runningTasks {
                if runningTasks[i] == t {
                    runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
                    break
                }
            }
        }
        // starts until max number of active tasks is satisfied
        //
        // startTasks launches tasks from ts, in order, while fewer than
        // maxActiveDialTasks are running. Each started task runs t.Do(srv) in its
        // own goroutine, then sends itself on taskdone; it is also appended to
        // runningTasks. The tasks that could not be started are returned.
        startTasks := func(ts []task) (rest []task) {
            i := 0
            for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
                // t is declared inside the loop body, so every goroutine
                // captures its own task value.
                t := ts[i]
                srv.log.Trace("New dial task", "task", t)
                go func() { t.Do(srv); taskdone <- t }()
                runningTasks = append(runningTasks, t)
            }
            return ts[i:]
        }
        // scheduleTasks first restarts queued tasks and then, if there is still
        // room, asks dialstate for new ones.
        scheduleTasks := func() {
            // Start from queue first.
            // The unstarted remainder is a tail of queuedTasks, so appending it to
            // queuedTasks[:0] compacts the queue in place.
            queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
            // Query dialer for new tasks and start as many as possible now.
            // newTasks is told how many tasks are already running or queued;
            // whatever it returns that cannot start yet is queued for later.
            if len(runningTasks) < maxActiveDialTasks {
                nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
                queuedTasks = append(queuedTasks, startTasks(nt)...)
            }
        }
    
    running:
        for {
            scheduleTasks()
    ...
    

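    主要是三个闭包函数:

    • scheduleTasks先用startTasks启动queuedTasks中排队的任务;如果运行中的任务数仍小于maxActiveDialTasks,再调用dialstate.newTasks生成新任务并交给startTasks启动,未能启动的任务追加到queuedTasks
    • startTasks在运行中的任务数小于maxActiveDialTasks时,为每个任务启动一个goroutine执行task.Do,执行完成后taskdone <- t;返回未能启动的任务
    • delTask 从runningTasks中删除指定的任务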

    1.2 任务的创建,及newTasks

    func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
        if s.start.IsZero() {
            s.start = now
        }
        //fmt.Println("new task")
        //debug.PrintStack()
        var newtasks []task
    ####
    这个方法用来进行dialtask的创建
    并将创建的dialtask加入到返回参数newtasks中
    ####
        addDial := func(flag connFlag, n *discover.Node) bool {
    ####
    checkDial确保对已经dial成功节点(peers)不会再dial一遍
    ####
            if err := s.checkDial(n, peers); err != nil {
                log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
                return false
            }
            s.dialing[n.ID] = flag
            //fmt.Println("new task 1")
            newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
            return true
        }
    
        // Compute number of dynamic dials necessary at this point.
        needDynDials := s.maxDynDials
        for _, p := range peers {
            if p.rw.is(dynDialedConn) {
                needDynDials--
            }
        }
        for _, flag := range s.dialing {
            if flag&dynDialedConn != 0 {
                needDynDials--
            }
        }
    
        // Expire the dial history on every invocation.
        s.hist.expire(now)
    #######
    s.static在dialstate.addStatic方法中创建,调用路径是
    func (srv *Server) AddPeer(node *discover.Node)
    =>
     func (srv *Server) run(dialstate dialer)中的
    case n := <-srv.addstatic:
                dialstate.addStatic(n)
    这段代码将s.static放入返回参数中,由scheduleTasks调度
    #######
        for id, t := range s.static {
            err := s.checkDial(t.dest, peers)
            switch err {
            case errNotWhitelisted, errSelf:
                log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err)
                delete(s.static, t.dest.ID)
            case nil:
                s.dialing[id] = t.flags
                //fmt.Println("new task 2")
                newtasks = append(newtasks, t)
            }
        }
        // If we don't have any peers whatsoever, try to dial a random bootnode. This
        // scenario is useful for the testnet (and private networks) where the discovery
        // table might be full of mostly bad peers, making it hard to find good ones.
        if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval {
            bootnode := s.bootnodes[0]
            s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
            s.bootnodes = append(s.bootnodes, bootnode)
    
            if addDial(dynDialedConn, bootnode) {
                needDynDials--
            }
        }
        // Use random nodes from the table for half of the necessary
        // dynamic dials.
        randomCandidates := needDynDials / 2
        if randomCandidates > 0 {
            n := s.ntab.ReadRandomNodes(s.randomNodes)
            for i := 0; i < randomCandidates && i < n; i++ {
                if addDial(dynDialedConn, s.randomNodes[i]) {
                    needDynDials--
                }
            }
        }
        // Create dynamic dials from random lookup results, removing tried
        // items from the result buffer
    #####
    s.lookupBuf是discoverTask的返回值
    需要生成dialTask进行dial处理
    #####
        i := 0
        for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
            if addDial(dynDialedConn, s.lookupBuf[i]) {
                needDynDials--
            }
        }
        s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
        // Launch a discovery lookup if more candidates are needed.
    #####
    生成discoverTask任务
    #####
        if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
            s.lookupRunning = true
            //fmt.Println("new task 3")
            newtasks = append(newtasks, &discoverTask{})
        }
    
        // Launch a timer to wait for the next node to expire if all
        // candidates have been tried and no task is currently active.
        // This should prevent cases where the dialer logic is not ticked
        // because there are no pending events.
        if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
            t := &waitExpireTask{s.hist.min().exp.Sub(now)}
            //fmt.Println("new task 4")
            newtasks = append(newtasks, t)
        }
        return newtasks
    }
    

    newTasks主要创建了两种task,即dialTask和discoverTask(当没有其他任务可做时,还可能创建一个waitExpireTask)
    两个结构体定义如下

    // dialTask is a task that dials the node dest (see its Do and dial methods).
    type dialTask struct {
        flags        connFlag       // connection flags handed to srv.SetupConn (e.g. dynDialedConn, staticDialedConn)
        dest         *discover.Node // the node to dial
        lastResolved time.Time      // presumably the time of the last resolve attempt; used by resolve (not shown) — TODO confirm
        resolveDelay time.Duration  // presumably the backoff between resolve attempts; used by resolve (not shown) — TODO confirm
    }
    
    // discoverTask runs discovery table operations.
    // Only one discoverTask is active at any time; newTasks guards this with
    // s.lookupRunning.
    // discoverTask.Do performs a random lookup.
    type discoverTask struct {
        results []*discover.Node // nodes returned by srv.ntab.Lookup in Do
    }
    

    1.3 task接口的实现

    dialTask跟discoverTask都实现了task接口

    // task is a unit of work run by the dial loop in Server.run. startTasks
    // calls each task's Do in its own goroutine and then reports the task on
    // the taskdone channel.
    type task interface {
        Do(*Server)
    }
    
    • dialtask的Do
      dialTask的Do实现如下
    func (t *dialTask) Do(srv *Server) {
        if t.dest.Incomplete() {
    #####
    resolve确保dial的目标节点是跟本地节点close的节点
    close的处理见lookup方法
    #####
            if !t.resolve(srv) {
                return
            }
        }
    ####
    开始对远端进行tcp连接
    ####
        err := t.dial(srv, t.dest)
        if err != nil {
            log.Trace("Dial error", "task", t, "err", err)
            // Try resolving the ID of static nodes if dialing failed.
            if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
                if t.resolve(srv) {
                    //fmt.Println(t.dest)
                    t.dial(srv, t.dest)
                }
            }
    
     ####
    最后都会调用srv.SetupConn,这部分跟被动接受连接是一样的;
    srv.SetupConn经过checkpoint把连接发送到srv.addpeer通道,srv.run收到后再进入srv.runPeer(p)
    ####
    // dial opens a connection to dest through srv.Dialer, wraps it with
    // newMeteredConn (second argument false, presumably marking it outbound —
    // confirm) and hands it to srv.SetupConn together with t.flags.
    //
    // SetupConn is also where inbound connections from listenLoop end up, so
    // from this point on dialed and accepted connections follow the same path.
    //
    // A failure of srv.Dialer.Dial is returned wrapped in *dialError, which lets
    // Do tell dial failures apart from errors reported by SetupConn.
    func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
        conn, cerr := srv.Dialer.Dial(dest)
        if cerr != nil {
            return &dialError{cerr}
        }
        metered := newMeteredConn(conn, false)
        return srv.SetupConn(metered, t.flags, dest)
    }
    
    • discoverTask的Do
    // Do runs a single lookup for a random target ID against the discovery
    // table and stores the nodes it returns in t.results.
    //
    // newTasks schedules a discoverTask whenever it needs more dynamic dial
    // candidates, so lookups are rate-limited here. If less than lookupInterval
    // has passed since srv.lastLookup, Do first sleeps for the remainder;
    // otherwise the event loop would spin too fast.
    func (t *discoverTask) Do(srv *Server) {
        earliest := srv.lastLookup.Add(lookupInterval)
        if wait := time.Until(earliest); wait > 0 {
            time.Sleep(wait)
        }
        srv.lastLookup = time.Now()

        // Pick a random target ID. As in the original, the error returned by
        // rand.Read is not checked.
        var target discover.NodeID
        rand.Read(target[:])
        t.results = srv.ntab.Lookup(target)
    }
    

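    该方法的目的是

    • 若距上次lookup(srv.lastLookup)不足lookupInterval,先sleep等待,避免事件循环空转
    • 随机生成一个目标节点ID(虚拟的,不对应真实节点)
    • 调用srv.ntab.Lookup(target),从节点表(k桶)中查找与该目标ID距离较近的节点
    • 查找结果存入t.results,之后在生成任务(newTasks方法)时据此生成dialTask,开始进行TCP连接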

    2. 被动接受连接

    ethereum p2p Kademlia的实现之四提到接受连接的调用过程是

    func (srv *Server) startListening() 
    =>
    func (srv *Server) listenLoop()
    =>
    func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error 
    =>
    func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error
    =>
    func (srv *Server) checkpoint(c *conn, stage chan<- *conn) error
    =>
    func (srv *Server) run(dialstate dialer)中的
    ...
    case c := <-srv.addpeer:
    ...
    

    相关文章

      网友评论

        本文标题:ethereum p2p Kademlia的实现之五

        本文链接:https://www.haomeiwen.com/subject/ttjgkftx.html