ethereum p2p Kademlia的实现之一

作者: 古则 | 来源:发表于2018-04-16 19:57 被阅读32次

    p2p模块的入口在p2p.Server.Start()中,
    具体的调用过程见
    《geth的启动之整体及p2p服务的启动》。
    Server的数据结构如下:

    // Server manages all peer connections.
    type Server struct {
        // Config fields may not be modified while the server is running.
        Config
    
        // Hooks for testing. These are useful because we can inhibit
        // the whole protocol stack.
        newTransport func(net.Conn) transport
        newPeerHook  func(*Peer)
    
        lock    sync.Mutex // protects running
        running bool
        // ntab uses the UDP port and is used to maintain the p2p network.
        ntab         discoverTable
        // listener uses the TCP port and is used for data exchange.
        listener     net.Listener
        ourHandshake *protoHandshake
        lastLookup   time.Time // read and updated by discoverTask.Do to pace lookups
        DiscV5       *discv5.Network
    
        // These are for Peers, PeerCount (and nothing else).
        peerOp     chan peerOpFunc
        peerOpDone chan struct{}
    
        quit          chan struct{}
        addstatic     chan *discover.Node
        removestatic  chan *discover.Node
        posthandshake chan *conn
        addpeer       chan *conn
        delpeer       chan peerDrop
        loopWG        sync.WaitGroup // loop, listenLoop
        peerFeed      event.Feed
        log           log.Logger
    }
    

    Start()方法的关键代码如下(有删节):

    // Start starts running the server.
    // Servers can not be re-used after stopping.
    //
    // NOTE(review): this is an abridged excerpt. realaddr, conn, unhandled and
    // dynPeers are not defined in the visible code, and the dialer/run
    // statements below appear after the function's closing brace; presumably
    // they belong inside Start. Confirm against the full source.
    func (srv *Server) Start() (err error) {
        // Build the k-buckets (the discovery node table).
        // node table
        if !srv.NoDiscovery {
            cfg := discover.Config{
                PrivateKey:   srv.PrivateKey,
                AnnounceAddr: realaddr,
                NodeDBPath:   srv.NodeDatabase,
                NetRestrict:  srv.NetRestrict,
                Bootnodes:    srv.BootstrapNodes,
                Unhandled:    unhandled,
            }
            // The k-buckets: ListenUDP returns the table stored in srv.ntab.
            ntab, err := discover.ListenUDP(conn, cfg)
            if err != nil {
                return err
            }
            srv.ntab = ntab
        }
    }
    // Create the dial state; srv.ntab is passed in as the discovery table.
    dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
    
    // Start the server.
    srv.run(dialer)
    

    1.端口

    p2p模块要同时通过UDP和TCP的30303端口与p2p网络进行数据交换:前者用来维护p2p网络,后者则是各种应用协议真正交换数据的地方。

    2.dial.go文件分析

    先看dial.go文件,该文件的最上层的一个数据结构是

    // dialstate schedules dials and discovery lookups.
    // It gets a chance to compute new tasks on every iteration
    // of the main loop in Server.run.
    type dialstate struct {
        maxDynDials int
        ntab        discoverTable // presumably the table handed to newDialState (srv.ntab in Server.Start); confirm
        netrestrict *netutil.Netlist
    
        lookupRunning bool
        dialing       map[discover.NodeID]connFlag
        lookupBuf     []*discover.Node // current discovery lookup results
        randomNodes   []*discover.Node // filled from Table
        static        map[discover.NodeID]*dialTask
        hist          *dialHistory
    
        start     time.Time        // time when the dialer was first used
        bootnodes []*discover.Node // default dials when there are no peers
    }
    

    该struct用于组织和管理discovery与dial任务,
    而dialTask、discoverTask都实现了task接口:

    // task is a unit of work that is run against a *Server through Do.
    // dialTask and discoverTask both provide a Do(*Server) method.
    type task interface {
        Do(*Server)
    }
    
    // A dialTask is generated for each node that is dialed. Its
    // fields cannot be accessed while the task is running.
    type dialTask struct {
        flags        connFlag       // Do tests this against staticDialedConn before retrying a failed dial
        dest         *discover.Node // node that Do resolves (when incomplete) and dials
        lastResolved time.Time
        resolveDelay time.Duration
    }
    
    // discoverTask runs discovery table operations.
    // Only one discoverTask is active at any time.
    // discoverTask.Do performs a random lookup.
    type discoverTask struct {
        results []*discover.Node // set by Do to the nodes returned by the table lookup
    }
    

    两个struct对task接口的实现为:

    // Do dials the task's destination node.
    //
    // If the destination is incomplete it is resolved first, and the task stops
    // when that resolution fails. When the dial fails with a *dialError and the
    // task carries the staticDialedConn flag, the node is resolved again and
    // dialed once more. Every dial failure, including a failed retry, is logged
    // at trace level; nothing is reported back to the caller.
    func (t *dialTask) Do(srv *Server) {
        if t.dest.Incomplete() && !t.resolve(srv) {
            return
        }
        err := t.dial(srv, t.dest)
        if err == nil {
            return
        }
        log.Trace("Dial error", "task", t, "err", err)

        // Try resolving the ID of static nodes if dialing failed.
        if _, ok := err.(*dialError); !ok || t.flags&staticDialedConn == 0 {
            return
        }
        if !t.resolve(srv) {
            return
        }
        // The retry stays best-effort, but its failure is logged instead of
        // being silently dropped.
        if err := t.dial(srv, t.dest); err != nil {
            log.Trace("Dial error", "task", t, "err", err)
        }
    }
    
    // Do performs one lookup for a random target on the server's discovery
    // table and stores the returned nodes in t.results.
    //
    // Lookups are spaced at least lookupInterval apart: when the previous one
    // (srv.lastLookup) is more recent than that, Do sleeps for the remainder
    // first. Lookups need to take some time, otherwise the event loop spins
    // too fast.
    func (t *discoverTask) Do(srv *Server) {
        earliest := srv.lastLookup.Add(lookupInterval)
        if wait := time.Until(earliest); wait > 0 {
            time.Sleep(wait)
        }
        srv.lastLookup = time.Now()

        // Pick a random node ID as the lookup target.
        var id discover.NodeID
        rand.Read(id[:])
        t.results = srv.ntab.Lookup(id)
    }
    

    可见dialTask主要负责与dest建立连接等处理,而discoverTask主要负责对ntab(discoverTable)执行lookup。

    ntab在p2p.Server.Start()中生成,为Server的一个成员,dialstate中的ntab是对Server中ntab的引用

    3.ntab的创建

    ntab创建入口在p2p.Server.Start()

    // Create the discovery table from the connection and config; a failure
    // is returned to the caller.
    ntab, err := discover.ListenUDP(conn, cfg)
    if err != nil {
        return err
    }
    // Keep the table on the server.
    srv.ntab = ntab
    

    来看具体的创建过程

    //p2p/discover/udp.go
    // ListenUDP builds a discovery table on top of the connection c by calling
    // newUDP, logs that the UDP listener is up, and returns the table. The error
    // from newUDP is returned unchanged, with a nil table.
    func ListenUDP(c conn, cfg Config) (*Table, error) {
        table, _, err := newUDP(c, cfg)
        if err == nil {
            log.Info("UDP listener up", "self", table.self)
            return table, nil
        }
        return nil, err
    }
    
    // newUDP creates the udp transport for connection c together with its Table
    // and starts the transport's loop and readLoop goroutines.
    //
    // The local endpoint is derived from c.LocalAddr(), unless cfg.AnnounceAddr
    // is set, in which case that address is used instead. It returns the table
    // and the transport, or the error reported by newTable.
    func newUDP(c conn, cfg Config) (*Table, *udp, error) {
        u := &udp{
            conn:        c,
            priv:        cfg.PrivateKey,
            netrestrict: cfg.NetRestrict,
            closing:     make(chan struct{}),
            gotreply:    make(chan reply),
            addpending:  make(chan *pending),
        }

        // Start from the socket's own address; an announce address overrides it.
        addr := c.LocalAddr().(*net.UDPAddr)
        if announce := cfg.AnnounceAddr; announce != nil {
            addr = announce
        }
        // TODO: separate TCP port
        u.ourEndpoint = makeEndpoint(addr, uint16(addr.Port))

        selfID := PubkeyID(&cfg.PrivateKey.PublicKey)
        tab, err := newTable(u, selfID, addr, cfg.NodeDBPath, cfg.Bootnodes)
        if err != nil {
            return nil, nil, err
        }
        u.Table = tab

        go u.loop()
        go u.readLoop(cfg.Unhandled)
        return u.Table, u, nil
    }
    

    newTable的实现位于p2p/discover/table.go,
    其中的type Table struct实现了discoverTable接口:

    // p2p/dial.go
    //
    // discoverTable is the set of discovery-table operations used by the p2p
    // package. Server.ntab and dialstate.ntab are declared with this type.
    type discoverTable interface {
        Self() *discover.Node
        Close()
        Resolve(target discover.NodeID) *discover.Node
        // Lookup is called by discoverTask.Do with a random target.
        Lookup(target discover.NodeID) []*discover.Node
        ReadRandomNodes([]*discover.Node) int
    }
    
    // p2p/discover/table.go
    //
    // Table is the node table created by newTable and returned by newUDP and
    // ListenUDP. It indexes known nodes into buckets by distance.
    type Table struct {
        mutex   sync.Mutex        // protects buckets, bucket content, nursery, rand
        buckets [nBuckets]*bucket // index of known nodes by distance
        nursery []*Node           // bootstrap nodes
        rand    *mrand.Rand       // source of randomness, periodically reseeded
        ips     netutil.DistinctNetSet
    
        db         *nodeDB // database of known nodes
        refreshReq chan chan struct{}
        initDone   chan struct{}
        closeReq   chan struct{}
        closed     chan struct{}
    
        bondmu    sync.Mutex
        bonding   map[NodeID]*bondproc
        bondslots chan struct{} // limits total number of active bonding processes
    
        nodeAddedHook func(*Node) // for testing
    
        net  transport
        self *Node // metadata of the local node
    }
    

    相关文章

      网友评论

        本文标题:ethereum p2p Kademlia的实现之一

        本文链接:https://www.haomeiwen.com/subject/jktykftx.html