美文网首页
以太坊挖矿流程

以太坊挖矿流程

作者: hukun | 来源:发表于2018-06-06 21:03 被阅读0次

    以太坊的代码中,名为miner的包负责挖矿的流程。其UML关系图如下图所示:


    image

    整体来说,就是一个矿工miner,拥有一个工人worker,工人拥有N个代理agent。

    miner

    在miner里,有New()函数,在这里初始化成员变量的属性,比如worker,agent。在初始化完成时,就在其它协程里执行update()方法。
    而在update()方法里,它循环监听来自downloader的事件通道:开始(StartEvent)、完成(DoneEvent)、失败(FailedEvent)。当下载开始的时候,如果正在挖矿就先停止挖矿,并记下同步结束后需要恢复;当下载完成或者失败的时候,如果之前记下了需要恢复(shouldStart为1),就执行Start()方法,然后取消订阅并退出循环,因此这个循环只会处理一次完成/失败事件。在Start()方法里,主要调用了worker的start()、commitNewWork()方法。

    // update follows the downloader's start/done/failed events. When a sync
    // starts it clears canStart and, if mining is in progress, stops it while
    // recording that it should be resumed. On the first done or failed event it
    // re-enables starting, resumes mining if that was requested, drops the
    // subscription and returns.
    func (self *Miner) update() {
        sub := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})

        for event := range sub.Chan() {
            switch event.Data.(type) {
            case downloader.StartEvent:
                atomic.StoreInt32(&self.canStart, 0)
                if !self.Mining() {
                    continue
                }
                self.Stop()
                // Set after Stop so the resume request survives until the sync ends.
                atomic.StoreInt32(&self.shouldStart, 1)
                log.Info("Mining aborted due to sync")

            case downloader.DoneEvent, downloader.FailedEvent:
                // Read the resume request before the flags are reset below.
                resume := atomic.LoadInt32(&self.shouldStart) == 1

                atomic.StoreInt32(&self.canStart, 1)
                atomic.StoreInt32(&self.shouldStart, 0)
                if resume {
                    self.Start(self.coinbase)
                }
                // Only the first completion matters: unsubscribe and ignore
                // whatever else is still queued.
                sub.Unsubscribe()
                return
            }
        }
    }
    
    // Start records that mining is wanted and sets the etherbase to coinbase.
    // While canStart is 0 it only logs and returns; otherwise it marks the
    // miner as mining, starts the worker and commits a new unit of work.
    func (self *Miner) Start(coinbase common.Address) {
        atomic.StoreInt32(&self.shouldStart, 1)
        self.SetEtherbase(coinbase)

        startable := atomic.LoadInt32(&self.canStart) != 0
        if !startable {
            log.Info("Network syncing, will start miner afterwards")
            return
        }

        atomic.StoreInt32(&self.mining, 1)
        log.Info("Starting mining operation")

        self.worker.start()
        self.worker.commitNewWork()
    }
    

    worker

    在worker.go文件里,在初始化worker结构体的时候,分别开了一个协程去执行update(), wait()两个方法。

    // newWorker builds a worker from the given chain configuration, consensus
    // engine, coinbase, backend and event mux. It subscribes the worker to
    // transaction-pool and blockchain events, runs update and wait in their own
    // goroutines, and commits a first unit of work before returning.
    func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
        w := &worker{
            config:         config,
            engine:         engine,
            eth:            eth,
            mux:            mux,
            txCh:           make(chan core.TxPreEvent, txChanSize),
            chainHeadCh:    make(chan core.ChainHeadEvent, chainHeadChanSize),
            chainSideCh:    make(chan core.ChainSideEvent, chainSideChanSize),
            chainDb:        eth.ChainDb(),
            recv:           make(chan *Result, resultQueueSize),
            chain:          eth.BlockChain(),
            proc:           eth.BlockChain().Validator(),
            possibleUncles: make(map[common.Hash]*types.Block),
            coinbase:       coinbase,
            agents:         make(map[Agent]struct{}),
            unconfirmed:    newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
        }

        // Transaction pool feed.
        w.txSub = eth.TxPool().SubscribeTxPreEvent(w.txCh)

        // Blockchain feeds: head changes and side-chain blocks.
        w.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(w.chainHeadCh)
        w.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(w.chainSideCh)

        // Background loops: event handling and processing of sealing results.
        go w.update()
        go w.wait()

        w.commitNewWork()
        return w
    }
    

    这里我们就来主要看下update(), wait()两个方法。

    update()

    
    // update is the worker's event loop. It reacts to chain head events by
    // committing new work, stores side-chain blocks as possible uncles, and
    // handles incoming transactions depending on whether the worker is mining.
    // It returns, dropping all three subscriptions, as soon as any
    // subscription reports an error.
    func (self *worker) update() {
        defer self.txSub.Unsubscribe()
        defer self.chainHeadSub.Unsubscribe()
        defer self.chainSideSub.Unsubscribe()

        for {
            select {
            // New chain head: build on top of it.
            case <-self.chainHeadCh:
                self.commitNewWork()

            // Side-chain block: remember it, keyed by hash, as an uncle candidate.
            case sideEv := <-self.chainSideCh:
                self.uncleMu.Lock()
                self.possibleUncles[sideEv.Block.Hash()] = sideEv.Block
                self.uncleMu.Unlock()

            // New transaction from the pool.
            case txEv := <-self.txCh:
                switch {
                case atomic.LoadInt32(&self.mining) == 0:
                    // Not mining: apply the transaction to the pending state.
                    self.currentMu.Lock()
                    sender, _ := types.Sender(self.current.signer, txEv.Tx)
                    bySender := map[common.Address]types.Transactions{sender: {txEv.Tx}}
                    ordered := types.NewTransactionsByPriceAndNonce(self.current.signer, bySender)

                    self.current.commitTransactions(self.mux, ordered, self.chain, self.coinbase)
                    self.currentMu.Unlock()

                case self.config.Clique != nil && self.config.Clique.Period == 0:
                    // Mining, but nothing is being processed: wake on new transactions.
                    self.commitNewWork()
                }

            // Any subscription error means the system stopped.
            case <-self.txSub.Err():
                return
            case <-self.chainHeadSub.Err():
                return
            case <-self.chainSideSub.Err():
                return
            }
        }
    }
    

    从update()方法中可以看出,它一直在监听几个通道:ChainHeadEvent,ChainSideEvent,TxPreEvent。当接收到ChainHeadEvent(链上出现了新的头区块)后,立即调用commitNewWork()开始下一块的挖掘。ChainSideEvent指区块链中加入了一个新区块作为当前链的旁支,worker会把这个块放入possibleUncles这个map里(以区块哈希为键)。TxPreEvent是新交易的到来:如果worker不在挖矿状态中,就把这笔交易应用到当前的work上(commitTransactions);如果正在挖矿,则一般不处理这笔交易,只有在Clique共识且Period为0时才会调用commitNewWork()重新提交工作。

    wait()

    
    // wait consumes sealing results from self.recv until the channel is closed.
    // For every non-nil result it stamps the block hash into the logs, writes
    // the block and its state to the chain, posts the mined-block and chain
    // events, and records the block as unconfirmed. When the written block did
    // not become canonical it commits new work itself; a canonical block gets
    // that implicitly through the ChainHeadEvent posted here.
    func (self *worker) wait() {
        // Ranging directly over the channel lets the goroutine end when recv is
        // closed instead of re-entering an already finished range in a busy loop.
        for result := range self.recv {
            atomic.AddInt32(&self.atWork, -1)

            if result == nil {
                continue
            }
            block := result.Block
            work := result.Work

            // Update the block hash in all logs since it is now available and not when the
            // receipt/log of individual transactions were created.
            for _, receipt := range work.receipts {
                for _, l := range receipt.Logs {
                    l.BlockHash = block.Hash()
                }
            }
            // Named l rather than log so the log package is not shadowed.
            for _, l := range work.state.Logs() {
                l.BlockHash = block.Hash()
            }
            // Write the block together with its state to the chain.
            stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
            if err != nil {
                log.Error("Failed writing block to chain", "err", err)
                continue
            }
            // Decided per result: previously this flag was initialised once outside
            // the loop, so after the first canonical block it stayed false and
            // later non-canonical blocks never triggered new work.
            mustCommitNewWork := stat != core.CanonStatTy

            // Broadcast the block and announce chain insertion event.
            self.mux.Post(core.NewMinedBlockEvent{Block: block})
            var (
                events []interface{}
                logs   = work.state.Logs()
            )
            events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
            if stat == core.CanonStatTy {
                events = append(events, core.ChainHeadEvent{Block: block})
            }
            self.chain.PostChainEvents(events, logs)

            // Insert the block into the set of pending ones to wait for confirmations
            self.unconfirmed.Insert(block.NumberU64(), block.Hash())

            if mustCommitNewWork {
                self.commitNewWork()
            }
        }
    }
    

    从方法中可以看到,这里在不停地从通道 recv 中读取结果。一旦发现一个新块,就把块写入本地的区块链,试图成为最新的区块,然后worker会发送一条事件NewMinedBlockEvent{},告诉其他节点我挖到块了。如果新块成为了规范链(canonical)的头,随后发出的ChainHeadEvent会让update()去调用commitNewWork();否则就在这里直接调用commitNewWork()提交新的工作。

    commitNewWork()

    // commitNewWork assembles a new unit of work on top of the current chain
    // head and hands it to self.push. It builds the header, lets the consensus
    // engine prepare it, applies the DAO fork rules, commits the pending
    // transactions and up to two uncles, and finalizes the block.
    //
    // mu, uncleMu and currentMu are held for the whole call, including the
    // sleep taken when the chosen timestamp is ahead of local time. Failures
    // are logged and the function returns without pushing any work.
    func (self *worker) commitNewWork() {
        self.mu.Lock()
        defer self.mu.Unlock()
        self.uncleMu.Lock()
        defer self.uncleMu.Unlock()
        self.currentMu.Lock()
        defer self.currentMu.Unlock()
    
        // Current time; also used below to report how long the commit took.
        tstart := time.Now()
        parent := self.chain.CurrentBlock()
    
        // The new timestamp must be strictly greater than the parent's.
        tstamp := tstart.Unix()
        if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 {
            tstamp = parent.Time().Int64() + 1
        }
        // this will ensure we're not going off too far in the future
        // If the timestamp derived from the parent is more than one second ahead
        // of local time, sleep for the difference.
        if now := time.Now().Unix(); tstamp > now+1 {
            wait := time.Duration(tstamp-now) * time.Second
            log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait))
            time.Sleep(wait)
        }
    
        // Build the header with the fields that are known at this point.
        // NOTE(review): num.Add mutates num in place; this assumes parent.Number()
        // returns a copy — confirm.
        num := parent.Number()
        header := &types.Header{
            ParentHash: parent.Hash(),
            Number:     num.Add(num, common.Big1),
            GasLimit:   core.CalcGasLimit(parent),
            Extra:      self.extra,
            Time:       big.NewInt(tstamp),
        }
        // Only set the coinbase if we are mining (avoid spurious block rewards)
        if atomic.LoadInt32(&self.mining) == 1 {
            header.Coinbase = self.coinbase
        }
        // Let the consensus engine prepare the header (presumably this is where
        // the difficulty is set — verify against the engine implementation).
        if err := self.engine.Prepare(self.chain, header); err != nil {
            log.Error("Failed to prepare header for mining", "err", err)
            return
        }
        // If we are care about TheDAO hard-fork check whether to override the extra-data or not
        if daoBlock := self.config.DAOForkBlock; daoBlock != nil {
            // Check whether the block is among the fork extra-override range
            limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
            if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
                // Depending whether we support or oppose the fork, override differently
                if self.config.DAOForkSupport {
                    header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
                } else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
                    header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
                }
            }
        }
        // Could potentially happen if starting to mine in an odd state.
        // Create the current work context for this parent/header pair.
        err := self.makeCurrent(parent, header)
        if err != nil {
            log.Error("Failed to create mining context", "err", err)
            return
        }
        // Create the current work task and check any fork transitions needed
        work := self.current
        if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
            misc.ApplyDAOHardFork(work.state)
        }
        // Fetch the pending transactions from the transaction pool.
        pending, err := self.eth.TxPool().Pending()
        if err != nil {
            log.Error("Failed to fetch pending transactions", "err", err)
            return
        }
        txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
        work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
    
        // compute uncles for the new block.
        // At most two uncles are included; candidates that fail commitUncle are
        // collected and removed from possibleUncles afterwards.
        var (
            uncles    []*types.Header
            badUncles []common.Hash
        )
        for hash, uncle := range self.possibleUncles {
            if len(uncles) == 2 {
                break
            }
            if err := self.commitUncle(work, uncle.Header()); err != nil {
                log.Trace("Bad uncle found and will be removed", "hash", hash)
                log.Trace(fmt.Sprint(uncle))
    
                badUncles = append(badUncles, hash)
            } else {
                log.Debug("Committing new uncle to block", "hash", hash)
                uncles = append(uncles, uncle.Header())
            }
        }
        for _, hash := range badUncles {
            delete(self.possibleUncles, hash)
        }
        // Create the new block to seal with the consensus engine
        // (presumably Finalize fills in the remaining header values such as Root,
        // TxHash, ReceiptHash and UncleHash — verify against the engine).
        if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil {
            log.Error("Failed to finalize block for sealing", "err", err)
            return
        }
        // We only care about logging if we're actually mining.
        if atomic.LoadInt32(&self.mining) == 1 {
            log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
            self.unconfirmed.Shift(work.Block.NumberU64() - 1)
        }
        self.push(work) // presumably hands the work to every registered agent to continue mining — verify against push
    }
    
    

    从方法中可以看到,这里构造了新区块头的各种属性,打包交易和叔块,然后把work推给所有的代理agent。

    CpuAgent

    cpu代理是真正的挖矿执行者,而它挖矿的方法 mine()调用的却是共识引擎里面的Seal()方法。

    // mine asks the consensus engine to seal work.Block and reports the outcome
    // on returnCh: a Result pairing the work with the sealed block on success,
    // or nil when no block was produced. A sealing error is logged as a warning.
    func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
        sealed, err := self.engine.Seal(self.chain, work.Block, stop)
        if sealed != nil {
            log.Info("Successfully sealed new block", "number", sealed.Number(), "hash", sealed.Hash())
            self.returnCh <- &Result{work, sealed}
            return
        }

        if err != nil {
            log.Warn("Block sealing failed", "err", err)
        }
        // Always answer, so the receiver can account for the finished attempt.
        self.returnCh <- nil
    }
    

    ethash/sealer.go

    // Seal implements consensus.Engine. It searches for a nonce that satisfies
    // the block's difficulty requirements by running several mine goroutines,
    // and returns the sealed block, or nil if stop fired first.
    func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) {
        // Fake PoW modes: return a zero nonce and mix digest immediately.
        switch ethash.config.PowMode {
        case ModeFake, ModeFullFake:
            header := block.Header()
            header.Nonce, header.MixDigest = types.BlockNonce{}, common.Hash{}
            return block.WithSeal(header), nil
        }
        // A shared PoW instance does the sealing on our behalf.
        if ethash.shared != nil {
            return ethash.shared.Seal(chain, block, stop)
        }

        // Read the configured thread count and lazily seed the nonce source,
        // both under the engine lock.
        ethash.lock.Lock()
        threads := ethash.threads
        if ethash.rand == nil {
            seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
            if err != nil {
                ethash.lock.Unlock()
                return nil, err
            }
            ethash.rand = rand.New(rand.NewSource(seed.Int64()))
        }
        ethash.lock.Unlock()

        // Resolve the thread count: zero means one per CPU, a negative value
        // disables local mining without extra logic around local/remote.
        switch {
        case threads == 0:
            threads = runtime.NumCPU()
        case threads < 0:
            threads = 0
        }

        // Start the search goroutines, each from its own random nonce.
        var (
            abort   = make(chan struct{})
            found   = make(chan *types.Block)
            workers sync.WaitGroup
        )
        workers.Add(threads)
        for i := 0; i < threads; i++ {
            go func(id int, nonce uint64) {
                defer workers.Done()
                ethash.mine(block, id, nonce, abort, found)
            }(i, uint64(ethash.rand.Int63()))
        }

        // Block until an outside stop, a found block, or a thread-count update.
        var sealed *types.Block
        select {
        case <-stop:
            // Outside abort: fall through and stop every search goroutine.
        case sealed = <-found:
            // One goroutine found a block: fall through and stop the others.
        case <-ethash.update:
            // Thread count was changed on user request: restart the search.
            close(abort)
            workers.Wait()
            return ethash.Seal(chain, block, stop)
        }
        close(abort)

        // Wait for all search goroutines to terminate before returning.
        workers.Wait()
        return sealed, nil
    }
    
    // mine is the proof-of-work search loop. Starting at seed it tries
    // successive nonces until one produces a result not exceeding the target
    // derived from the header difficulty, or until abort fires. A sealed block
    // is offered on found unless abort wins the race.
    func (ethash *Ethash) mine(block *types.Block, id int, seed uint64, abort chan struct{}, found chan *types.Block) {
        // Inputs that stay fixed for the whole search.
        header := block.Header()
        var (
            sealHash = header.HashNoNonce().Bytes()
            target   = new(big.Int).Div(maxUint256, header.Difficulty)
            dataset  = ethash.dataset(header.Number.Uint64())
        )
        // Search state: the candidate nonce and the attempts not yet reported
        // to the hash-rate meter.
        var (
            nonce   = seed
            pending int64
        )
        logger := log.New("miner", id)
        logger.Trace("Started ethash search for new nonces", "seed", seed)

    loop:
        for {
            // Non-blocking abort check before every attempt.
            select {
            case <-abort:
                // Mining terminated: report the remaining attempts and stop.
                logger.Trace("Ethash nonce search aborted", "attempts", nonce-seed)
                ethash.hashrate.Mark(pending)
                break loop
            default:
            }

            // The hash rate is not updated on every nonce, only every 2^15 attempts.
            pending++
            if pending%(1<<15) == 0 {
                ethash.hashrate.Mark(pending)
                pending = 0
            }

            // Compute the PoW value of this nonce; move on if it misses the target.
            digest, result := hashimotoFull(dataset.dataset, sealHash, nonce)
            if new(big.Int).SetBytes(result).Cmp(target) > 0 {
                nonce++
                continue
            }

            // Correct nonce found: put the nonce and mix digest into a copy of
            // the header.
            sealed := types.CopyHeader(header)
            sealed.Nonce = types.EncodeNonce(nonce)
            sealed.MixDigest = common.BytesToHash(digest)

            // Seal and hand over the block, if it is still needed.
            select {
            case found <- block.WithSeal(sealed):
                logger.Trace("Ethash nonce found and reported", "attempts", nonce-seed, "nonce", nonce)
            case <-abort:
                logger.Trace("Ethash nonce found but discarded", "attempts", nonce-seed, "nonce", nonce)
            }
            break loop
        }
        // Datasets are unmapped in a finalizer. Ensure that the dataset stays live
        // during sealing so it's not unmapped while being read.
        runtime.KeepAlive(dataset)
    }
    

    相关文章

      网友评论

          本文标题:以太坊挖矿流程

          本文链接:https://www.haomeiwen.com/subject/opsisftx.html