Spectrum: Notes on Troubleshooting and Fixing Occasional Block-Sync Stalls


Author: cc14514 | Posted 2018-11-28 16:57

    Tracking down the sync failure

    Code: https://github.com/SmartMeshFoundation/Spectrum
    This fix will land on the dev branch and ship with version 0.5.2.

    • Problem description:

      When a node is a genesis node that has not yet joined the signer rotation, it sits in a waiting-for-nomination state and can never be nominated, because the consensus contract explicitly rejects nominations for genesis nodes.
      In this state the node syncs new blocks up to a certain height and then stops growing; in this observation it stalled at block 1222972.

    • Clue logs:

    DEBUG[11-21|19:21:46|core/blockchain.go:1009]            Inserted new block                       number=1222972 hash=7af7f5…985c9e uncles=0   txs=1   gas=0     elapsed=11.820ms
    DEBUG[11-21|19:21:52|eth/downloader/downloader.go:1562]  Recalculated downloader QoS values       rtt=11.836220508s confidence=1.000 ttl=35.508697032s
    DEBUG[11-21|19:22:00|eth/handler.go:642]                 <<NewBlockMsg>>                          trueTD=3077093 p.td=3077078 p.id=f6cb0c9800fb01fb11cb313848091596ba4fd167e1c7873e0520ebdd59ceb454cf5a16d7f78ff7aaa91f117ad6694bca4de63d3150cb1b48813d75d4b98e2deb
    DEBUG[11-21|19:22:00|eth/fetcher/fetcher.go:607]         Discarded propagated block, exceeded allowance peer=f6cb0c9800fb01fb      number=1222973 hash=33b369…26e683 limit=64
    DEBUG[11-21|19:22:00|eth/peer.go:193]                    Fetching single header                   id=4b36359d6b54ab46 conn=dyndial hash=33b369…26e683
    DEBUG[11-21|19:22:00|eth/peer.go:214]                    Fetching batch of block bodies           id=4b36359d6b54ab46 conn=dyndial count=1
    DEBUG[11-21|19:22:00|eth/fetcher/fetcher.go:607]         Discarded propagated block, exceeded allowance peer=4b36359d6b54ab46      number=1222973 hash=33b369…26e683 limit=64
    DEBUG[11-21|19:22:03|eth/downloader/downloader.go:1562]  Recalculated downloader QoS values       rtt=11.836220508s confidence=1.000 ttl=35.508697032s
    DEBUG[11-21|19:22:14|eth/handler.go:642]                 <<NewBlockMsg>>                          trueTD=3077096 p.td=3077081 p.id=6549749a9e83b4bd89e1469d51986cc1689094b6621daa651d3e76dc9720659008cad99e949d274b6c26e87241964775e22a01e167b79b85dd454fd160b46fac
    DEBUG[11-21|19:22:14|eth/fetcher/fetcher.go:631]         Queued propagated block                  peer=6549749a9e83b4bd      number=1222974 hash=670fb2…f0af05 queued=1
    

    Block sync logic:

    The logs keep showing "NewBlockMsg" entries, so the problem is not in the network layer.
    It is also unrelated to block downloading; all of that logic appears to be working normally.

    • Entry method:
      func (pm *ProtocolManager) synchronise(peer *peer)

    It is triggered from three places (a sketch of the last two follows the list):
    1. a NewBlockMsg message
    2. a new peer connection
    3. a timer
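
    For context, the new-peer and timer triggers live in the protocol manager's syncer loop. The sketch below follows upstream go-ethereum's eth/sync.go, which Spectrum forks, so names such as forceSyncCycle and minDesiredPeerCount are taken from upstream and may differ slightly here:

    func (pm *ProtocolManager) syncer() {
        // Start and ensure cleanup of the sync mechanisms
        pm.fetcher.Start()
        defer pm.fetcher.Stop()
        defer pm.downloader.Terminate()

        // Wait for different events to fire synchronisation operations
        forceSync := time.NewTicker(forceSyncCycle)
        defer forceSync.Stop()

        for {
            select {
            case <-pm.newPeerCh:
                // Trigger 2: a new peer connected; sync once enough peers are present
                if pm.peers.Len() < minDesiredPeerCount {
                    break
                }
                go pm.synchronise(pm.peers.BestPeer())

            case <-forceSync.C:
                // Trigger 3: the periodic timer forces a sync with the best peer
                go pm.synchronise(pm.peers.BestPeer())

            case <-pm.noMorePeers:
                return
            }
        }
    }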

    In eth/handler.go, func (pm *ProtocolManager) handleMsg(p *peer) error handles every incoming message, including NewBlockMsg:

        case msg.Code == NewBlockMsg:
            // Retrieve and decode the propagated block
            var request newBlockData
            if err := msg.Decode(&request); err != nil {
                return errResp(ErrDecode, "%v: %v", msg, err)
            }
            request.Block.ReceivedAt = msg.ReceivedAt
            request.Block.ReceivedFrom = p
    
            // Mark the peer as owning the block and schedule it for import
            p.MarkBlock(request.Block.Hash())
            pm.fetcher.Enqueue(p.id, request.Block)
    
            // Assuming the block is importable by the peer, but possibly not yet done so,
            // calculate the head hash and TD that the peer truly must have.
            var (
                trueHead = request.Block.ParentHash()
                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
            )
            _, tttt := p.Head()
    
            currentBlock := pm.blockchain.CurrentBlock()
            peerpub, _ := p.ID().Pubkey()
            peeraddr := crypto.PubkeyToAddress(*peerpub)
            log.Debug("<<NewBlockMsg>>",
                "currentBlock", currentBlock,
                "recvBlock", request.Block.Number(),
                "currentTD", pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()),
                "trueTD", trueTD,
                "p.td", tttt,
                "p.id", peeraddr.Hex(),
            )
            // Update the peers total difficulty if better than the previous
            if _, td := p.Head(); trueTD.Cmp(td) > 0 {
                p.SetHead(trueHead, trueTD)
    
                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
            // a single block (as the true TD is below the propagated block), however this
                // scenario should easily be covered by the fetcher.
                //currentBlock := pm.blockchain.CurrentBlock()
                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
                    go pm.synchronise(p)
                }
            }
    

    What exactly does pm.fetcher.Enqueue(p.id, request.Block) do?

    // Enqueue tries to fill gaps in the fetcher's future import queue.
    func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
        op := &inject{
            origin: peer,
            block:  block,
        }
        select {
        case f.inject <- op:
            return nil
        case <-f.quit:
            return errTerminated
        }
    }
    

    It puts the request into fetcher.inject, which loop() then consumes, as in the following fragment:

            case op := <-f.inject:
                // A direct block insertion was requested, try and fill any pending gaps
                propBroadcastInMeter.Mark(1)
                f.enqueue(op.origin, op.block)
    

    f.enqueue contains the important rules:

    func (f *Fetcher) enqueue(peer string, block *types.Block) {
        hash := block.Hash()
    
        // Ensure the peer isn't DOSing us
        count := f.queues[peer] + 1
        // if this peer already has more than blockLimit (64) blocks queued for import, drop this one
        if count > blockLimit {
            log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
            propBroadcastDOSMeter.Mark(1)
            f.forgetHash(hash)
            return
        }
        // Discard any past or too distant blocks
        // discard blocks more than maxUncleDist (7) behind or maxQueueDist (32) ahead of our current head
        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
            log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
            propBroadcastDropMeter.Mark(1)
            f.forgetHash(hash)
            return
        }
        // Schedule the block for future importing
        if _, ok := f.queued[hash]; !ok {
            op := &inject{
                origin: peer,
                block:  block,
            }
            f.queues[peer] = count
            f.queued[hash] = op
            f.queue.Push(op, -float32(block.NumberU64()))
            if f.queueChangeHook != nil {
                f.queueChangeHook(op.block.Hash(), true)
            }
            log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
        }
    }
    

    The block is mainly pushed into f.queue, which loop() then drains in a cycle. My suspicion was that the problem is inside func (f *Fetcher) loop(), in the queue-handling logic at its beginning (sketched below).
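
    For reference, that queue-draining logic at the top of loop() looks roughly like this in upstream go-ethereum's eth/fetcher/fetcher.go, from which Spectrum is derived (simplified; the fork may differ in detail):

        // top of func (f *Fetcher) loop(): import any queued blocks that fit the current height
        height := f.chainHeight()
        for !f.queue.Empty() {
            op := f.queue.PopItem().(*inject)

            // Too far ahead of our head: push it back and retry on a later iteration
            number := op.block.NumberU64()
            if number > height+1 {
                f.queue.Push(op, -float32(number))
                break
            }
            // Stale or already known: forget it
            hash := op.block.Hash()
            if number+maxUncleDist < height || f.getBlock(hash) != nil {
                f.forgetBlock(hash)
                continue
            }
            // Otherwise hand it to f.insert, which spawns a goroutine that calls f.insertChain
            f.insert(op.origin, op.block)
        }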

    The problem shows up when the condition if count > blockLimit in enqueue above evaluates to true. Why would that condition ever hold?


    Adding more log output for analysis showed that insertChain was not returning correctly; something inside it blocks.

    // insert spawns a new goroutine to run a block insertion into the chain. If the
    // block's number is at the same height as the current import phase, it updates
    // the phase states accordingly.
    func (f *Fetcher) insert(peer string, block *types.Block) {
    ......
            log.Debug("insert_begin","number",block.Number())
            // execution never returns from this call
            if _, err := f.insertChain(types.Blocks{block}); err != nil {
                log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
                return
            }
            log.Debug("insert_end","number",block.Number())
    ......
    }
    
    
    Watch for cases where only insert_begin appears without insert_end and insert_done, then grep the log for the Inserted keyword:
    DEBUG[11-26|17:35:06|eth/fetcher/fetcher.go:672]         insert_begin                             number=1253372
    DEBUG[11-26|17:35:06|eth/fetcher/fetcher.go:674]         insert_end                               number=1253372 err=nil
    DEBUG[11-26|17:35:06|eth/fetcher/fetcher.go:645]         insert_done                              number=1253372
    
    

    Final confirmation: the blocking happens in the event broadcast.

    //core/blockchain.go
    func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
        n, events, logs, err := bc.insertChain(chain)
        // this call blocks
        bc.PostChainEvents(events, logs)
        log.Debug("Inserted block end", "number", chain[0].Number())
        return n, err
    }
    
    //
    // PostChainEvents iterates over the events generated by a chain insertion and
    // posts them into the event feed.
    // TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
    func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
        // post event logs for further processing
        if logs != nil {
            bc.logsFeed.Send(logs)
        }
        for _, event := range events {
            switch ev := event.(type) {
            case ChainEvent:
                bc.chainFeed.Send(ev)
    
            case ChainHeadEvent:
                // the blocking above is caused by the blocking here
                bc.chainHeadFeed.Send(ev)
    
            case ChainSideEvent:
                bc.chainSideFeed.Send(ev)
            }
        }
    }
    

    So the blocking happens while publishing the ChainHead event. The first step is to find everything that subscribes to this feed: there could be lock contention, or an unfinished call holding the send up. Note that Feed.Send only returns once every subscriber channel has accepted the event, so a single stuck subscriber is enough to stall all chain-event delivery. What made it odd is that the node ran fine for a while before it started blocking.
    Going through the subscribers of chainHeadFeed finally pinned it down: in the worker's update method, receiving a ChainHeadEvent triggers commitNewWork(). The first call succeeds; the second one fails and blocks. The subscription channel has 10 slots, so after 10 events the event broadcast module is completely blocked, and once 64 blocks from some peer have piled up behind the stuck event send, that peer is judged to be DoS-ing us. That produces the symptom described above: a block can occasionally be dropped, leaving the node out of sync. (A sketch of the subscription path follows.)
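
    The subscription path, roughly as it looks in upstream go-ethereum's miner/worker.go (Spectrum's fork may differ slightly): the worker receives ChainHeadEvents on a channel with a 10-slot buffer and calls commitNewWork() for each one, so a single commitNewWork() that never returns lets the buffer fill and then stalls chainHeadFeed.Send for everybody.

    // miner/worker.go (upstream go-ethereum, simplified sketch)
    const chainHeadChanSize = 10 // buffer of the ChainHeadEvent channel

    // in the worker constructor: subscribe with the 10-slot buffered channel
    worker.chainHeadCh = make(chan core.ChainHeadEvent, chainHeadChanSize)
    worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)

    // the update loop is the only consumer of that channel
    func (self *worker) update() {
        defer self.chainHeadSub.Unsubscribe()
        for {
            select {
            case <-self.chainHeadCh:
                // if this call blocks, chainHeadCh stops draining; after 10 buffered
                // events, chainHeadFeed.Send in PostChainEvents blocks as well
                self.commitNewWork()
            case <-self.chainHeadSub.Err():
                return
            }
        }
    }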

    • The problem can be fixed with the following logic adjustment:
      Debugging worker.commitNewWork() showed that it blocks on self.push(work).
    // push sends a new work task to currently live miner agents.
    func (self *worker) push(work *Work) {
        // Before the miner has actually started, execution should take this branch. But because miner.start
        // was made asynchronous earlier, this flag was mistakenly set to started before the agents were
        // running, so the for loop below ends up blocking.
        if atomic.LoadInt32(&self.mining) != 1 {
            return
        }
        for agent := range self.agents {
            atomic.AddInt32(&self.atWork, 1)
            if ch := agent.Work(); ch != nil {
                ch <- work
            }
        }
    }
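
    Why does the first commitNewWork() succeed while the second one blocks? In upstream go-ethereum the CpuAgent's work channel is created with a one-slot buffer and is only drained once agent.Start() has launched its update loop; assuming Spectrum's agents behave the same way (an assumption, not verified here), the first push fills that buffer and the second blocks forever. A minimal standalone illustration of that mechanic:

    package main

    import "fmt"

    type work struct{ number int }

    func main() {
        // workCh models an agent's work channel: a one-slot buffer whose draining
        // goroutine (normally started by agent.Start) was never launched.
        workCh := make(chan *work, 1)

        workCh <- &work{number: 1} // first push: fills the buffer and returns at once
        fmt.Println("first push ok")

        select {
        case workCh <- &work{number: 2}: // second push: nobody is draining, so this would block forever
            fmt.Println("second push ok")
        default:
            fmt.Println("second push would block: the buffer is full and no consumer is running")
        }
    }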
    
    // The fix is simple, even though tracking it down was not: just move the assignment of self.mining further down.
    func (self *worker) start(s chan int) {
        self.mu.Lock()
        defer self.mu.Unlock()
        // moved further down (see below)
        //atomic.StoreInt32(&self.mining, 1)
    
        //add by liangc : sync mining status
        wg := new(sync.WaitGroup)
        if tribe, ok := self.engine.(*tribe.Tribe); ok {
            wg.Add(1)
            go func() {
                defer wg.Done()
                if self.chain.CurrentHeader().Number.Int64() > 1 { // free for genesis signer
                    log.Info("⚠️ Everything is ready, Waiting for nomination, pending until miner level upgrade")
                    // pending until miner level upgrade
                    tribe.WaitingNomination()
                }
                tribe.SetMining(1, self.chain.CurrentBlock().Number(), self.chain.CurrentHeader().Hash())
            }()
        }
    
        go func() {
            defer func() { s <- 1 }()
            wg.Wait()
            // moved down from above >>>>
            atomic.StoreInt32(&self.mining, 1)  
            // moved down from above <<<<
            // spin up agents
            for agent := range self.agents {
                agent.Start()
            }
        }()
    }
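
    The point of the reordering: self.mining only flips to 1 after WaitingNomination() has returned, immediately before the agents are spun up. A genesis node that is never nominated therefore keeps mining == 0, push() keeps taking its early-return branch, commitNewWork() never blocks, and chainHeadFeed keeps flowing, which removes the sync stall described above.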
    
