美文网首页以太坊(ethereum)实现研究
ethereum p2p Kademlia的实现之四

ethereum p2p Kademlia的实现之四

作者: 古则 | 来源:发表于2018-04-18 20:35 被阅读34次

    前面讲了p2p网络的实现,现在开始涉及节点间应用数据(区块,交易等)的交互

    1.p2p模块定义的用户协议接口

    为了将p2p模块从具体的数据处理细节中解耦出来,实现p2p模块的独立性,p2p模块并没有对节点间的数据进行处理,而是定义了一个结构体,调用该结构体的Run方法来处理数据
    接口的定义:

    // p2p/protocol.go
    // Protocol represents a P2P subprotocol implementation.
    type Protocol struct {
        // Name should contain the official protocol name,
        // often a three-letter word.
        Name string
    
        // Version should contain the version number of the protocol.
        Version uint
    
        // Length should contain the number of message codes used
        // by the protocol.
        Length uint64
    
        // Run is called in a new groutine when the protocol has been
        // negotiated with a peer. It should read and write messages from
        // rw. The Payload for each message must be fully consumed.
        //
        // The peer connection is closed when Start returns. It should return
        // any protocol-level error (such as an I/O error) that is
        // encountered.
        Run func(peer *Peer, rw MsgReadWriter) error
    
        // NodeInfo is an optional helper method to retrieve protocol specific metadata
        // about the host node.
        NodeInfo func() interface{}
    
        // PeerInfo is an optional helper method to retrieve protocol specific metadata
        // about a certain peer in the network. If an info retrieval function is set,
        // but returns nil, it is assumed that the protocol handshake is still running.
        PeerInfo func(id discover.NodeID) interface{}
    }
    
    • 使用p2p模块时,调用者需要注册一个已经实现Run方法的结构体到p2p模块
    • 当节点间有数据过来时,p2p模块会调用Run方法进行数据处理

    2.eth的注册流程

    注册的调用过程是:

    //cmd/geth/main.go
    init()
    =>
    func geth(ctx *cli.Context) error ()
    =>
    //cmd/geth/config.go
    func makeFullNode(ctx *cli.Context) *node.Node
    =>
    //cmd/utils/flags.go
    func RegisterEthService(stack *node.Node, cfg *eth.Config)
    =>
    //cmd/node/node.go
    func (n *Node) Register(constructor ServiceConstructor) error
    

    具体的关键注册代码

    //cmd/utils/flags.go
    // RegisterEthService adds an Ethereum client to the stack.
    func RegisterEthService(stack *node.Node, cfg *eth.Config) {
        var err error
        if cfg.SyncMode == downloader.LightSync {
            err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
                return les.New(ctx, cfg)
            })
        } else {
            err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
    //创建eth
                fullNode, err := eth.New(ctx, cfg)
                if fullNode != nil && cfg.LightServ > 0 {
                    ls, _ := les.NewLesServer(fullNode, cfg)
                    fullNode.AddLesServer(ls)
                }
                return fullNode, err
            })
        }
        if err != nil {
            Fatalf("Failed to register the Ethereum service: %v", err)
        }
    }
    
    //eth/handler.go
        manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
        for i, version := range ProtocolVersions {
            // Skip protocol version if incompatible with the mode of operation
            if mode == downloader.FastSync && version < eth63 {
                continue
            }
            // Compatible; initialise the sub-protocol
            version := version // Closure for the run
            manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
                Name:    ProtocolName,
                Version: version,
                Length:  ProtocolLengths[i],
                Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
                    peer := manager.newPeer(int(version), p, rw)
                    select {
                    case manager.newPeerCh <- peer:
                        manager.wg.Add(1)
                        defer manager.wg.Done()
                        return manager.handle(peer)
                    case <-manager.quitSync:
                        return p2p.DiscQuitting
                    }
                },
                NodeInfo: func() interface{} {
                    return manager.NodeInfo()
                },
                PeerInfo: func(id discover.NodeID) interface{} {
                    if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
                        return p.Info()
                    }
                    return nil
                },
            })
        }
    

    3.数据处理的调用过程

    被注册到p2p.Server后
    在p2p.Server.Start()函数中有

    ...
    dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
    ...
    go srv.run(dialer)
    ...
    

    在p2p.Server.run(dialstate dialer)中

    ...
    // At this point the connection is past the protocol handshake.
                // Its capabilities are known and the remote identity is verified.
                err := srv.protoHandshakeChecks(peers, inboundCount, c)
                //fmt.Println("pre runPeer", err)
                if err == nil {
                    // The handshakes are done and it passed all checks.
                    p := newPeer(c, srv.Protocols)
                    // If message events are enabled, pass the peerFeed
                    // to the peer
                    if srv.EnableMsgEvents {
                        p.events = &srv.peerFeed
                    }
                    name := truncateName(c.name)
                    srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
                    go srv.runPeer(p)
                    peers[c.id] = p
                    if p.Inbound() {
                        inboundCount++
                    }
                }
    ...
    

    再从p2p.Server.runPeer

    peer.run

    peer.startProtocols,调用注册的run方法
    如下:

    //p2p/peer.go
    func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
        //fmt.Println("startProtocols:", p.running)
        //debug.PrintStack()
        p.wg.Add(len(p.running))
        for _, proto := range p.running {
            //fmt.Println("proto:", proto)
            proto := proto
            proto.closed = p.closed
            proto.wstart = writeStart
            proto.werr = writeErr
            var rw MsgReadWriter = proto
            if p.events != nil {
                rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
            }
            p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
            go func(){
    ##############
    //这里调用注册的Run方法
    ##############
                err := proto.Run(p, rw)
                if err == nil {
                    p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
                    err = errProtocolReturned
                } else if err != io.EOF {
                    p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
                }
                p.protoErr <- err
                p.wg.Done()
            }()
        }
    }
    

    4.tcp连接的建立

    这里有两类tcp连接

    • 主动连接,在p2p.Server.run中实现
      相关代码片段是
    // starts until max number of active tasks is satisfied
        startTasks := func(ts []task) (rest []task) {
            i := 0
            for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
                t := ts[i]
                srv.log.Trace("New dial task", "task", t)
                go func() { t.Do(srv); taskdone <- t }()
                runningTasks = append(runningTasks, t)
            }
            return ts[i:]
        }
    =>
    //p2p/dial.go
    func (t *dialTask) Do(srv *Server) {
        //fmt.Println("diatask do")
        //debug.PrintStack()
        if t.dest.Incomplete() {
            if !t.resolve(srv) {
                return
            }
        }
        //fmt.Println(t.dest)
        err := t.dial(srv, t.dest)
        if err != nil {
            log.Trace("Dial error", "task", t, "err", err)
            // Try resolving the ID of static nodes if dialing failed.
            if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
                if t.resolve(srv) {
                    //fmt.Println(t.dest)
                    t.dial(srv, t.dest)
                }
            }
        }
    }
    =>
    // dial performs the actual connection attempt.
    func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
        //fmt.Println("dialtask dia")
        fd, err := srv.Dialer.Dial(dest)
        if err != nil {
            //fmt.Println("dialtask error")
            //fmt.Println(err)
            return &dialError{err}
        }
        //fmt.Println("dialtask dia success")
        //fmt.Println(dest)
        mfd := newMeteredConn(fd, false)
        return srv.SetupConn(mfd, t.flags, dest)
    }
    
    • 另外一个监听tcp端口,接收对方连接
    func (srv *Server) startListening() error {
        // Launch the TCP listener.
        listener, err := net.Listen("tcp", srv.ListenAddr)
        if err != nil {
            return err
        }
        laddr := listener.Addr().(*net.TCPAddr)
        srv.ListenAddr = laddr.String()
        srv.listener = listener
        srv.loopWG.Add(1)
        go srv.listenLoop()
        // Map the TCP listening port if NAT is configured.
        if !laddr.IP.IsLoopback() && srv.NAT != nil {
            srv.loopWG.Add(1)
            go func() {
                nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
                srv.loopWG.Done()
            }()
        }
        return nil
    }
    =>
    // listenLoop runs in its own goroutine and accepts
    // inbound connections.
    func (srv *Server) listenLoop()
    

    相关文章

      网友评论

        本文标题:ethereum p2p Kademlia的实现之四

        本文链接:https://www.haomeiwen.com/subject/ptybkftx.html