美文网首页
以太坊原理分析系列4---挖矿模块

以太坊原理分析系列4---挖矿模块

作者: JC86 | 来源:发表于2019-07-16 17:51 被阅读0次

    先上挖矿流程图


    image.png

    miner、worker和agent是主要工作部件,关系如下

    image.png

    类图


    image.png

    Miner的定义如下

    //Miner创建块并搜索工作证明值。
    type Miner struct {
        mux      *event.TypeMux //接收来自downloader模块的StartEvent DoneEvent FailedEvent事件通知。在网络中,不可能只有一个矿工节点,当downloader开始从其他节点同步Block时,我们就没有必要再继续挖矿了
        worker   *worker //对应的worker,从这里看出Miner和worker是一一对应的
        coinbase common.Address //本矿工的账户地址,挖矿所得的收入将计入该账户
        eth      Backend //通过该接口可查询后台TxPool BlockChain ethdb的数据.举例来说,作为矿工,我们在生成一个新的Block时需要从TxPool中取出pending Tx(待打包成块的交易),然后将它们中的一部分作为新的Block中的Transaction
        engine   consensus.Engine //采用的共识引擎,目前以太坊公网采用的是ethash,测试网络采用clique
        exitCh   chan struct{}
    
    canStart    int32 //can start指示是否可以启动挖掘操作
    shouldStart int32 //should start指示是否应在同步后启动
    }
    

    看看他new一个miner的时候做了些什么

    func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner {
        miner := &Miner{
            eth:      eth,
            mux:      mux,
            engine:   engine,
            worker:   newWorker(config, engine, common.Address{}, eth, mux),
            canStart: 1,
        }
        miner.Register(NewCpuAgent(eth.BlockChain(), engine))
        go miner.update()
    
        return miner
    }
    

    首先初始化了结构变量,然后还做了一下动作:
    1、创建一个worker实例
    2、使用miner.newCpuAgent()创建Agent 并用Register方法注册给worker
    3、启动miner.update() 线程.该线程等待mux上的来自 downloader模块的事件通知用来控制挖矿开始或停止

    func (self *Miner) Register(agent Agent) {
        if self.Mining() {
            agent.Start()
        }
        self.worker.register(agent)
    }
    
    func (self *worker) register(agent Agent) {
        self.mu.Lock()
        defer self.mu.Unlock()
        self.agents[agent] = struct{}{}
        agent.SetReturnCh(self.recv)
    }
    
    // worker is the main object which takes care of applying messages to the new state
    type worker struct {
        config *params.ChainConfig
        engine consensus.Engine
    
        mu sync.Mutex
    
        // update loop
        mux          *event.TypeMux
        txCh         chan core.TxPreEvent
        txSub        event.Subscription
        chainHeadCh  chan core.ChainHeadEvent
        chainHeadSub event.Subscription
        chainSideCh  chan core.ChainSideEvent
        chainSideSub event.Subscription
        wg           sync.WaitGroup
    
        agents map[Agent]struct{}
        recv   chan *Result
    
        eth     Backend
        chain   *core.BlockChain
        proc    core.Validator
        chainDb ethdb.Database
    
        coinbase common.Address
        extra    []byte
    
        currentMu sync.Mutex
        current   *Work
    
        uncleMu        sync.Mutex
        possibleUncles map[common.Hash]*types.Block
    
        unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations
    
        // atomic status counters
        mining int32
        atWork int32
    }
    
    func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
        worker := &worker{
            config:         config,
            engine:         engine,
            eth:            eth,
            mux:            mux,
            txCh:           make(chan core.TxPreEvent, txChanSize),
            chainHeadCh:    make(chan core.ChainHeadEvent, chainHeadChanSize),
            chainSideCh:    make(chan core.ChainSideEvent, chainSideChanSize),
            chainDb:        eth.ChainDb(),
            recv:           make(chan *Result, resultQueueSize),
            chain:          eth.BlockChain(),
            proc:           eth.BlockChain().Validator(),
            possibleUncles: make(map[common.Hash]*types.Block),
            coinbase:       coinbase,
            agents:         make(map[Agent]struct{}),
            unconfirmed:    newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
        }
        // Subscribe TxPreEvent for tx pool
        worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
        // Subscribe events for blockchain
        worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
        worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
        go worker.update()
    
        go worker.wait()
        worker.commitNewWork()
    
        return worker
    }
    
    // Agent can register themself with the worker
    type Agent interface {
        Work() chan<- *Work
        SetReturnCh(chan<- *Result)
        Stop()
        Start()
        GetHashRate() int64
    }
    
    func (self *worker) commitNewWork() {
        self.mu.Lock()
        defer self.mu.Unlock()
        self.uncleMu.Lock()
        defer self.uncleMu.Unlock()
        self.currentMu.Lock()
        defer self.currentMu.Unlock()
    
        tstart := time.Now()
        parent := self.chain.CurrentBlock()
    
        tstamp := tstart.Unix()
        if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 {
            tstamp = parent.Time().Int64() + 1
        }
        // this will ensure we're not going off too far in the future
        if now := time.Now().Unix(); tstamp > now+1 {
            wait := time.Duration(tstamp-now) * time.Second
            log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait))
            time.Sleep(wait)
        }
    
        num := parent.Number()
        header := &types.Header{
            ParentHash: parent.Hash(),
            Number:     num.Add(num, common.Big1),
            GasLimit:   core.CalcGasLimit(parent),
            Extra:      self.extra,
            Time:       big.NewInt(tstamp),
        }
        // Only set the coinbase if we are mining (avoid spurious block rewards)
        if atomic.LoadInt32(&self.mining) == 1 {
            header.Coinbase = self.coinbase
        }
        if err := self.engine.Prepare(self.chain, header); err != nil {
            log.Error("Failed to prepare header for mining", "err", err)
            return
        }
        // If we are care about TheDAO hard-fork check whether to override the extra-data or not
        if daoBlock := self.config.DAOForkBlock; daoBlock != nil {
            // Check whether the block is among the fork extra-override range
            limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
            if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
                // Depending whether we support or oppose the fork, override differently
                if self.config.DAOForkSupport {
                    header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
                } else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
                    header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
                }
            }
        }
        // Could potentially happen if starting to mine in an odd state.
        err := self.makeCurrent(parent, header)
        if err != nil {
            log.Error("Failed to create mining context", "err", err)
            return
        }
        // Create the current work task and check any fork transitions needed
        work := self.current
        if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
            misc.ApplyDAOHardFork(work.state)
        }
        pending, err := self.eth.TxPool().Pending()
        if err != nil {
            log.Error("Failed to fetch pending transactions", "err", err)
            return
        }
        txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
        work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
    
        // compute uncles for the new block.
        var (
            uncles    []*types.Header
            badUncles []common.Hash
        )
        for hash, uncle := range self.possibleUncles {
            if len(uncles) == 2 {
                break
            }
            if err := self.commitUncle(work, uncle.Header()); err != nil {
                log.Trace("Bad uncle found and will be removed", "hash", hash)
                log.Trace(fmt.Sprint(uncle))
    
                badUncles = append(badUncles, hash)
            } else {
                log.Debug("Committing new uncle to block", "hash", hash)
                uncles = append(uncles, uncle.Header())
            }
        }
        for _, hash := range badUncles {
            delete(self.possibleUncles, hash)
        }
        // Create the new block to seal with the consensus engine
        if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil {
            log.Error("Failed to finalize block for sealing", "err", err)
            return
        }
        // We only care about logging if we're actually mining.
        if atomic.LoadInt32(&self.mining) == 1 {
            log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
            self.unconfirmed.Shift(work.Block.NumberU64() - 1)
        }
        self.push(work)
    }
    
    func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
        gp := new(core.GasPool).AddGas(env.header.GasLimit)
    
        var coalescedLogs []*types.Log
    
        for {
            // If we don't have enough gas for any further transactions then we're done
            if gp.Gas() < params.TxGas {
                log.Trace("Not enough gas for further transactions", "gp", gp)
                break
            }
            // Retrieve the next transaction and abort if all done
            tx := txs.Peek()
            if tx == nil {
                break
            }
            // Error may be ignored here. The error has already been checked
            // during transaction acceptance is the transaction pool.
            //
            // We use the eip155 signer regardless of the current hf.
            from, _ := types.Sender(env.signer, tx)
            // Check whether the tx is replay protected. If we're not in the EIP155 hf
            // phase, start ignoring the sender until we do.
            if tx.Protected() && !env.config.IsEIP155(env.header.Number) {
                log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block)
    
                txs.Pop()
                continue
            }
            // Start executing the transaction
            env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount)
    
            err, logs := env.commitTransaction(tx, bc, coinbase, gp)
            switch err {
            case core.ErrGasLimitReached:
                // Pop the current out-of-gas transaction without shifting in the next from the account
                log.Trace("Gas limit exceeded for current block", "sender", from)
                txs.Pop()
    
            case core.ErrNonceTooLow:
                // New head notification data race between the transaction pool and miner, shift
                log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
                txs.Shift()
    
            case core.ErrNonceTooHigh:
                // Reorg notification data race between the transaction pool and miner, skip account =
                log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
                txs.Pop()
    
            case nil:
                // Everything ok, collect the logs and shift in the next transaction from the same account
                coalescedLogs = append(coalescedLogs, logs...)
                env.tcount++
                txs.Shift()
    
            default:
                // Strange error, discard the transaction and get the next in line (note, the
                // nonce-too-high clause will prevent us from executing in vain).
                log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
                txs.Shift()
            }
        }
    
        if len(coalescedLogs) > 0 || env.tcount > 0 {
            // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
            // logs by filling in the block hash when the block was mined by the local miner. This can
            // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
            cpy := make([]*types.Log, len(coalescedLogs))
            for i, l := range coalescedLogs {
                cpy[i] = new(types.Log)
                *cpy[i] = *l
            }
            go func(logs []*types.Log, tcount int) {
                if len(logs) > 0 {
                    mux.Post(core.PendingLogsEvent{Logs: logs})
                }
                if tcount > 0 {
                    mux.Post(core.PendingStateEvent{})
                }
            }(cpy, env.tcount)
        }
    }
    
    

    参考
    [以太坊源代码分析]III. 挖矿和共识算法的奥秘
    以太坊源码分析--挖矿与共识
    以太坊挖矿源码分析

    https://github.com/ZtesoftCS/go-ethereum-code-analysis

    相关文章

      网友评论

          本文标题:以太坊原理分析系列4---挖矿模块

          本文链接:https://www.haomeiwen.com/subject/djhpkctx.html