美文网首页以太坊原理解析
[以太坊源码分析][p2p网络06]:交易广播和区块广播

[以太坊源码分析][p2p网络06]:交易广播和区块广播

作者: jea的笔记本 | 来源:发表于2019-01-20 14:55 被阅读0次

    eth/handle.go中的ProtocolManager管理节点之间通信。节点与节点之间的通信,也就是区块和交易的广播或同步。

    这里先介绍广播。提及广播,要先说一个有趣的协议:gossip,对,就是流言蜚语。如果有关于明星的八卦或是负面新闻,不用多长时间,可能满大街的人们就都知道了。广播就类似于流言蜚语的传播,一传十,十传百的扩散出去,最后整个网络都知晓了。

    以下是ProtocolManager实现区块和交易的广播的流程图:

    ProtocolManager的实现方法
    接下来会一步一步介绍。

    0.索引

    01.广播和同步的启动
    02.区块广播
    03.区块广播相关源码
    04.交易广播以及源码
    05.异步发送区块和交易的说明
    06.总结

    1.广播和同步的启动

    区块和交易的广播与同步由ProtocolManager协议管理控制。启动方法为Start

    func (pm *ProtocolManager) Start(maxPeers int) {
        pm.maxPeers = maxPeers
    
        // 广播交易
        pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
        pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
        go pm.txBroadcastLoop()
    
        // 广播区块
        pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
        go pm.minedBroadcastLoop()
    
        // 开始同步
        go pm.syncer()
        go pm.txsyncLoop()
    }
    

    开启了4个协程:

    • 1.创建了新交易事件的通道,然后开始pm.txBroadcastLoop() 广播交易。
    • 2.创建了新区块事件的通道,然后开始pm.minedBroadcastLoop() 广播区块。
    • 3.同步区块,同步的过程在eth/sync.go里,下一次介绍。
    • 4.同步交易。

    先从区块广播开始。

    2.区块广播

    区块广播指的是矿工挖出新的区块后,将新区块告知并发给p2p网络中的所有节点。这里涉及到两个广播过程:

    • 1.矿工广播新区块
    • 2.其他的中继节点广播新区快

    如下图:


    区块广播

    第一轮:
    黄色的节点表示矿工,矿工挖到区块后,接下来要将区块广播出去,也就是发送给相邻的节点,这里相邻的节点有5个,两个红色的节点和三个蓝色的节点。红色的节点表示收到区块的节点,蓝色的节点表示收到区块哈希的节点。

    这里红色的节点是有一定数量要求的。取的是,要广播的节点数量的平方根。要广播5个节点,5取平方根再取整为2个。也就是说矿工向这两个红色节点直接发送了区块,然后向剩余的节点发送了区块哈希

    第二轮:
    接收到区块哈希的蓝色节点向发来区块哈希的节点(也就是矿工)请求下载区块,下载完区块后,就跟接收到区块的红色节点一样,向它的相邻节点发送区块区块哈希,如第一轮的过程。

    这其中会有一种情况产生,如果提前接收到了未来的区块,比如说,区块A->区块B->区块C,需要的是区块B,但是接收到了区块C的情况,这时候会将区块哈希进行广播。

    第n轮:
    同第二轮,直到整个网络都知晓广播的区块。

    注意:下面的源码介绍的是以矿工挖出区块后的第一次广播,也就是进行第一轮操作。交易广播亦是如此。)

    3.区块广播相关源码

    首先是区块广播循环minedBroadcastLoop()
    func (pm *ProtocolManager) minedBroadcastLoop() {
        // 自动停止,如果退订了通道。
        for obj := range pm.minedBlockSub.Chan() {
            if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
                // 广播区块。
                pm.BroadcastBlock(ev.Block, true)  // 首先广播区块。
                pm.BroadcastBlock(ev.Block, false) // 然后只广播区块哈希。
            }
        }
    }
    

    区块广播循环minedBroadcastLoop()开启了之后,会一直读取区块事件的通道,也就是如果有新的区块事件产生,就能立即知晓。

    然后进行区块广播。调用了两次pm.BroadcastBlock方法。第一次标志位为true,给部分节点广播区块。第二次标志位为false,只广播区块哈希。

    关于pm.BroadcastBlock方法。

    根据propagate标志位的不同设置,对应不同的区块广播方式。

    func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool)
    
    • 1.先获取新的区块的哈希hash,和本地节点的相邻节点中,未知这个区块的节点列表peers
      hash := block.Hash()
      peers := pm.peers.PeersWithoutBlock(hash)
      
    • 2.如果 propagate 字段为true。广播区块,节点的数量为peers的长度的平方根。
      transferLen := int(math.Sqrt(float64(len(peers))))
      transfer := peers[:transferLen]
      for _, peer := range transfer {
        peer.AsyncSendNewBlock(block, td)
      }
      
    • 3.如果 propagate 字段为false。广播区块哈希,节点的数量为peers剩余的节点数量。(即没有收到区块的节点。)
      for _, peer := range peers {
        peer.AsyncSendNewBlockHash(block)
      }
      
    异步发送区块或区块哈希

    (代码在eth/peer.go里)
    发送区块。
    在远程节点的广播队列里加入了区块事件,如果远程节点的广播队列 queuedProps 满了,则无法收到。然后标注该远程节点已知该区块。

    func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
        select {
        case p.queuedProps <- &propEvent{block: block, td: td}:
            p.knownBlocks.Add(block.Hash())
        default:
            p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
        }
    }
    

    发送区块哈希。
    与发送区块类似。在远程节点的区块哈希通知队列里加入了区块事件。然后标注该远程节点已知该区块。

    func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
        select {
        case p.queuedAnns <- block:
            p.knownBlocks.Add(block.Hash())
        default:
            p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
        }
    }
    

    4.交易广播以及源码

    由于交易的数量比较多,所以每次广播的是一批交易。交易的广播也相对比较简单,一批新的交易,直接传给相邻节点即可。

    交易广播循环 txBroadcastLoop()

    循环读取交易事件通道,如果接收到新的一批交易,则广播出去。

    func (pm *ProtocolManager) txBroadcastLoop() {
        for {
            select {
            case event := <-pm.txsCh:
                pm.BroadcastTxs(event.Txs)
            case <-pm.txsSub.Err():
                return
            }
        }
    }
    
    pm.BroadcastTxs 方法

    广播交易,由于是一批交易,所以要先知道相邻的节点缺少这一批交易里的哪一些交易。定义了交易集合的映射,即远程节点对应该远程节点缺少的交易列表。然后发送交易。

    func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
        var txset = make(map[*peer]types.Transactions)
    
        // 广播给无该交易的节点
        for _, tx := range txs {
            peers := pm.peers.PeersWithoutTx(tx.Hash())
            for _, peer := range peers {
                txset[peer] = append(txset[peer], tx)
            }
            log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
        }
        // 发送交易
        for peer, txs := range txset {
            peer.AsyncSendTransactions(txs)
        }
    }
    
    异步发送交易

    在远程节点的交易队列里加入了交易事件,如果远程节点的交易队列 queuedTxs 满了,则无法收到。然后标注该远程节点已知该交易。(一批交易。)

    func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
        select {
        case p.queuedTxs <- txs:
            for _, tx := range txs {
                p.knownTxs.Add(tx.Hash())
            }
        default:
            p.Log().Debug("Dropping transaction propagation", "count", len(txs))
        }
    }
    

    5.异步发送区块和交易的说明

    在进行区块广播和交易广播的时候,都是采用异步发送的形式,每个远程节点都设置了三个广播的通道,queuedProps,区块通道,缓存为4个区块;queuedAnns,区块哈希通道,缓存为4个区块哈希;queuedTxs交易通道,缓存为128个交易。

    需要进行区块或交易的广播的时候,将区块或交易放入远程节点相应的通道中。

    远程节点读取通道内容

    (代码在eth/peer.go里)

    在上层网络的peerSet中加入新的远程节点,也就是Register注册节点的时候,会开一个单独的协程,启动远程节点的广播方法,即go p.broadcast()

    func (ps *peerSet) Register(p *peer) error {
        ...
        ps.peers[p.id] = p
        go p.broadcast()
        return nil
    }
    

    go p.broadcast()方法,是一个异步读取循环,每次从交易通道,或区块通道,或区块哈希通道中读取内容,然后执行对应的发送方法。

    func (p *peer) broadcast() {
        for {
            select {
            case txs := <-p.queuedTxs:
                if err := p.SendTransactions(txs); err != nil {return}
            ...
            case prop := <-p.queuedProps:
                if err := p.SendNewBlock(prop.block, prop.td); err != nil {return}
            ...
    
            case block := <-p.queuedAnns:
                if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {return}
            ...
            case <-p.term:
                return
            }
        }
    }
    

    这里以发送区块为例,先标注该远程节点已知该区块,然后调用p2p.Send方法,将区块发送给远程的节点。

    func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
        p.knownBlocks.Add(block.Hash())
        return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
    }
    

    然后是Send方法,将区块数据进行rlp编码后置入r中,size为rlp编码后的数据长度。调用w.WriteMsg方法将要发送的数据写入w通道。

    func Send(w MsgWriter, msgcode uint64, data interface{}) error {
        size, r, err := rlp.EncodeToReader(data)
        ...
        return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
    }
    

    6.总结

    • 1.区块和交易的广播是一种gossip的传播方式,每个节点都向相邻的节点传播,最后蔓延开去,整个p2p网络就都知晓了广播的消息。
    • 2.区块广播有两个内容,分别为区块和区块哈希的广播。
    • 3.交易广播的对象是一批交易。

    相关文章

      网友评论

        本文标题:[以太坊源码分析][p2p网络06]:交易广播和区块广播

        本文链接:https://www.haomeiwen.com/subject/qgufkqtx.html