美文网首页
以太坊源码深入分析(6)-- 以太坊P2P协议接收广播的处理和F

以太坊源码深入分析(6)-- 以太坊P2P协议接收广播的处理和F

作者: 老鱼游啊游 | 来源:发表于2018-04-26 19:50 被阅读0次

    上一节主要讲了Ethereum服务和以太坊P2P协议通讯模块ProtocolManager的初始化和启动,以及以太坊通讯协议如何广播给其他的网络节点。
    这一节讲讲,以太坊通讯协议如何处理接收到的广播消息。以及fetcher怎么工作。

    一,ProtocolManager接收网络节点广播消息
    首先看看p2p.Protocol的结构

    type Protocol struct {
        Name string
        Version uint
        Length uint64
    
        Run func(peer *Peer, rw MsgReadWriter) error
    
        NodeInfo func() interface{}
    
        PeerInfo func(id discover.NodeID) interface{}
    }
    

    上一节ProtocolManager初始化的时候会实例化一个p2p.Protocol,并实
    现了Protocol里面的三个成员变量和三个成员函数指针。

     manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
                Name:    ProtocolName,
                Version: version,
                Length:  ProtocolLengths[i],
                Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
                    peer := manager.newPeer(int(version), p, rw)
                    select {
                    case manager.newPeerCh <- peer:
                        manager.wg.Add(1)
                        defer manager.wg.Done()
                        return manager.handle(peer)
                    case <-manager.quitSync:
                        return p2p.DiscQuitting
                    }
                },
                NodeInfo: func() interface{} {
                    return manager.NodeInfo()
                },
                PeerInfo: func(id discover.NodeID) interface{} {
                    if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
                        return p.Info()
                    }
                    return nil
                },
            })
    

    这三个函数指针实质是注入p2p server的回调,用于处理网络中其他节点的广播通知、获取本以太坊Node 的info、本地节点的info。
    Run方法尤其重要,当发现网络中新的p2p节点时候就会执行Run方法,这时候会执行manager.handle(peer)。

    func (pm *ProtocolManager) handle(p *peer) error {
        if pm.peers.Len() >= pm.maxPeers {
            return p2p.DiscTooManyPeers
        }
        p.Log().Debug("Ethereum peer connected", "name", p.Name())
    
        // Execute the Ethereum handshake
        var (
            genesis = pm.blockchain.Genesis()
            head    = pm.blockchain.CurrentHeader()
            hash    = head.Hash()
            number  = head.Number.Uint64()
            td      = pm.blockchain.GetTd(hash, number)
        )
        if err := p.Handshake(pm.networkId, td, hash, genesis.Hash()); err != nil {
            p.Log().Debug("Ethereum handshake failed", "err", err)
            return err
        }
        if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
            rw.Init(p.version)
        }
        // Register the peer locally
        if err := pm.peers.Register(p); err != nil {
            p.Log().Error("Ethereum peer registration failed", "err", err)
            return err
        }
        defer pm.removePeer(p.id)
    
        // Register the peer in the downloader. If the downloader considers it banned, we disconnect
        if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
            return err
        }
        // Propagate existing transactions. new transactions appearing
        // after this will be sent via broadcasts.
        pm.syncTransactions(p)
    
        // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
        if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
            // Request the peer's DAO fork header for extra-data validation
            if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
                return err
            }
            // Start a timer to disconnect if the peer doesn't reply in time
            p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
                p.Log().Debug("Timed out DAO fork-check, dropping")
                pm.removePeer(p.id)
            })
            // Make sure it's cleaned up if the peer dies off
            defer func() {
                if p.forkDrop != nil {
                    p.forkDrop.Stop()
                    p.forkDrop = nil
                }
            }()
        }
        // main loop. handle incoming messages.
        for {
            if err := pm.handleMsg(p); err != nil {
                p.Log().Debug("Ethereum message handling failed", "err", err)
                return err
            }
        }
    }
    

    首先根远程网络节点握手Handshake()方法

    func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash) error {
        // Send out own handshake in a new thread
        errc := make(chan error, 2)
        var status statusData // safe to read after two values have been received from errc
    
        go func() {
            errc <- p2p.Send(p.rw, StatusMsg, &statusData{
                ProtocolVersion: uint32(p.version),
                NetworkId:       network,
                TD:              td,
                CurrentBlock:    head,
                GenesisBlock:    genesis,
            })
        }()
        go func() {
            errc <- p.readStatus(network, &status, genesis)
        }()
        timeout := time.NewTimer(handshakeTimeout)
        defer timeout.Stop()
        for i := 0; i < 2; i++ {
            select {
            case err := <-errc:
                if err != nil {
                    return err
                }
            case <-timeout.C:
                return p2p.DiscReadTimeout
            }
        }
        p.td, p.head = status.TD, status.CurrentBlock
        return nil
    }
    

    把本地节点的状态发送给远程节点包括ProtocolVersion、NetworkId、TD、CurrentBlock、GenesisBlock,然后读取返回的状态数据,并做对比,如果都满足条件就握手成功。

    然后将这个网络节点加入到缓存的节点列表中pm.peers.Register(p)。
    把本地的产生的未打包的交易发送给网络节点。
    验证Dao 硬分叉,如果超时则从缓存节点列表中删除这个网络节点。
    最后进入pm.handleMsg(p)主循环,不停的监听网络节点发过来的消息,并处理。
    目前以太坊节点可以接受如下网络消息:

    const (
        // Protocol messages belonging to eth/62
        StatusMsg          = 0x00
        NewBlockHashesMsg  = 0x01
        TxMsg              = 0x02
        GetBlockHeadersMsg = 0x03
        BlockHeadersMsg    = 0x04
        GetBlockBodiesMsg  = 0x05
        BlockBodiesMsg     = 0x06
        NewBlockMsg        = 0x07
    
        // Protocol messages belonging to eth/63
        GetNodeDataMsg = 0x0d
        NodeDataMsg    = 0x0e
        GetReceiptsMsg = 0x0f
        ReceiptsMsg    = 0x10
    )
    

    pm.handleMsg(p)很长,不一一分析。纯粹是每个消息一个case处理。写这个模块的作者是不是可以考虑重构一下代码,这么核心的代码模块,可读性和可扩展性都太差了。

    二,Fetcher分析,之Notify()
    fetcher是用来辅助同步区块数据的,记录各个区块头和区块体的同步状态,但它并不做真正下载区块数据的事情,下载的事情交由downloader来做。那fetcher具体是怎么工作的呢?
    我们先看看pm.handleMsg 在收到 NewBlockHashesMsg广播通知的处理代码:

    case msg.Code == NewBlockHashesMsg:
            var announces newBlockHashesData
            if err := msg.Decode(&announces); err != nil {
                return errResp(ErrDecode, "%v: %v", msg, err)
            }
            // Mark the hashes as present at the remote node
            for _, block := range announces {
                p.MarkBlock(block.Hash)
            }
            // Schedule all the unknown hashes for retrieval
            unknown := make(newBlockHashesData, 0, len(announces))
            for _, block := range announces {
                if !pm.blockchain.HasBlock(block.Hash, block.Number) {
                    unknown = append(unknown, block)
                }
            }
            for _, block := range unknown {
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
            }
    

    从广播通知里会获取到一个newBlockHashesData的列表。newBlockHashesData只包括block的hash值和block的number值。
    然后每个newBlockHashesData调用pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)方法,除了传入block的hash值和block的number值,还需要传入当前的时间戳,peer.go的两个函数指针。

    func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
        block := &announce{
            hash:        hash,
            number:      number,
            time:        time,
            origin:      peer,
            fetchHeader: headerFetcher,
            fetchBodies: bodyFetcher,
        }
        select {
        case f.notify <- block:
            return nil
        case <-f.quit:
            return errTerminated
        }
    }
    

    Notify()方法把传进来的参数拼成一个announce对象,然后send给f.notify。fetcher的loop()主回路里f.notify receive 到这个notification, 进行处理。

    case notification := <-f.notify:
                // A block was announced, make sure the peer isn't DOSing us
                propAnnounceInMeter.Mark(1)
    
                count := f.announces[notification.origin] + 1
                if count > hashLimit {
                    log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
                    propAnnounceDOSMeter.Mark(1)
                    break
                }
                // If we have a valid block number, check that it's potentially useful
                if notification.number > 0 {
                    if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
                        log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
                        propAnnounceDropMeter.Mark(1)
                        break
                    }
                }
                // All is well, schedule the announce if block's not yet downloading
                if _, ok := f.fetching[notification.hash]; ok {
                    break
                }
                if _, ok := f.completing[notification.hash]; ok {
                    break
                }
                f.announces[notification.origin] = count
                f.announced[notification.hash] = append(f.announced[notification.hash], notification)
                if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
                    f.announceChangeHook(notification.hash, true)
                }
                if len(f.announced) == 1 {
                    f.rescheduleFetch(fetchTimer)
                }
    

    1,将收到的不满足条件的通知都丢弃掉,如果在f.fetching 状态列表里和f.completing 状态列表里,也直接返回。接着更新notification.origin 这个节点的announces 数量,添加到f.announced 等待fetch的表里。
    2,如果len(f.announced[notification.hash]) == 1 说明f.announced只有这一个通知,则调用f.announceChangeHook。
    3,如果len(f.announced) == 1 也说明只有一个通知,则启动fetchTimer的调度。

    case <-fetchTimer.C:
                // At least one block's timer ran out, check for needing retrieval
                request := make(map[string][]common.Hash)
    
                for hash, announces := range f.announced {
                    if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
                        // Pick a random peer to retrieve from, reset all others
                        announce := announces[rand.Intn(len(announces))]
                        f.forgetHash(hash)
    
                        // If the block still didn't arrive, queue for fetching
                        if f.getBlock(hash) == nil {
                            request[announce.origin] = append(request[announce.origin], hash)
                            f.fetching[hash] = announce
                        }
                    }
                }
                // Send out all block header requests
                for peer, hashes := range request {
                    log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
    
                    // Create a closure of the fetch and schedule in on a new thread
                    fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
                    go func() {
                        if f.fetchingHook != nil {
                            f.fetchingHook(hashes)
                        }
                        for _, hash := range hashes {
                            headerFetchMeter.Mark(1)
                            fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
                        }
                    }()
                }
                // Schedule the next fetch if blocks are still pending
                f.rescheduleFetch(fetchTimer)
    

    1,首先遍历f.announced,如果超过了arriveTimeout-gatherSlack这个时间,把这个hash对应在fetcher里面的状态都清了。
    这里随机拿这个announces里面任意一个announce,为啥随机取一个呢?因为都是同一个block的hash,这个hash下的哪一个announce都是一样的。
    如果发现超时了还没有没有获取到这个hash的block,则把这个announce加到request列表中,同时重新把announce放到f.fetching状态列表。
    2,然后遍历request列表,request列表里面的每个网络节点过来的所有的block的hash,都会调用fetchHeader(hash)方法来获取header数据。
    这个fetchHeader(hash)方法是pm.fetcher.Notify传进来的,peer.go
    里面的一个全局方法。
    3, 这时候NewBlockHashesMsg 的fetcher处理就结束了,最后再启动fetchTimer的调度。

    三,Fetcher分析, 之FilterHeaders()
    fetchHeader(hash)方法,调用了peer.go 里面的全局方法RequestOneHeader(hash common.Hash) Send给网络节点一个GetBlockHeadersMsg 消息。
    然后pm.handleMsg 收到 BlockHashesMsg广播通知

    case msg.Code == BlockHeadersMsg:
            // A batch of headers arrived to one of our previous requests
            var headers []*types.Header
            if err := msg.Decode(&headers); err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            // If no headers were received, but we're expending a DAO fork check, maybe it's that
            if len(headers) == 0 && p.forkDrop != nil {
                // Possibly an empty reply to the fork header checks, sanity check TDs
                verifyDAO := true
    
                // If we already have a DAO header, we can check the peer's TD against it. If
                // the peer's ahead of this, it too must have a reply to the DAO check
                if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
                    if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
                        verifyDAO = false
                    }
                }
                // If we're seemingly on the same chain, disable the drop timer
                if verifyDAO {
                    p.Log().Debug("Seems to be on the same side of the DAO fork")
                    p.forkDrop.Stop()
                    p.forkDrop = nil
                    return nil
                }
            }
            // Filter out any explicitly requested headers, deliver the rest to the downloader
            filter := len(headers) == 1
            if filter {
                // If it's a potential DAO fork check, validate against the rules
                if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
                    // Disable the fork drop timer
                    p.forkDrop.Stop()
                    p.forkDrop = nil
    
                    // Validate the header and either drop the peer or continue
                    if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
                        return err
                    }
                    p.Log().Debug("Verified to be on the same side of the DAO fork")
                    return nil
                }
                // Irrelevant of the fork checks, send the header to the fetcher just in case
                headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
            }
            if len(headers) > 0 || !filter {
                err := pm.downloader.DeliverHeaders(p.id, headers)
                if err != nil {
                    log.Debug("Failed to deliver headers", "err", err)
                }
            }
    

    如果不是硬分叉的daoHeader,同时len(headers) == 1,则执行pm.fetcher.FilterHeaders(p.id, headers, time.Now())方法

    func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
        log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
    
        // Send the filter channel to the fetcher
        filter := make(chan *headerFilterTask)
    
        select {
        case f.headerFilter <- filter:
        case <-f.quit:
            return nil
        }
        // Request the filtering of the header list
        select {
        case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
        case <-f.quit:
            return nil
        }
        // Retrieve the headers remaining after filtering
        select {
        case task := <-filter:
            return task.headers
        case <-f.quit:
            return nil
        }
    }
    

    send 一个filter 到f.headerFilter,fetcher的loop()主回路里f.headerFilter receive 到这个filter,进行处理。

    case filter := <-f.headerFilter:
                // Headers arrived from a remote peer. Extract those that were explicitly
                // requested by the fetcher, and return everything else so it's delivered
                // to other parts of the system.
                var task *headerFilterTask
                select {
                case task = <-filter:
                case <-f.quit:
                    return
                }
                headerFilterInMeter.Mark(int64(len(task.headers)))
    
                // Split the batch of headers into unknown ones (to return to the caller),
                // known incomplete ones (requiring body retrievals) and completed blocks.
                unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
                for _, header := range task.headers {
                    hash := header.Hash()
    
                    // Filter fetcher-requested headers from other synchronisation algorithms
                    if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
                        // If the delivered header does not match the promised number, drop the announcer
                        if header.Number.Uint64() != announce.number {
                            log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
                            f.dropPeer(announce.origin)
                            f.forgetHash(hash)
                            continue
                        }
                        // Only keep if not imported by other means
                        if f.getBlock(hash) == nil {
                            announce.header = header
                            announce.time = task.time
    
                            // If the block is empty (header only), short circuit into the final import queue
                            if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
                                log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
    
                                block := types.NewBlockWithHeader(header)
                                block.ReceivedAt = task.time
    
                                complete = append(complete, block)
                                f.completing[hash] = announce
                                continue
                            }
                            // Otherwise add to the list of blocks needing completion
                            incomplete = append(incomplete, announce)
                        } else {
                            log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
                            f.forgetHash(hash)
                        }
                    } else {
                        // Fetcher doesn't know about it, add to the return list
                        unknown = append(unknown, header)
                    }
                }
                headerFilterOutMeter.Mark(int64(len(unknown)))
                select {
                case filter <- &headerFilterTask{headers: unknown, time: task.time}:
                case <-f.quit:
                    return
                }
                // Schedule the retrieved headers for body completion
                for _, announce := range incomplete {
                    hash := announce.header.Hash()
                    if _, ok := f.completing[hash]; ok {
                        continue
                    }
                    f.fetched[hash] = append(f.fetched[hash], announce)
                    if len(f.fetched) == 1 {
                        f.rescheduleComplete(completeTimer)
                    }
                }
                // Schedule the header-only blocks for import
                for _, block := range complete {
                    if announce := f.completing[block.Hash()]; announce != nil {
                        f.enqueue(announce.origin, block)
                    }
                }
    

    1,遍历headerFilter里面的各个header,如果在 f.fetching状态列表,且不在f.fetched状态列表和 f.completing状态列表,就继续进行过滤,否则塞进unknown队列 发送给filter,FilterHeaders里面task 接收到filter,并作为FilterHeaders的返回值返回。
    2,如果发现这个header的number和从f.fetching状态列表取到的announce的number不一样,说明有可能收到一个伪造的区块通知,此时就要把这个可能的伪造节点和可能的伪造的hash抛弃,另可错杀,不能放过。
    3,如果本节点已经有这个hash的block,则放弃这个hash。如果这个block里面没有任何交易也没有任何叔区块,则把这个hash放入complete列表同时加入f.completing状态列表,否则放入incomplete列表。
    4,在incomplete列表里面,且不在f.completing状态列表里,则加入f.fetched状态列表,启动completeTimer的调度。
    5,在complete列表里面,同时也在f.completing状态列表,则调用f.enqueue(announce.origin, block)方法。

    case <-completeTimer.C:
                // At least one header's timer ran out, retrieve everything
                request := make(map[string][]common.Hash)
    
                for hash, announces := range f.fetched {
                    // Pick a random peer to retrieve from, reset all others
                    announce := announces[rand.Intn(len(announces))]
                    f.forgetHash(hash)
    
                    // If the block still didn't arrive, queue for completion
                    if f.getBlock(hash) == nil {
                        request[announce.origin] = append(request[announce.origin], hash)
                        f.completing[hash] = announce
                    }
                }
                // Send out all block body requests
                for peer, hashes := range request {
                    log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
    
                    // Create a closure of the fetch and schedule in on a new thread
                    if f.completingHook != nil {
                        f.completingHook(hashes)
                    }
                    bodyFetchMeter.Mark(int64(len(hashes)))
                    go f.completing[hashes[0]].fetchBodies(hashes)
                }
                // Schedule the next fetch if blocks are still pending
                f.rescheduleComplete(completeTimer)
    

    1,首先遍历f.fetched,hash对应在fetcher里面的状态都清了。
    如果发现超时了还没有没有获取到这个hash的block,则把这个announce加到request列表中,同时重新把announce放到f.completing状态列表。
    2,然后遍历request列表,request列表里面的每个网络节点过来的所有的block的hash,都会调用fetchBodies(hashes)方法来获取区块body数据。这个fetchBodies(hashes)方法是peer.go里面的一个全局方法。
    3, 这时候BlockHashesMsg 的fetcher处理就结束了,最后再启动completeTimer循环调度。

    四,Fetcher分析, 之FilterBodies() ,Enqueue(),
    1,fetchBodies(hash)方法,调用了peer.go 里面的全局方法RequestBodies(hashes []common.Hash) Send给网络节点一个GetBlockBodiesMsg 消息。
    2,然后pm.handleMsg 会收到 BlockBodiesMsg广播通知。
    3,执行 pm.fetcher.FilterBodies(p.id, trasactions, uncles, time.Now())。
    接下来就和FilterHeaders()流程类似,一顿啪啪啪验证,一顿啪啪啪改变状态,一顿啪啪啪通道跳转
    4,庆幸的是,走完FilterBodies()就完事了,不用在走timer调度,也不用再发网络请求了。
    5,在FilterHeaders()和FilterBodies()最后都走到了f.enqueue(announce.origin, block)方法

    func (f *Fetcher) enqueue(peer string, block *types.Block) {
        hash := block.Hash()
    
        // Ensure the peer isn't DOSing us
        count := f.queues[peer] + 1
        if count > blockLimit {
            log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
            propBroadcastDOSMeter.Mark(1)
            f.forgetHash(hash)
            return
        }
        // Discard any past or too distant blocks
        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
            log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
            propBroadcastDropMeter.Mark(1)
            f.forgetHash(hash)
            return
        }
        // Schedule the block for future importing
        if _, ok := f.queued[hash]; !ok {
            op := &inject{
                origin: peer,
                block:  block,
            }
            f.queues[peer] = count
            f.queued[hash] = op
            f.queue.Push(op, -float32(block.NumberU64()))
            if f.queueChangeHook != nil {
                f.queueChangeHook(op.block.Hash(), true)
            }
            log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
        }
    }
    

    过滤掉太远的区块。并把hash加入到f.queue列表中。
    在loop主回路里面遍历f.queue列表,并把列表中的block insert到本地的block chain中。

    func (f *Fetcher) insert(peer string, block *types.Block) {
        hash := block.Hash()
    
        // Run the import on a new thread
        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
        go func() {
            defer func() { f.done <- hash }()
    
            // If the parent's unknown, abort insertion
            parent := f.getBlock(block.ParentHash())
            if parent == nil {
                log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
                return
            }
            // Quickly validate the header and propagate the block if it passes
            switch err := f.verifyHeader(block.Header()); err {
            case nil:
                // All ok, quickly propagate to our peers
                propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
                go f.broadcastBlock(block, true)
    
            case consensus.ErrFutureBlock:
                // Weird future block, don't fail, but neither propagate
    
            default:
                // Something went very wrong, drop the peer
                log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
                f.dropPeer(peer)
                return
            }
            // Run the actual import and log any issues
            if _, err := f.insertChain(types.Blocks{block}); err != nil {
                log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
                return
            }
            // If import succeeded, broadcast the block
            propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
            go f.broadcastBlock(block, false)
    
            // Invoke the testing hook if needed
            if f.importedHook != nil {
                f.importedHook(block)
            }
        }()
    }
    

    首先调用共识引擎的方法f.verifyHeader(block.Header()),验证blockHeader的有效性。
    如果没问题就广播出去,告诉全世界我的区块链更新了一个新区块。
    然后调用f.insertChain(types.Blocks{block}) 插入本地区块链。
    插入成功,最后再广播一次(这是多么的自恋啊),这次只广播block的hash。

    总结
    fetcher.go 作为以太坊同步区块的一个辅助类,它的职责就是层层把关,层层过滤,抵制无效的区块进入,杜绝无用的同步请求。这块代码很多很乱,第一次看可能会有点晕,第二次看可能还是很晕,多看几次可能还会晕😄,不过只要知道它做什么就好了。

    相关文章

      网友评论

          本文标题:以太坊源码深入分析(6)-- 以太坊P2P协议接收广播的处理和F

          本文链接:https://www.haomeiwen.com/subject/orzylftx.html