美文网首页区块链技术探索
以太坊块同步源码分析

以太坊块同步源码分析

作者: hasika | 来源:发表于2018-10-13 10:45 被阅读16次

    主要代码在ProtocolManager,同步方法以pm的Start方法开始

    • Start
      • pm.minedBroadcastLoop 订阅区块广播
        • pm.BroadcastBlock(ev.Block, true) 将区块信息同步给部分连接的节点
          • peer.AsyncSendNewBlock(block, td)
            • p.queuedProps <- &propEvent{block: block, td: td}: 向管道中插入数据
            • prop := <-p.queuedProps: 管道的另一头在peer.broadcast方法中
            • p.SendNewBlock(prop.block, prop.td),从上面的管道中拿到数据后,会执行这个方法
              • p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) 调用p2p的方法发送数据,传递参数是NewBlockMsg
          • handleMsg p2p的另一端,pm的handleMsg接受消息,消息类型是NewBlockMsg
            • pm.fetcher.Enqueue(p.id, request.Block) 下载区块
              • f.inject <- op: 向管道中插入数据
            • loop方法中的f.inject <- op:管道接受数据 在Fetcher类中
              • f.enqueue(op.origin, op.block) 调用插入队列方法
              • f.queue.Push(op, -float32(block.NumberU64())) 将区块插入队列中,这里其实区块并没有插入到区块链中
            • 在loop方法循环代码块中,还有一段代码,是将f.queue队列中的区块插入到区块链中。
            • f.insert(op.origin, op.block) 将区块插入到区块链中
              • f.done <- hash 方法执行完后,会向done管道中插入数据, 通知区块插入成功
              • f.verifyHeader(block.Header() 验证区块头信息
                • engine.VerifyHeader(blockchain, header, true),调用共识模块的验证方法
              • go f.broadcastBlock(block, true) 区块同步给链接的节点
              • go f.broadcastBlock(block, false) 区块部分信息同步给链接的节点
            • hash := <-f.done f.loop方法中的done管道取到数据,说明该区块已经插入数据库,删除队列缓存
              • f.forgetHash(hash)
              • f.forgetBlock(hash)
        • pm.BroadcastBlock(ev.Block, false) 将区块部分信息同步给所有连接的节点
          • peer.AsyncSendNewBlockHash(block)
            • p.queuedAnns <- block
            • block := <-p.queuedAnns peer.broadcast方法中的管道接收到数据
            • p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()})只是将区块的hash和高度同步给连接的节点
            • p.SendNewBlockHashes 通过p2p网络发送区块部分信息,消息为:NewBlockHashesMsg
          • handleMsg p2p的另一端,pm的handleMsg接受消息,消息类型是NewBlockHashesMsg
          • pm.fetcher.Notify 通知fetcher有新的区块需要同步
          • f.notify <- block 向管道中插入数据
          • notification := <-f.notify ,f.loop方法中的对应管道收到信息
            • f.fetching[notification.hash] 说明该区块正在下载区块头,退出
            • f.completing[notification.hash] 说明该区块正在下载区块体,退出
            • f.announced[notification.hash] = append(f.announced[notification.hash], notification) 将区块插入到announced队列中,准备下载区块头
            • 将其他p2p节点同步过来的数据,插入队列后,什么时候开始同步呢?其实这里定义了定时器
          • <-fetchTimer.C f.loop方法中的定时器触发,用来同步区块头
            • fetchHeader(hash) 下载区块头
            • p2p.Send(p.rw, GetBlockHeadersMsg..)调用p2p方法下载区块头,消息类型:GetBlockHeadersMsg
            • p.SendBlockHeaders(headers),调用p2p方法传递区块头信息,这里会传递最新的几个区块头,p2p节点请求一个区块头,实际上这里会返回多个区块头回去。
            • p2p.Send(p.rw, BlockHeadersMsg, headers)
          • BlockHeadersMsg p2p的另一端
            • headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) 当返回的区块头只有一个时
              • f.headerFilter <- filter 向管道插入数据
                • f.fetched[hash] = append(f.fetched[hash], announce) 放入已经同步区块头的列表中,准备同步区块体
                • f.enqueue(announce.origin, block) 如果该区块只有区块头,就不用在同步区块体了,直接插入区块链中
          • completeTimer.C f.loop方法中的定时器,用来同步区块体
            • go f.completing[hashes[0]].fetchBodies(hashes) 调用peer的fetchBodies下载区块
            • p2p.Send(p.rw, GetBlockBodiesMsg, hashes)调用p2p方法真正下载区块,消息类型:GetBlockBodiesMsg
            • p.SendBlockBodiesRLP(bodies) 通过p2p网络将区块数据发送到请求节点
            • p2p.Send(p.rw, BlockBodiesMsg, bodies) 发送的消息类型:BlockBodiesMsg
            • BlockBodiesMsg handleMsg接收到区块数据后
            • pm.fetcher.FilterBodies 调用fetcher的方法校验区块数据
            • f.bodyFilter <- filter: 将数据插入到管道中
              • block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]) 根据p2p传过来的数据,生成区块
              • f.enqueue(announce.origin, block) 将数据信息插入到区块链中
    func (pm *ProtocolManager) Start(maxPeers int) {
       pm.maxPeers = maxPeers
    
       // broadcast transactions
       pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
       pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
       //交易广播
       go pm.txBroadcastLoop()
    
       // broadcast mined blocks
       //挖矿广播
       pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
       go pm.minedBroadcastLoop()
    
       // start sync handlers
       go pm.syncer()
       go pm.txsyncLoop()
    }
    
    // Mined broadcast loop
    func (pm *ProtocolManager) minedBroadcastLoop() {
       // automatically stops if unsubscribe
       for obj := range pm.minedBlockSub.Chan() {
          if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
             pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
             pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
          }
       }
    }
    
    func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
       hash := block.Hash()
       peers := pm.peers.PeersWithoutBlock(hash)
    
       // If propagation is requested, send to a subset of the peer
       //参数propagate为true时
       if propagate {
             td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
    
          // Send the block to a subset of our peers
          //将区块同步给一部分连接节点
          transfer := peers[:int(math.Sqrt(float64(len(peers))))]
          for _, peer := range transfer {
             peer.AsyncSendNewBlock(block, td)
          }
         
          return
       }
       // Otherwise if the block is indeed in out own chain, announce it
       // 参数propagate为false时
       if pm.blockchain.HasBlock(hash, block.NumberU64()) {
          //将区块一部分信息同步给所有的连接节点
          for _, peer := range peers {
             peer.AsyncSendNewBlockHash(block)
          }
       }
    }
    
    // AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
    // the peer's broadcast queue is full, the event is silently dropped.
    func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
       select {
       //向管道中插入数据
       case p.queuedProps <- &propEvent{block: block, td: td}:
          p.knownBlocks.Add(block.Hash())
       default:
          p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
       }
    }
    
    // broadcast is a write loop that multiplexes block propagations, announcements
    // and transaction broadcasts into the remote peer. The goal is to have an async
    // writer that does not lock up node internals.
    // 发现节点后,会注册节点,注册节点的时候,会调用这个方法,这个方法有一些管道
    func (p *peer) broadcast() {
       for {
          select {
           //处理广播交易的管道
          case txs := <-p.queuedTxs:
             if err := p.SendTransactions(txs); err != nil {
                return
             }
          //处理广播区块的管道
          case prop := <-p.queuedProps:
             if err := p.SendNewBlock(prop.block, prop.td); err != nil {
                return
             }
    
          case block := <-p.queuedAnns:
             if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
                return
             }
       }
    }
    
    func (pm *ProtocolManager) handleMsg(p *peer) error {
        case msg.Code == NewBlockMsg:
            ...
            // 下载区块
            pm.fetcher.Enqueue(p.id, request.Block)
            //如果同步过来的难度值大于本地区块链中的难度值,同步区块链
                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
                    go pm.synchronise(p)
                }
            }
        //同步区块(hash和高度)
        case msg.Code == NewBlockHashesMsg:
            var announces newBlockHashesData
            if err := msg.Decode(&announces); err != nil {
                return errResp(ErrDecode, "%v: %v", msg, err)
            }
    
            // Schedule all the unknown hashes for retrieval
            unknown := make(newBlockHashesData, 0, len(announces))
            for _, block := range announces {
                if !pm.blockchain.HasBlock(block.Hash, block.Number) {
                    unknown = append(unknown, block)
                }
            }
            for _, block := range unknown {
                //通知fetcher有新的区块需要同步
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
            }
    
        // Block header query, collect the requested headers and reply
        // 同步区块头,并且同步多个
        case msg.Code == GetBlockHeadersMsg:
    
            // Gather headers until the fetch or network limits is reached
            var (
                bytes   common.StorageSize
                headers []*types.Header
                unknown bool
            )
            //以p2p网络传过来的头信息,查找区块链上的最新的区块信息,一次同步过去
            // p2p传递过来的高度是10000,那么如果当前节点有10001,10002,这些节点也会传送过去
            for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
                // Retrieve the next header satisfying the query
                var origin *types.Header
    
                origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
    
                headers = append(headers, origin)
                bytes += estHeaderRlpSize
    
                // Advance to the next header of the query
                switch {
            
                case hashMode && !query.Reverse:
                    // Hash based traversal towards the leaf block
                    var (
                        current = origin.Number.Uint64()
                        next    = current + query.Skip + 1
                    )
                    if next <= current {
                        infos, _ := json.MarshalIndent(p.Peer.Info(), "", "  ")
                        p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
                        unknown = true
                    } else {
                        if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
                            nextHash := header.Hash()
                            expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
                            if expOldHash == query.Origin.Hash {
                                query.Origin.Hash, query.Origin.Number = nextHash, next
                            } else {
                                unknown = true
                            }
                        } else {
                            unknown = true
                        }
                    }
            }
            return p.SendBlockHeaders(headers)
                
          //拿到从其他节点返回的区块头信息
          case msg.Code == BlockHeadersMsg:
            // A batch of headers arrived to one of our previous requests
            var headers []*types.Header
            if err := msg.Decode(&headers); err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            
            // Filter out any explicitly requested headers, deliver the rest to the downloader
            filter := len(headers) == 1
            if filter {
                //如果只有一个区块头信息返回
                headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
            }
            if len(headers) > 0 || !filter {
                //如果有多个区块头信息返回
                err := pm.downloader.DeliverHeaders(p.id, headers)
            }
                
          //获取区块体请求      
          case msg.Code == GetBlockBodiesMsg:
            // Decode the retrieval message
            msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
            if _, err := msgStream.List(); err != nil {
                return err
            }
            // Gather blocks until the fetch or network limits is reached
            var (
                hash   common.Hash
                bytes  int
                bodies []rlp.RawValue
            )
            for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
                // Retrieve the hash of the next block
                if err := msgStream.Decode(&hash); err == rlp.EOL {
                    break
                } else if err != nil {
                    return errResp(ErrDecode, "msg %v: %v", msg, err)
                }
                // Retrieve the requested block body, stopping if enough was found
                if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
                    //将bodies信息放在数组中
                    bodies = append(bodies, data)
                    bytes += len(data)
                }
            }
            // 通过p2p网络发送区块数据
            return p.SendBlockBodiesRLP(bodies)
                
         //节点接收到区块数据   
         case msg.Code == BlockBodiesMsg:
            //调用方法校验区块体,
            transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, 
            
    }
    
    // Enqueue tries to fill gaps the fetcher's future import queue.
    //p2p网络接收到NewBlockMsg消息时,调用这个方法
    func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
       op := &inject{
          origin: peer,
          block:  block,
       }
       select {
           //向管道中插入数据
       case f.inject <- op:
          return nil
       case <-f.quit:
          return errTerminated
       }
    }
    
    func (f *Fetcher) loop() {
        for{
            //区块链目前的高度
            height := f.chainHeight()
            for !f.queue.Empty() {
                op := f.queue.PopItem().(*inject)
                // If too high up the chain or phase, continue later
                number := op.block.NumberU64()
                // 如果排队的区块高度,大于区块链高度+1,说明暂时还不需要插入到区块中,等待合适的区块插入后,再把这个区块插入
                if number > height+1 {
                    f.queue.Push(op, -float32(number))
                    if f.queueChangeHook != nil {
                        f.queueChangeHook(hash, true)
                    }
                    break
                }
                f.insert(op.origin, op.block)
            
                // Wait for an outside event to occur
                select {
                    //从管道中取到p2p发送过来的区块
                    case op := <-f.inject:
                    // A direct block insertion was requested, try and fill any pending gaps
                    propBroadcastInMeter.Mark(1)
                    f.enqueue(op.origin, op.block)
                    
                    case hash := <-f.done:
                    // A pending import finished, remove all traces of the notification
                    f.forgetHash(hash)
                    f.forgetBlock(hash)
                    
                    // 得到新块消息时
            case notification := <-f.notify:
                // All is well, schedule the announce if block's not yet downloading
                //正在下载头
                if _, ok := f.fetching[notification.hash]; ok {
                    break
                }
    
                //区块头已经下载,正在下载区块体
                if _, ok := f.completing[notification.hash]; ok {
                    break
                }
    
                //每个节点请求的数量
                f.announces[notification.origin] = count
    
                //准备下载头
                f.announced[notification.hash] = append(f.announced[notification.hash], notification)
                    
            case <-fetchTimer.C:
                // At least one block's timer ran out, check for needing retrieval
                request := make(map[string][]common.Hash)
    
                for hash, announces := range f.announced {
                        if f.getBlock(hash) == nil {
                            //将没有本地没有的区块,插入到request队列中
                            request[announce.origin] = append(request[announce.origin], hash)                   
                            //在正在下载的队列中保存该announce
                            f.fetching[hash] = announce
                        }
                
                }
                // Send out all block header requests
                for peer, hashes := range request {
    
                    // Create a closure of the fetch and schedule in on a new thread
                    fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
                    go func() {
                        for _, hash := range hashes {
                            //下载区块头
                            fetchHeader(hash) // Suboptimal, but protocol doesn't allow 
                        }
            }
        }
                    
         // 从p2p节点拿到区块头信息            
         case filter := <-f.headerFilter:
    
                unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
                for _, header := range task.headers {
                    hash := header.Hash()
    
                            // 该头信息对应的区块是空块
                            if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
    
                                block := types.NewBlockWithHeader(header)
                                block.ReceivedAt = task.time
                                //标记已经完成同步
                                complete = append(complete, block)
                                f.completing[hash] = announce
                                continue
                            }
                            // Otherwise add to the list of blocks needing completion
                            // 还没有完成同步,需要同步区块体
                            incomplete = append(incomplete, announce)
                        } 
                }
                // Schedule the retrieved headers for body completion
                for _, announce := range incomplete {
                    hash := announce.header.Hash()
                    // 已经在同步区块体了,返回
                    if _, ok := f.completing[hash]; ok {
                        continue
                    }
                    //已经同步了区块头,准备同步区块体
                    f.fetched[hash] = append(f.fetched[hash], announce)
                    if len(f.fetched) == 1 {
                        f.rescheduleComplete(completeTimer)
                    }
                }
                // Schedule the header-only blocks for import
                // 将只有区块头的区块,插入到区块链中
                for _, block := range complete {
                    if announce := f.completing[block.Hash()]; announce != nil {
                        f.enqueue(announce.origin, block)
                    }
                }
                //同步区块体的定时器
                case <-completeTimer.C:
                // At least one header's timer ran out, retrieve everything
                request := make(map[string][]common.Hash)
    
                for hash, announces := range f.fetched {
                    // Pick a random peer to retrieve from, reset all others
                    announce := announces[rand.Intn(len(announces))]
                    f.forgetHash(hash)
    
                    // If the block still didn't arrive, queue for completion
                    if f.getBlock(hash) == nil {
                        //将需要下载的区块信息存入数组中
                        request[announce.origin] = append(request[announce.origin], hash)
                        f.completing[hash] = announce
                    }
                }
                // Send out all block body requests
                for peer, hashes := range request {
                    log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
    
                    //调用peer的下载方法,下载区块
                    go f.completing[hashes[0]].fetchBodies(hashes)
                }
                
         // filter从管道中拿到数据
         case filter := <-f.bodyFilter:
                var task *bodyFilterTask
                
                blocks := []*types.Block{}
                for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
                    // Match up a body to any possible completion request
                    matched := false
                // 根据p2p网络传过来的数据,生成区块
                block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
    
                }
                // 将区块插入区块链中
                for _, block := range blocks {
                    if announce := f.completing[block.Hash()]; announce != nil {
                        f.enqueue(announce.origin, block)
                    }
                }
            }
    
    }
    
    func (f *Fetcher) enqueue(peer string, block *types.Block) {
       hash := block.Hash()
    
       // Ensure the peer isn't DOSing us
       count := f.queues[peer] + 1
    
       if _, ok := f.queued[hash]; !ok {
          op := &inject{
             origin: peer,
             block:  block,
          }
          f.queues[peer] = count
          f.queued[hash] = op
           //将区块加入到队列中
          f.queue.Push(op, -float32(block.NumberU64()))
       }
    }
    
    //将f.queue队列中的区块插入到区块链中
    func (f *Fetcher) insert(peer string, block *types.Block) {
       hash := block.Hash()
    
       go func() {
           //方法执行完后,会向f.done管道插入数据
          defer func() { f.done <- hash }()
    
          // Quickly validate the header and propagate the block if it passes
           //校验区块头是否合法
          switch err := f.verifyHeader(block.Header()); err {
          //如果没有错误,说明是区块头是合法的
          case nil: 
             // 向周围节点广播区块
             go f.broadcastBlock(block, true)
          }
          // Run the actual import and log any issues
          // 真正将区块插入到区块链中
          if _, err := f.insertChain(types.Blocks{block}); err != nil {
             return
          }
            // 广播区块部分信心到其他节点
          go f.broadcastBlock(block, false)
       }()
    }
    
    //同步区块部分信息
    func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
       select {
       //向管道中插入数据
       case p.queuedAnns <- block:
          p.knownBlocks.Add(block.Hash())
       default:
          p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
       }
    }
    
    // SendNewBlockHashes announces the availability of a number of blocks through
    // a hash notification.
    func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
       for _, hash := range hashes {
          p.knownBlocks.Add(hash)
       }
       request := make(newBlockHashesData, len(hashes))
       for i := 0; i < len(hashes); i++ {
          request[i].Hash = hashes[i]
          request[i].Number = numbers[i]
       }
        //通过p2p网络发送区块部分信息,消息为:NewBlockHashesMsg
       return p2p.Send(p.rw, NewBlockHashesMsg, request)
    }
    
    // Notify announces the fetcher of the potential availability of a new block in
    // the network.
    func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
       headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
       block := &announce{
          hash:        hash,
          number:      number,
          time:        time,
          origin:      peer,
          fetchHeader: headerFetcher,
          fetchBodies: bodyFetcher,
       }
       select {
           //向管道中插入数据
       case f.notify <- block:
          return nil
       case <-f.quit:
          return errTerminated
       }
    }
    
    func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
       log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
    
       // Send the filter channel to the fetcher
       filter := make(chan *headerFilterTask)
    
       select {
           //向f.headerFilter管道插入数据
       case f.headerFilter <- filter:
       case <-f.quit:
          return nil
       }
       // Request the filtering of the header list
       select {
       case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
    
    }
    
    func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
       select {
           //向管道中写入数据
       case destCh <- packet:
          return nil
       case <-cancel:
          return errNoSyncActive
       }
    }
    
    func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
    
    
       // Send the filter channel to the fetcher
       filter := make(chan *bodyFilterTask)
    
       select {
        //将数据插入到f.bodyFilter管道中
       case f.bodyFilter <- filter:
       case <-f.quit:
          return nil, nil
       }
       // Request the filtering of the body list
       select {
       case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
       case <-f.quit:
          return nil, nil
      
       }
    }
    

    https://t.zsxq.com/iiMvfea

    我的星球.jpg

    相关文章

      网友评论

        本文标题:以太坊块同步源码分析

        本文链接:https://www.haomeiwen.com/subject/sdqoaftx.html