美文网首页区块链教程
兄弟连区块链教程以太坊源码分析chain-indexer区块链索

兄弟连区块链教程以太坊源码分析chain-indexer区块链索

作者: ab6973df9221 | 来源:发表于2018-10-19 10:34 被阅读1次

      兄弟连区块链教程以太坊源码分析chain-indexer区块链索引二。

    Start方法。 这个方法在eth协议启动的时候被调用,这个方法接收两个参数,一个是当前的区块头,一个是事件订阅器,通过这个订阅器可以获取区块链的改变信息。

        eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent)

        // Start creates a goroutine to feed chain head events into the indexer for

        // cascading background processing. Children do not need to be started, they

        // are notified about new events by their parents.

    // 子链不需要被启动。 以为他们的父节点会通知他们。

        func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {

            go c.eventLoop(currentHeader, chainEventer)

        }

        // eventLoop is a secondary - optional - event loop of the indexer which is only

        // started for the outermost indexer to push chain head events into a processing

        // queue.

    // eventLoop 循环只会在最外面的索引节点被调用。 所有的Child indexer不会被启动这个方法。

        func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {

            // Mark the chain indexer as active, requiring an additional teardown

            atomic.StoreUint32(&c.active, 1)

            events := make(chan ChainEvent, 10)

            sub := chainEventer(events)

            defer sub.Unsubscribe()

            // Fire the initial new head event to start any outstanding processing

    // 设置我们的其实的区块高度,用来触发之前未完成的操作。

            c.newHead(currentHeader.Number.Uint64(), false)

            var (

                prevHeader = currentHeader

                prevHash = currentHeader.Hash()

            )

            for {

                select {

                case errc := <-c.quit:

                    // Chain indexer terminating, report no failure and abort

                    errc <- nil

                    return

                case ev, ok := <-events:

                    // Received a new event, ensure it's not nil (closing) and update

                    if !ok {

                        errc := <-c.quit

                        errc <- nil

                        return

                    }

                    header := ev.Block.Header()

    if header.ParentHash != prevHash { //如果出现了分叉,那么我们首先

    //找到公共祖先, 从公共祖先之后的索引需要重建。

                        c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)

                    }

    // 设置新的head

                    c.newHead(header.Number.Uint64(), false)

                    prevHeader, prevHash = header, header.Hash()

                }

            }

        }

    newHead方法,通知indexer新的区块链头,或者是需要重建索引,newHead方法会触发

        // newHead notifies the indexer about new chain heads and/or reorgs.

        func (c *ChainIndexer) newHead(head uint64, reorg bool) {

            c.lock.Lock()

            defer c.lock.Unlock()

            // If a reorg happened, invalidate all sections until that point

    if reorg { // 需要重建索引 从head开始的所有section都需要重建。

                // Revert the known section number to the reorg point

                changed := head / c.sectionSize

                if changed < c.knownSections {

                    c.knownSections = changed

                }

                // Revert the stored sections from the database to the reorg point

    // 将存储的部分从数据库恢复到索引重建点

                if changed < c.storedSections {

                    c.setValidSections(changed)

                }

                // Update the new head number to te finalized section end and notify children

    // 生成新的head 并通知所有的子索引

                head = changed * c.sectionSize

                if head < c.cascadedHead {

                    c.cascadedHead = head

                    for _, child := range c.children {

                        child.newHead(c.cascadedHead, true)

                    }

                }

                return

            }

            // No reorg, calculate the number of newly known sections and update if high enough

            var sections uint64

            if head >= c.confirmsReq {

                sections = (head + 1 - c.confirmsReq) / c.sectionSize

                if sections > c.knownSections {

                    c.knownSections = sections

                    select {

                    case c.update <- struct{}{}:

                    default:

                    }

                }

            }

        }

    父子索引数据的关系

    父Indexer负载事件的监听然后把结果通过newHead传递给子Indexer的updateLoop来处理。

    ![image](picture/chainindexer_1.png)

    setValidSections方法,写入当前已经存储的sections的数量。 如果传入的值小于已经存储的数量,那么从数据库里面删除对应的section

        // setValidSections writes the number of valid sections to the index database

        func (c *ChainIndexer) setValidSections(sections uint64) {

            // Set the current number of valid sections in the database

            var data [8]byte

            binary.BigEndian.PutUint64(data[:], sections)

            c.indexDb.Put([]byte("count"), data[:])

            // Remove any reorged sections, caching the valids in the mean time

            for c.storedSections > sections {

                c.storedSections--

                c.removeSectionHead(c.storedSections)

            }

            c.storedSections = sections // needed if new > old

        }

    processSection

        // processSection processes an entire section by calling backend functions while

        // ensuring the continuity of the passed headers. Since the chain mutex is not

        // held while processing, the continuity can be broken by a long reorg, in which

        // case the function returns with an error.

    //processSection通过调用后端函数来处理整个部分,同时确保传递的头文件的连续性。 由于链接互斥锁在处理过程中没有保持,连续性可能会被重新打断,在这种情况下,函数返回一个错误。

        func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {

            c.log.Trace("Processing new chain section", "section", section)

            // Reset and partial processing

            c.backend.Reset(section)

            for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {

                hash := GetCanonicalHash(c.chainDb, number)

                if hash == (common.Hash{}) {

                    return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)

                }

                header := GetHeader(c.chainDb, hash, number)

                if header == nil {

    return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4])

                } else if header.ParentHash != lastHead {

                    return common.Hash{}, fmt.Errorf("chain reorged during section processing")

                }

                c.backend.Process(header)

                lastHead = header.Hash()

            }

            if err := c.backend.Commit(); err != nil {

                c.log.Error("Section commit failed", "error", err)

                return common.Hash{}, err

            }

            return lastHead, nil

        }

    相关文章

      网友评论

        本文标题:兄弟连区块链教程以太坊源码分析chain-indexer区块链索

        本文链接:https://www.haomeiwen.com/subject/dncyzftx.html