美文网首页以太坊(ethereum)实现研究
ethereum p2p Kademlia的实现之七

ethereum p2p Kademlia的实现之七

作者: 古则 | 来源:发表于2018-04-20 19:20 被阅读19次

    前面的该系列文章中已经分析了网络的建立及维护,包括:

    • 使用udp维护Kademlia网络
    • 使用tcp完成数据通信
      主要逻辑位于:
    eth/handler.go 区块链相关通信
    p2p/peer.go  p2p/rlpx.go tcp连接维护的相关通信
    

    可知tcp数据协议可以分为两层

    • p2p模块内的部分,用来完成握手(不是tcp的三次握手),授权等操作
    • eth模块的部分,这部分的协议用来同步区块,交易等数据,当然这部分协议还是需要同步p2p模块建立的tcp连接来发送

    tcp连接的建立在ethereum p2p Kademlia的实现之五已经有过分析,在此不再赘述

    1.tcp通信的调用过程

    先给出结论:

    • 通信使用的fd来自两种tcp连接建立时的fd,在server.SetupConn方法中设置,调用newRLPX传入rlpx.go中
    • rlpxFrameRW(在doEncHandshake方法中创建)实现了message.go中定义的MsgReadWriter接口,tcp通信的数据需要在这里进行rlp编解码后进行读写
    • 握手过程直接进入rlpx.go,eth模块读写数据时先在protoRW(p2p/peer.go),后进入rlpx.go
    • message.go定义了通信的数据格式

    1.1 握手过程

    先看SetupConn方法

    // as a peer. It returns when the connection has been added as a peer
    // or the handshakes have failed.
    func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
        self := srv.Self()
        if self == nil {
            return errors.New("shutdown")
        }
    #####
    newTransport为newRLPX,在server.Start中定义
    #####
        c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
        err := srv.setupConn(c, flags, dialDest)
        if err != nil {
            c.close(err)
            srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
        }
        return err
    }
    
    func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
        // Prevent leftover pending conns from entering the handshake.
        srv.lock.Lock()
        running := srv.running
        srv.lock.Unlock()
        if !running {
            return errServerStopped
        }
        // Run the encryption handshake.
        var err error
    #####
    调用rlpx的doEncHandshake方法
    #####
        if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
            srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
            //fmt.Println("Failed RLPx handshake", c.fd.RemoteAddr(), err)
            return err
        }
        clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
        // For dialed connections, check that the remote public key matches.
        if dialDest != nil && c.id != dialDest.ID {
            clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
            //fmt.Println("setupConn 1", err)
            return DiscUnexpectedIdentity
        }
    
        err = srv.checkpoint(c, srv.posthandshake)
        if err != nil {
            clog.Trace("Rejected peer before protocol handshake", "err", err)
            //fmt.Println("setupConn 2", err)
            return err
        }
    #####
    调用rlpx的doProtoHandshake,这是doProtoHandshake协议层的握手,后面再进行分析
    #####
        // Run the protocol handshake
        phs, err := c.doProtoHandshake(srv.ourHandshake)
        if err != nil {
            clog.Trace("Failed proto handshake", "err", err)
            //fmt.Println("setupConn ee 3", err)
            return err
        }
        if phs.ID != c.id {
            clog.Trace("Wrong devp2p handshake identity", "err", phs.ID)
            //fmt.Println("setupConn 4", err)
            return DiscUnexpectedIdentity
        }
        c.caps, c.name = phs.Caps, phs.Name
        err = srv.checkpoint(c, srv.addpeer)
        if err != nil {
            clog.Trace("Rejected peer", "err", err)
            //fmt.Println("setupConn 5", err)
            return err
        }
        // If the checks completed successfully, runPeer has now been
        // launched by run.
        clog.Trace("connection set up", "inbound", dialDest == nil)
        return nil
    }
    

    1.2 tcp数据的读

    入口位于server.run中

    p := newPeer(c, srv.Protocols)
    go srv.runPeer(p)
    

    最终在

    func (p *Peer) run() (remoteRequested bool, err error) {
    #####
    循环读取数据,放入rw.in中,供外部从protoRW中读取
    #####
        go p.readLoop(readErr)
    #####
    单独进行ping pong数据包的读跟写
    #####
        go p.pingLoop()
    #####
    调用eth模块注册的方法,对rw.in中的数据进行处理
    #####
        p.startProtocols(writeStart, writeErr)
    }
    

    *** readLoop中实现了对数据的rlp解码 ***
    readloop的实现如下

    func (p *Peer) readLoop(errc chan<- error) {
        defer p.wg.Done()
        for {
    #####
    调用protoRW的readMsg再rlpx.go对rlp进行解码
    #####
            msg, err := p.rw.ReadMsg()
            if err != nil {
                errc <- err
                return
            }
            msg.ReceivedAt = time.Now()
            if err = p.handle(msg); err != nil {
                errc <- err
                return
            }
        }
    }
    

    在eth模块中注册的方法会调用如下方法,对rw.in中的数据进行读入

    func (rw *protoRW) ReadMsg() (Msg, error) {
        select {
        case msg := <-rw.in:
            //fmt.Println("ReadMsg readmsg", msg.Code, rw.offset)
            //debug.PrintStack()
            msg.Code -= rw.offset
            return msg, nil
        case <-rw.closed:
            return Msg{}, io.EOF
        }
    }
    

    1.3 tcp数据的写

    写数据的入口位于eth/peer.go中,该文件中很多方法调用p2p.Send
    该方法的代码如下:

    func Send(w MsgWriter, msgcode uint64, data interface{}) error {
        size, r, err := rlp.EncodeToReader(data)
        if err != nil {
            return err
        }
    ######
    w为protoRW,在newPeer=>matchProtocols中创建
    在startProtocols中传入eth模块
    eth模块写tcp数据时又传送回来
    ######
        return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
    }
    

    protoRW的读写方法如下

    func (rw *protoRW) WriteMsg(msg Msg) (err error) {
        if msg.Code >= rw.Length {
            return newPeerError(errInvalidMsgCode, "not handled")
        }
        msg.Code += rw.offset
        select {
        case <-rw.wstart:
            err = rw.w.WriteMsg(msg)
            // Report write status back to Peer.run. It will initiate
            // shutdown if the error is non-nil and unblock the next write
            // otherwise. The calling protocol code should exit for errors
            // as well but we don't want to rely on that.
            rw.werr <- err
        case <-rw.closed:
            err = fmt.Errorf("shutting down")
        }
        return err
    }
    
    func (rw *protoRW) ReadMsg() (Msg, error) {
        select {
        case msg := <-rw.in:
            //fmt.Println("ReadMsg readmsg", msg.Code, rw.offset)
            //debug.PrintStack()
            msg.Code -= rw.offset
            return msg, nil
        case <-rw.closed:
            return Msg{}, io.EOF
        }
    }
    

    这两个方法主要进行了对msg.Code的处理
    最后仍是进入了rlpx.go中的读写方法进行处理

    2.消息格式

    2.1 整体格式

    tcp的消息格式定义在message.go中

    type Msg struct {
        Code       uint64
        Size       uint32 // size of the paylod
        Payload    io.Reader
        ReceivedAt time.Time
    }
    

    code的为消息码

    • p2p层的消息码如下
    //p2p/peer.go
    const (
        // devp2p message codes
        handshakeMsg = 0x00
        discMsg      = 0x01
        pingMsg      = 0x02
        pongMsg      = 0x03
        getPeersMsg  = 0x04
        peersMsg     = 0x05
    )
    
    • 数据层的消息码如下
    //eth/protocol.go
    // eth protocol message codes
    const (
        // Protocol messages belonging to eth/62
        StatusMsg          = 0x00
        NewBlockHashesMsg  = 0x01
        TxMsg              = 0x02
        GetBlockHeadersMsg = 0x03
        BlockHeadersMsg    = 0x04
        GetBlockBodiesMsg  = 0x05
        BlockBodiesMsg     = 0x06
        NewBlockMsg        = 0x07
    
        // Protocol messages belonging to eth/63
        GetNodeDataMsg = 0x0d
        NodeDataMsg    = 0x0e
        GetReceiptsMsg = 0x0f
        ReceiptsMsg    = 0x10
    )
    

    2.2 不同消息的不同消息体

    不同的消息码对应的Payload的数据结构不同

    • ping pong Payload为空
    • handshake消息
    //p2p/peer.go
    // protoHandshake is the RLP structure of the protocol handshake.
    type protoHandshake struct {
        Version    uint64
        Name       string
        Caps       []Cap
        ListenPort uint64
        ID         discover.NodeID
    
        // Ignore additional fields (for forward compatibility).
        Rest []rlp.RawValue `rlp:"tail"`
    }
    
    • 数据层的消息体格式位于eth/protocol.go,如下
    //eth/protocol.go
    type statusData struct {
        ProtocolVersion uint32
        NetworkId       uint64
        TD              *big.Int
        CurrentBlock    common.Hash
        GenesisBlock    common.Hash
    }
    
    // newBlockHashesData is the network packet for the block announcements.
    type newBlockHashesData []struct {
        Hash   common.Hash // Hash of one particular block being announced
        Number uint64      // Number of one particular block being announced
    }
    
    // getBlockHeadersData represents a block header query.
    type getBlockHeadersData struct {
        Origin  hashOrNumber // Block from which to retrieve headers
        Amount  uint64       // Maximum number of headers to retrieve
        Skip    uint64       // Blocks to skip between consecutive headers
        Reverse bool         // Query direction (false = rising towards latest, true = falling towards genesis)
    }
    
    // hashOrNumber is a combined field for specifying an origin block.
    type hashOrNumber struct {
        Hash   common.Hash // Block hash from which to retrieve headers (excludes Number)
        Number uint64      // Block hash from which to retrieve headers (excludes Hash)
    }
    // newBlockData is the network packet for the block propagation message.
    type newBlockData struct {
        Block *types.Block
        TD    *big.Int
    }
    
    // blockBody represents the data content of a single block.
    type blockBody struct {
        Transactions []*types.Transaction // Transactions contained within a block
        Uncles       []*types.Header      // Uncles contained within a block
    }
    
    // blockBodiesData is the network packet for block content distribution.
    type blockBodiesData []*blockBody
    

    3 消息的编解码

    消息编解码的实现位于p2p/rlpx.go中

    相关文章

      网友评论

        本文标题:ethereum p2p Kademlia的实现之七

        本文链接:https://www.haomeiwen.com/subject/tvmzkftx.html