美文网首页以太坊 ethereum
以太坊 miner worker 导读

以太坊 miner worker 导读

作者: walker_1992 | 来源:发表于2021-08-29 20:04 被阅读0次

    代码版本:1.8.27,此版本的Miner模块有较大改变,取消了原来的agent模块以及work对象,但是基本逻辑还是一样的。Miner模块的主要执行部分在worker中,Miner对象及其方法主要控制着模块的开关和外部接口。

    Miner模块

    // Miner creates blocks and searches for proof-of-work values.
    type Miner struct {
        mux      *event.TypeMux
        worker   *worker
        coinbase common.Address
        eth      Backend
        engine   consensus.Engine
        exitCh   chan struct{}
        startCh  chan common.Address
        stopCh   chan struct{}
    }
    

    miner.update()方法

    监听downloader事件,控制着canStart和shouldStart这两个开关,用于抵挡DOS攻击。

    1、当监听到downloader的StartEvent事件时,canStart设置为0,表示downloader同步时不可进行挖矿,如果正在挖矿(miner.mining == ture),停止挖矿,同时将shouldStart设置为1,以便下次直接开始挖矿;

    2、当监听到downloader的DoneEvent事件或者FailedEvent事件,判断shouldStart是否打开。如果是打开的,则再打开canStart,将shouldStart关闭。此时,将挖矿的控制权完全交给miner.Start()方法。

    miner.start()方法很简单,打开shouldstart,设置coinbase,然后启动worker

    func (miner *Miner) Start(coinbase common.Address) {
        miner.startCh <- coinbase
    }
    
    func (miner *Miner) update() {
    ...
            case addr := <-miner.startCh:
                miner.SetEtherbase(addr)
                if canStart {
                    miner.worker.start()
                }
                shouldStart = true
    ...
    }
    

    Worker模块

    下图是miner主要的流程图,清晰的说明了worker的工作原理

    miner

    Worker的数据结构如下,比较重要的已经注释:

    // worker is the main object which takes care of submitting new work to consensus engine
    // and gathering the sealing result.
    type worker struct {
        config      *Config
        chainConfig *params.ChainConfig
        engine      consensus.Engine //共识引擎
        eth         Backend                    //以太坊终端
        chain       *core.BlockChain   //区块链对象
    
        // Feeds
        pendingLogsFeed event.Feed
    
        // Subscriptions
        mux          *event.TypeMux
        txsCh        chan core.NewTxsEvent //交易池更新事件
        txsSub       event.Subscription
        chainHeadCh  chan core.ChainHeadEvent //区块头更新事件
        chainHeadSub event.Subscription
        chainSideCh  chan core.ChainSideEvent //区块链分叉事件
        chainSideSub event.Subscription
    
        // Channels
        newWorkCh          chan *newWorkReq
        taskCh             chan *task
        resultCh           chan *types.Block
        startCh            chan struct{}
        exitCh             chan struct{}
        resubmitIntervalCh chan time.Duration
        resubmitAdjustCh   chan *intervalAdjust
    
        current      *environment                 // An environment for current running cycle.当前挖矿生命周期的执行环境
        localUncles  map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks. 本地分叉区块作为潜在叔块
        remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. 分叉区块链中潜在的叔块
        unconfirmed  *unconfirmedBlocks           // A set of locally mined blocks pending canonicalness confirmations. 本地产生但尚未被确认的区块
    
        mu       sync.RWMutex // The lock used to protect the coinbase and extra fields
        coinbase common.Address
        extra    []byte
    
        pendingMu    sync.RWMutex
        pendingTasks map[common.Hash]*task //挖矿任务map
    
        snapshotMu       sync.RWMutex // The lock used to protect the snapshots below
        snapshotBlock    *types.Block//快照的区块
        snapshotReceipts types.Receipts
        snapshotState    *state.StateDB//快照的状态
    
        // atomic status counters
        running int32 // The indicator whether the consensus engine is running or not. 判断引擎是否启动
        newTxs  int32 // New arrival transaction count since last sealing work submitting. 记录上次递交任务后新来的区块数量
    
        // noempty is the flag used to control whether the feature of pre-seal empty
        // block is enabled. The default value is false(pre-seal is enabled by default).
        // But in some special scenario the consensus engine will seal blocks instantaneously,
        // in this case this feature will add all empty blocks into canonical chain
        // non-stop and no real transaction will be included.
        noempty uint32
    
        // External functions
        isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.
    
        // Test hooks
        newTaskHook  func(*task)                        // Method to call upon receiving a new sealing task.
        skipSealHook func(*task) bool                   // Method to decide whether skipping the sealing.
        fullTaskHook func()                             // Method to call before pushing the full sealing task.
        resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
    }
    

    在初始化miner的时候,会新建worker,调用newWorker( ),该方法首先配置了worker对象,然后订阅了交易池事件、规范链更新事件和分叉事件,启动4个goroutine,最后通过向startCh中传入一个struct{}{},直接进入newWorkerLoop的逻辑。

    func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
        worker := &worker{
            config:             config,
            chainConfig:        chainConfig,
            engine:             engine,
            eth:                eth,
            mux:                mux,
            chain:              eth.BlockChain(),
            isLocalBlock:       isLocalBlock,
            localUncles:        make(map[common.Hash]*types.Block),
            remoteUncles:       make(map[common.Hash]*types.Block),
            unconfirmed:        newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
            pendingTasks:       make(map[common.Hash]*task),
            txsCh:              make(chan core.NewTxsEvent, txChanSize),
            chainHeadCh:        make(chan core.ChainHeadEvent, chainHeadChanSize),
            chainSideCh:        make(chan core.ChainSideEvent, chainSideChanSize),
            newWorkCh:          make(chan *newWorkReq),
            taskCh:             make(chan *task),
            resultCh:           make(chan *types.Block, resultQueueSize),
            exitCh:             make(chan struct{}),
            startCh:            make(chan struct{}, 1),
            resubmitIntervalCh: make(chan time.Duration),
            resubmitAdjustCh:   make(chan *intervalAdjust, resubmitAdjustChanSize),
        }
        // Subscribe NewTxsEvent for tx pool
        worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
        // Subscribe events for blockchain
        worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
        worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
    
        // Sanitize recommit interval if the user-specified one is too short.
        recommit := worker.config.Recommit
        if recommit < minRecommitInterval {
            log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
            recommit = minRecommitInterval
        }
    
        go worker.mainLoop()
        go worker.newWorkLoop(recommit)
        go worker.resultLoop()
        go worker.taskLoop()
    
        // Submit first work to initialize pending state.
        if init {
            worker.startCh <- struct{}{}
        }
        return worker
    }
    

    NewWorkLoop

    newWorkLoop主要监听两个重要的通道,一个是startCh通道,一个是chainHeadCh,这两个通道均用于清理特定父区块的pengding tasks列表,然后递交基于父区块的挖矿task。区别在于startCh通道启动是基于当前的currentBlock,而chainHeadCh是基于新传来的区块头。

    // newWorkLoop is a standalone goroutine to submit new mining work upon received events.
    func (w *worker) newWorkLoop(recommit time.Duration) {
    ...
        for {
            select {
            case <-w.startCh:
                clearPending(w.chain.CurrentBlock().NumberU64())
                timestamp = time.Now().Unix()
                commit(false, commitInterruptNewHead)
    
            case head := <-w.chainHeadCh:
                clearPending(head.Block.NumberU64())
                timestamp = time.Now().Unix()
                commit(false, commitInterruptNewHead)
    
            case <-timer.C: ...
    
            case interval := <-w.resubmitIntervalCh: ...
    
            case adjust := <-w.resubmitAdjustCh: ...
    
            case <-w.exitCh:
                return
            }
        }
    }
    
    

    清理残留挖矿任务后,构建新的挖矿任务,这时候调用commit函数,构建一个newWorkReq对象,传入newWorkCh通道,进入MainLoop协程。MainLoop()监听三个重要的通道,newWorkCh(新work请求通道)、txsCh(交易池更新事件通道)以及chainSideCh(区块链分叉事件通道)

    MainLoop

    // mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
    func (w *worker) mainLoop() {
        defer w.txsSub.Unsubscribe()
        defer w.chainHeadSub.Unsubscribe()
        defer w.chainSideSub.Unsubscribe()
    
        for {
            select {
                    //task1:直接启动commitNetwork,  进一步提交挖矿task
            case req := <-w.newWorkCh:
                w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
                    // task2:出现分叉后,处理叔块
            case ev := <-w.chainSideCh:
                // Short circuit for duplicate side blocks 检验该hash的区块是否已经被当作潜在叔块,如果是,则忽略
                if _, exist := w.localUncles[ev.Block.Hash()]; exist {
                    continue
                }
                if _, exist := w.remoteUncles[ev.Block.Hash()]; exist {
                    continue
                }
                // Add side block to possible uncle block set depending on the author.
                            //将该区块作为潜在叔块加入叔块map,key为该区块的矿工地址
                if w.isLocalBlock != nil && w.isLocalBlock(ev.Block) {
                    w.localUncles[ev.Block.Hash()] = ev.Block
                } else {
                    w.remoteUncles[ev.Block.Hash()] = ev.Block
                }
                // If our mining block contains less than 2 uncle blocks,
                // add the new uncle block if valid and regenerate a mining block.
                            // 如果我们正在mining的区块少于两个叔块,则添加新的叔块并从新生成mining blocks
                if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
    ...
                }
                    //task3: 交易池更新后
            case ev := <-w.txsCh: ...
    
            // System stopped
            case <-w.exitCh:
                return
            case <-w.txsSub.Err():
                return
            case <-w.chainHeadSub.Err():
                return
            case <-w.chainSideSub.Err():
                return
            }
        }
    }
    

    接着上面的的流程,newWorkCh通道传出req后,直接启动commitNewWork()函数。

    commitNewWork()

    方法主要的功能是递交一个新的task:

    1. 初始化一个新区块头给待挖矿的区块;
    2. 为当前挖矿周期初始化一个工作环境work;
    3. 获取交易池中每个账户地址的交易列表中的第一个交易后排序,然后应用这些交易;
    4. 获取两个叔块;
    5. 将区块递交给commit,用于生成task;
    6. 更新状态快照,供前端查询;

    最后是commit方法计算挖矿奖励,更新block,将上面生成的block递交到一个挖矿task,最后将task传入taskCh通道。

    // commit runs any post-transaction state modifications, assembles the final block
    // and commits new work if consensus engine is running.
    func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
        // Deep copy receipts here to avoid interaction between different tasks.
        receipts := copyReceipts(w.current.receipts)
        s := w.current.state.Copy()
            // 计算挖矿奖励(包括叔块奖励)
        block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts)
        if err != nil {
            return err
        }
        if w.isRunning() {
            if interval != nil {
                interval()
            }
            select {
                    // 生成task,传入taskCh通道:
            case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
                w.unconfirmed.Shift(block.NumberU64() - 1)
    ...
            case <-w.exitCh:
                log.Info("Worker has exited")
            }
        }
        if update {
            w.updateSnapshot()
        }
        return nil
    }
    

    TaskLoop

    task进入taskLoop后,被加入pendingTasks列表:

        for {
            select {
            case task := <-w.taskCh:
                if w.newTaskHook != nil {
                    w.newTaskHook(task)
                }
                // Reject duplicate sealing work due to resubmitting.
                            // 计算header数据的RLP hash值,判断是否有相同的块已经在挖矿中了,如果是则放弃,否则终止之前的挖矿
                sealHash := w.engine.SealHash(task.block.Header())
                if sealHash == prev {
                    continue
                }
                // Interrupt previous sealing operation
                interrupt()
                stopCh, prev = make(chan struct{}), sealHash
    
                if w.skipSealHook != nil && w.skipSealHook(task) {
                    continue
                }
                w.pendingMu.Lock()
                w.pendingTasks[sealHash] = task
                w.pendingMu.Unlock()
                            // 最后执行挖矿,结果会通过resuletCh传入resultLoop
                if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
                    log.Warn("Block sealing failed", "err", err)
                }
            case <-w.exitCh:
                interrupt()
                return
            }
        }
    
    

    resultLoop

    最后是resultLoop,挖矿结果传入resultLoop,先从pengdingTasks列表中取出刚执行挖矿的task,更新收据日志中的blockHash,然后将区块存入数据库,最后将区块广播出去。

    commitTransaction() 交易执行

    1. 设置gaspool:
    2. 进入交易执行循环

    在for循环中,会有三种情况会被打断:a、交易还在执行,但是新的区块已经经过广播到达本地,interrupt信号为1;b、worker start 或者restart,interrupt信号为1;c、worker重新构造区块,包含了新到的交易,interrupt信号为2。

    对于前两种,worker的本次执行就终止,当对于第三种情况,本次执行依然会被提交到consensus engine

    1. 如果区块工作环境剩余gas小于21000,则推出循环,否则从排好序的列表离取出交易;
    2. 执行交易并处理错误:w.commitTransaction()
            // Start executing the transaction
                    // 首先准备当前的世界状态
            w.current.state.Prepare(tx.Hash(), w.current.tcount)
                    // 调用交易执行的方法,core.ApplyTransaction,得到收据并放入当前的执行环境
            logs, err := w.commitTransaction(tx, coinbase)
            switch {
            case errors.Is(err, core.ErrGasLimitReached):
                            // gasPool不够执行交易,则当前交易从trxs中移除
                // Pop the current out-of-gas transaction without shifting in the next from the account
                log.Trace("Gas limit exceeded for current block", "sender", from)
                txs.Pop()
    
            case errors.Is(err, core.ErrNonceTooLow):
                            // 交易nonce太低,则取下一个交易替换处理列表中的第一个交易
                // New head notification data race between the transaction pool and miner, shift
                log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
                txs.Shift()
    
            case errors.Is(err, core.ErrNonceTooHigh):
                            // 交易nonce太高,则将当前交易从trxs列表中移除
                // Reorg notification data race between the transaction pool and miner, skip account =
                log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
                txs.Pop()
    
            case errors.Is(err, nil):
                            // 一切正常,收集日志,统计执行成功的交易计数
                // Everything ok, collect the logs and shift in the next transaction from the same account
                coalescedLogs = append(coalescedLogs, logs...)
                w.current.tcount++
                txs.Shift()
    
            case errors.Is(err, core.ErrTxTypeNotSupported):
                // Pop the unsupported transaction without shifting in the next from the account
                log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type())
                txs.Pop()
    
            default:
                // Strange error, discard the transaction and get the next in line (note, the
                // nonce-too-high clause will prevent us from executing in vain).
                log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
                txs.Shift()
            }
    

    Core.ApplyTransaction 执行交易的入口,将交易送入太坊虚拟机执行

    image

    ApplyTransaction函数

    该函数的调用有两种情况:

    1. 是在将区块插入区块链前需要验证区块合法性
      bc.insertChain-->bc.processor.Process-->stateProcessor.Process -->ApplyTransaction

    2. 是worker挖矿过程中执行交易时
      Worker.commitTransaction ——> ApplyTransaction

    主要功能是:将交易转化成Message,创建EVM对象,调用ApplyMessage执行交易,生成日志对象;

    1. 将交易转换成Message;
    2. 初始化一个EVM的执行环境;
    3. 执行交易,改变stateDB世界状态,然后生成收据;
    func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, blockNumber *big.Int, blockHash common.Hash, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, error) {
        // Create a new context to be used in the EVM environment.
        txContext := NewEVMTxContext(msg)
        evm.Reset(txContext, statedb)
    
        // Apply the transaction to the current state (included in the env).
        result, err := ApplyMessage(evm, msg, gp)
        if err != nil {
            return nil, err
        }
    
        // Update the state with pending changes.
        var root []byte
        if config.IsByzantium(blockNumber) {
            statedb.Finalise(true)
        } else {
            root = statedb.IntermediateRoot(config.IsEIP158(blockNumber)).Bytes()
        }
        *usedGas += result.UsedGas
    
        // Create a new receipt for the transaction, storing the intermediate root and gas used
        // by the tx.
        receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas}
        if result.Failed() {
            receipt.Status = types.ReceiptStatusFailed
        } else {
            receipt.Status = types.ReceiptStatusSuccessful
        }
        receipt.TxHash = tx.Hash()
        receipt.GasUsed = result.UsedGas
    
        // If the transaction created a contract, store the creation address in the receipt.
        if msg.To() == nil {
            receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce())
        }
    
        // Set the receipt logs and create the bloom filter.
        receipt.Logs = statedb.GetLogs(tx.Hash(), blockHash)
        receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
        receipt.BlockHash = blockHash
        receipt.BlockNumber = blockNumber
        receipt.TransactionIndex = uint(statedb.TxIndex())
        return receipt, err
    }
    

    ethash 挖矿

    POW的本质是基于算力解决一个数学上困难的问题,解决问题的关键点除了暴力枚举,没有任何办法可以找到我们所需要的nonce值,但对于验证输出的结果是非常简单容易的。

    经典的比特币POW的算法原理是对block的header加上循环更新的nonce去进行hash运算,运算的target是hash值的前n位为0,这个计算只能通过暴力枚举来进行,验证也很容易,只要使用最终的nonce打入header按照之前的算法验证即可。

    以太坊采用的ethash算法与比特币不同,但基本类似,都是找到一个nonce值输入到算法中,得到的结果低于一个基于特定困难值的阈值。

    RAND(h,n) <= M/d
    
    RAND 是一系列复杂的运算
    h:header 不饱和nonce
    n:nonce
    
    M:2^256
    d: 难度值 ,该难度值给予父区块的时间戳和难度而得到
    

    如上所示,我们用header和nonce经过复杂的计算,如果得到的结果小于或者等于M/d,该nonce就是可用的,意味着挖矿成功。

    下图是ethash算法在以太坊源码中的实现


    aa

    Eth.Seal方法

    主要任务是:

    1. 获得种子seed;
    2. 基于seed获得Rand对象,rand值将作为初始化nonce进行挖矿;
    3. 启动mine方法,执行挖矿;
    // Seal implements consensus.Engine, attempting to find a nonce that satisfies
    // the block's difficulty requirements.
    func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
        // If we're running a fake PoW, simply return a 0 nonce immediately
        if ethash.config.PowMode == ModeFake || ethash.config.PowMode == ModeFullFake {
            header := block.Header()
            header.Nonce, header.MixDigest = types.BlockNonce{}, common.Hash{}
            select {
            case results <- block.WithSeal(header):
            default:
                ethash.config.Log.Warn("Sealing result is not read by miner", "mode", "fake", "sealhash", ethash.SealHash(block.Header()))
            }
            return nil
        }
        // If we're running a shared PoW, delegate sealing to it
        if ethash.shared != nil {
            return ethash.shared.Seal(chain, block, results, stop)
        }
        // Create a runner and the multiple search threads it directs
        abort := make(chan struct{})
    
        ethash.lock.Lock()
            // 挖矿的线程
        threads := ethash.threads
        if ethash.rand == nil {
            seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
            if err != nil {
                ethash.lock.Unlock()
                return err
            }
                    // 生成rand对象
            ethash.rand = rand.New(rand.NewSource(seed.Int64()))
        }
        ethash.lock.Unlock()
        if threads == 0 {
            threads = runtime.NumCPU()
        }
        if threads < 0 {
            threads = 0 // Allows disabling local mining without extra logic around local/remote
        }
        // Push new work to remote sealer
        if ethash.remote != nil {
            ethash.remote.workCh <- &sealTask{block: block, results: results}
        }
        var (
            pend   sync.WaitGroup
            locals = make(chan *types.Block)
        )
            // 闭包多线程处理
        for i := 0; i < threads; i++ {
            pend.Add(1)
            go func(id int, nonce uint64) {
                defer pend.Done()
                ethash.mine(block, id, nonce, abort, locals)
            }(i, uint64(ethash.rand.Int63()))
        }
        // Wait until sealing is terminated or a nonce is found
        go func() {
            var result *types.Block
            select {
            case <-stop:
                // Outside abort, stop all miner threads
                close(abort)
            case result = <-locals:
                // One of the threads found a block, abort all others
                select {
                case results <- result:
                default:
                    ethash.config.Log.Warn("Sealing result is not read by miner", "mode", "local", "sealhash", ethash.SealHash(block.Header()))
                }
                close(abort)
            case <-ethash.update:
                // Thread count was changed on user request, restart
                close(abort)
                if err := ethash.Seal(chain, block, results, stop); err != nil {
                    ethash.config.Log.Error("Failed to restart sealing after update", "err", err)
                }
            }
            // Wait for all miners to terminate and return the block
            pend.Wait()
        }()
        return nil
    }
    

    mine方法

    主要任务:

    1. 取出block的header;
    2. 取出没有nonce时的区块hash;
    3. 设置目标target,M/td;
    4. 获得dataset数据集;
    5. 开启无限循环,计算每一轮的nonce值的POW结果,直到获得满足条件的解;

    补充:DAG和epoch

    1. 上面的dataset就来自内存中的一组数据或者硬盘里的DAG。
    2. DAG是有向无环图,以太坊的DAG是基于区块高度生成的。
    3. 以太坊中每3万个块会生成一代DAG,这一代就成称为一个epoch。
    4. 挖矿的时候需要从DAG中随机选取dataset,所以挖矿工作只能在现世DAG创建以后才能开始。

    参考:
    1.以太坊源码解读
    2.go-ethereum-code-analysis

    相关文章

      网友评论

        本文标题:以太坊 miner worker 导读

        本文链接:https://www.haomeiwen.com/subject/nowviltx.html