美文网首页以太坊原理解析
[以太坊源码分析][p2p网络05]:底层节点如何与上层节点联系

[以太坊源码分析][p2p网络05]:底层节点如何与上层节点联系

作者: jea的笔记本 | 来源:发表于2019-01-20 14:49 被阅读0次

    对于以太坊的p2p网络,我觉得,分为底层p2p网络的构建,以及上层eth服务的实现。在介绍以太坊上层服务之前,需要先来看一下底层网络是怎么跟上层网络联系在一起的。

    0.索引

    01.ProtocolManager 协议管理
    02.新建一个 ProtocolManager
    03.建立联系
    04.使用 Run 方法
    05.底层的peer结构
    06.总结

    1.ProtocolManager 协议管理

    ProtocolManagereth/handle.go中的核心结构体,用来管理节点之间的通信。

    type ProtocolManager struct {
        networkID uint64
        fastSync  uint32 
        acceptTxs uint32 
    
        txpool      txPool
        blockchain  *core.BlockChain
        chainconfig *params.ChainConfig
        maxPeers    int
    
        downloader *downloader.Downloader
        fetcher    *fetcher.Fetcher
        peers      *peerSet
    
        SubProtocols []p2p.Protocol
    
        eventMux      *event.TypeMux
        txsCh         chan core.NewTxsEvent
        txsSub        event.Subscription
        minedBlockSub *event.TypeMuxSubscription
    
        // fetcher, syncer, txsyncLoop 的通道
        newPeerCh   chan *peer
        txsyncCh    chan *txsync
        quitSync    chan struct{}
        noMorePeers chan struct{}
    
        wg sync.WaitGroup
    }
    

    ProtocolManager结构体中包含了

    • networkID网络id。
    • fastSync快速同步的标志, acceptTxs接收交易方式的标志。
    • txpool交易池,blockchain区块链,chainconfig区块链配置,maxpeers最大节点数。
    • downloader下载器,fetcher提取器,peers相邻节点表。
    • SubProtocols子协议列表。(与底层节点相关。)
    • 以及需要用到的各种通道和同步锁。

    2.新建一个 ProtocolManager

    底层节点与上层节点的联系,就在新建一个ProtocolManager的方法里。(第3个步骤,再往下会做具体说明。)

    func NewProtocolManager(
        config *params.ChainConfig, 
        mode downloader.SyncMode, 
        networkID uint64, 
        mux *event.TypeMux, 
        txpool txPool,
        engine consensus.Engine, 
        blockchain *core.BlockChain, 
        chaindb ethdb.Database
    ) (*ProtocolManager, error) 
    
    • 1.初始化基础字段。
      manager := &ProtocolManager{networkID:   networkID,
        eventMux:    mux,
        txpool:      txpool,
        blockchain:  blockchain,
        chainconfig: config,
        peers:       newPeerSet(),
        newPeerCh:   make(chan *peer),
        noMorePeers: make(chan struct{}),
        txsyncCh:    make(chan *txsync),
        quitSync:    make(chan struct{}),}
      
    • 2.确认是否快速同步模式。
      if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {...}
      
    • 3.每个实现的版本添加子协议。也就是上层服务给予底层p2p网络调用上层服务的函数入口。(在这一步建立的联系。)
      manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
        for i, version := range ProtocolVersions {...}
      
    • 4.新建一个下载器downloader,用于下载区块。构建不同的同步机制,mode为同步机制类型。
      manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
      
    • 5.validator引用验证区块头的方法。
      validator := func(header *types.Header) error {
            return engine.VerifyHeader(blockchain, header, true)
        }
      
    • 6.heighter引用获取区块高度的方法。
      heighter := func() uint64 {
            return blockchain.CurrentBlock().NumberU64()
        }
      
    • 7.inserter引用在区块链上插入区块的方法。
      inserter := func(blocks types.Blocks) (int, error) {
            ...
            return manager.blockchain.InsertChain(blocks)
        }
      
    • 8.新建一个提取器fetcher,用于辅助同步区块。
      manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
      

    3.(添加子协议)建立联系

    添加子协议的具体步骤。

    manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
        Name:    ProtocolName,
        Version: version,
        Length:  ProtocolLengths[i],
        Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
            peer := manager.newPeer(int(version), p, rw)
            select {
            case manager.newPeerCh <- peer:
                manager.wg.Add(1)
                defer manager.wg.Done()
                return manager.handle(peer)
            case <-manager.quitSync:
                return p2p.DiscQuitting
            }
        },
        NodeInfo: func() interface{} {
            return manager.NodeInfo()
        },
        PeerInfo: func(id enode.ID) interface{} {
            if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
                return p.Info()
            }
            return nil
        },
    })
    

    在新建一个p2p.Protocol对象的时候,

    • 传入三个字段:Name协议名,Version协议版本,Length协议长度。
    • 新建三个远程节点的回调函数:
      • Run执行协议。
      • NodeInfo返回了本地节点的网络id,区块难度值,创世区块哈希值,区块链配置,当前区块哈希值。
      • PeerInfo远程节点的信息。

    Run方法manager.newPeer方法使得底层的peer能创建一个上层的peer,并且自身包含在上层的peer里。创建了上层的peer后,调用了manager.handle(peer)方法开始处理远程节点发来的消息。

    4.调用子协议的 Run 方法 (包含步骤5)

    在发起TCP连接请求的那一篇里,提到了:
    在节点协议握手成功之后,srv.addpeer的通道中加入与远程节点的连接。
    这时候会触发 case c := <-srv.addpeer:
    (代码在p2p/server.go中)

    case c := <-srv.addpeer:
        // 对协议握手进行一次检查
        err := srv.protoHandshakeChecks(peers, inboundCount, c)
        if err == nil {
            // 协议握手完成,通过检查。
            // 新建底层peer。
            p := newPeer(c, srv.Protocols)
            // 如果启用了消息事件,请将peerFeed传递给peer
            if srv.EnableMsgEvents {
                p.events = &srv.peerFeed
            }
            name := truncateName(c.name)
            srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
                // 启动一个单独的协程,运行节点。
                go srv.runPeer(p)
                // 接收请求的节点集合加入该节点。
                peers[c.node.ID()] = p
                if p.Inbound() {
                    // 接入连接的数量加1.
                    inboundCount++
                }
          }
    
    • 1.先对协议握手进行检查。
    • 2.协议握手检查通过后,新建一个底层的peer
    • 3.启动一个单独的协程,来执行这一个peergo srv.runPeer(p)。先进行节点添加这一事件的广播,然后调用p2p/peer.gopeer对象的run方法。
      func (srv *Server) runPeer(p *Peer) {
        // 测试。
        ...
        // 广播节点添加事件。
        srv.peerFeed.Send(&PeerEvent{
            Type: PeerEventTypeAdd,
            Peer: p.ID(),
        })
        // 执行协议·
        remoteRequested, err := p.run()
        // 广播节点下线
        srv.peerFeed.Send(&PeerEvent{
            Type:  PeerEventTypeDrop,
            Peer:  p.ID(),
            Error: err.Error(),
        })
        // 删除节点
        srv.delpeer <- peerDrop{p, err, remoteRequested}
      }
      
    • 4.在peers里加入该peer,如果接入成功,接入数量加1。

    5.底层的peer结构以及run方法

    (在p2p/peer.go中)
    首先是底层peer结构体。它代表一个远程节点的连接。包含了rw建立的TCP连接,running协议对应的读写通道。

    type Peer struct {
        rw      *conn                 // 建立的TCP连接
        running map[string]*protoRW   // 协议对应的读写通道
        log     log.Logger            // 日志记录
        created mclock.AbsTime
    
        wg       sync.WaitGroup
        protoErr chan error
        closed   chan struct{}
        disc     chan DiscReason
    
        // 接收消息发送/接收事件
        events *event.Feed
    }
    

    然后是底层peerrun方法,也就是一个底层节点会执行的所有操作。

    func (p *Peer) run() (remoteRequested bool, err error) {
        // 定义变量。
        ...
        // 启动了两个单独的协程,一个用于循环的读取消息,一个用于循环发送ping消息,确保对方节点在线。
        p.wg.Add(2)
        go p.readLoop(readErr)
        go p.pingLoop()
    
        // 开启所有协议。
        writeStart <- struct{}{}
        p.startProtocols(writeStart, writeErr)
    
        // 等待接收到错误或者是断开连接。
    loop:
        ...
        // 关闭节点。
        close(p.closed)
        p.rw.close(reason)
        p.wg.Wait()
        return remoteRequested, err
    }
    
    
      1. 启动了两个单独的协程,go p.readLoop(readErr)用于循环的读取消息,go p.pingLoop()用于循环发送ping消息,确保对方节点在线。
    • 2.执行节点包含的所有协议,即p.startProtocols(writeStart, writeErr)方法。在p.startProtocols方法中调用了底层网络设置的回调函数Run(以下程序语句)。
      err := proto.Run(p, rw)
      

    步骤4和步骤5一步一步的执行和调用,最后使用了p2p.ProtocolRun方法。

    6.总结

    • 1.底层peer与上层peer通过p2p.Protocol对象的Run方法联系在一起。Run实现了新建上层节点,以及与协议版本对应的处理节点之间通信的消息的功能。
    • 2.关于底层p2p网络启动了的协程的总结:
      • Server服务启动了两个协程:监听远程节点发来的TCP请求,发起TCP连接请求。
      • 底层peer运行协议的时候,启动了n+2个协程:循环读取消息,循环发送ping消息,以及n个协议对应的处理方式的协程。

    相关文章

      网友评论

        本文标题:[以太坊源码分析][p2p网络05]:底层节点如何与上层节点联系

        本文链接:https://www.haomeiwen.com/subject/rugxkqtx.html