美文网首页以太坊
以太坊交易池源码分析

以太坊交易池源码分析

作者: mindcarver | 来源:发表于2020-10-15 19:21 被阅读0次

    交易池概念原理

    交易池工作概况:

    <img src="https://tva1.sinaimg.cn/large/007S8ZIlgy1gjnz8kgakqj30ya0togqg.jpg" alt="image-20201013202559291" style="zoom:50%;" />

    1. 交易池的数据来源主要来自:
      • 本地提交,也就是第三方应用通过调用本地以太坊节点的RPC服务所提交的交易;
      • 远程同步,是指通过广播同步的形式,将其他以太坊节点的交易数据同步至本地节点;
    2. 交易池中交易去向:被Miner模块获取并验证,用于挖矿;挖矿成功后写进区块并被广播
    3. Miner取走交易是复制,交易池中的交易并不减少。直到交易被写进规范链后才从交易池删除;
    4. 交易如果被写进分叉,交易池中的交易也不减少,等待重新打包。

    关键数据结构

    TxPoolConfig

    type TxPoolConfig struct {
        Locals    []common.Address // 本地账户地址存放
        NoLocals  bool             // 是否开启本地交易机制
        Journal   string           // 本地交易存放路径
        Rejournal time.Duration    // 持久化本地交易的间隔
    
        PriceLimit uint64         // 价格超出比例,若想覆盖一笔交易的时候,若价格上涨比例达不到要求,那么不能覆盖
      
        PriceBump  uint64 // 替换现有交易的最低价格涨幅百分比(一次)
    
        AccountSlots uint64 // 每个账户的可执行交易限制
        GlobalSlots  uint64 // 全部账户最大可执行交易
        AccountQueue uint64 // 单个账户不可执行的交易限制
        GlobalQueue  uint64 // 全部账户最大非执行交易限制
      
        Lifetime time.Duration // 一个账户在queue中的交易可以存活的时间
    }
    

    默认配置:

    Journal:   "transactions.rlp",
    Rejournal: time.Hour,
    
    PriceLimit: 1,
    PriceBump:  10,
    
    AccountSlots: 16,
    GlobalSlots:  4096,
    AccountQueue: 64,
    GlobalQueue:  1024,
    
    Lifetime: 3 * time.Hour
    

    TxPool

    type TxPool struct {
        config      TxPoolConfig // 交易池配置
        chainconfig *params.ChainConfig // 区块链配置
        chain       blockChain // 定义blockchain接口
        gasPrice    *big.Int
        txFeed      event.Feed //时间流
        scope       event.SubscriptionScope // 订阅范围
        signer      types.Signer //签名
        mu          sync.RWMutex
    
        istanbul bool // Fork indicator whether we are in the istanbul stage.
    
        currentState  *state.StateDB // 当前头区块对应的状态
        pendingNonces *txNoncer      // Pending state tracking virtual nonces
        currentMaxGas uint64         // Current gas limit for transaction caps
    
        locals  *accountSet // Set of local transaction to exempt from eviction rules
        journal *txJournal  // Journal of local transaction to back up to disk
    
        pending map[common.Address]*txList   // All currently processable transactions
        queue   map[common.Address]*txList   // Queued but non-processable transactions
        beats   map[common.Address]time.Time // Last heartbeat from each known account
        all     *txLookup                    // All transactions to allow lookups
        priced  *txPricedList                // All transactions sorted by price
    
        chainHeadCh     chan ChainHeadEvent
        chainHeadSub    event.Subscription
        reqResetCh      chan *txpoolResetRequest
        reqPromoteCh    chan *accountSet
        queueTxEventCh  chan *types.Transaction
        reorgDoneCh     chan chan struct{}
        reorgShutdownCh chan struct{}  // requests shutdown of scheduleReorgLoop
        wg              sync.WaitGroup // tracks loop, scheduleReorgLoop
    }
    
    

    txpool初始化

    Txpool初始化主要做了以下几件事:

    1. 检查配置 配置有问题则用默认值填充

      config = (&config).sanitize()
      

      对于这部分的检查查看TxPoolConfig的字段。

    2. 初始化本地账户

    pool.locals = newAccountSet(pool.signer)
    
    1. 将配置的本地账户地址加到交易池
    pool.locals.add(addr)
    

    我们在安装以太坊客户端往往可以指定一个数据存储目录,此目录便会存储着所有我们导入的或者通过本地客户端创建的帐户keystore文件。而这个加载过程便是从该目录加载帐户数据

    1. 更新交易池

      pool.reset(nil, chain.CurrentBlock().Header())
      
    2. 创建所有交易存储的列表,所有交易的价格用最小堆存放

      pool.priced = newTxPricedList(pool.all)
      

      通过排序,优先处理gasprice越高的交易。

    3. 如果本地交易开启 那么从本地磁盘加载本地交易

      if !config.NoLocals && config.Journal != "" {
           pool.journal = newTxJournal(config.Journal)
      
           if err := pool.journal.load(pool.AddLocals); err != nil {
               log.Warn("Failed to load transaction journal", "err", err)
           }
           if err := pool.journal.rotate(pool.local()); err != nil {
               log.Warn("Failed to rotate transaction journal", "err", err)
           }
       }
      
    4. 订阅链上事件消息

      pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
      
    5. 开启主循环

      go pool.loop()
      

    注意:local交易比remote交易具有更高的权限,一是不轻易被替换;二是持久化,即通过一个本地的journal文件保存尚未打包的local交易。所以在节点启动的时候,优先从本地加载local交易。

    本地地址会被加入白名单,凡由此地址发送的交易均被认为是local交易,不论是从本地递交还是从远端发送来的。

    到此为止交易池加载过程结束。

    添加交易到txpool

    之前我们说过交易池中交易的来源一方面是其他节点广播过来的,一方面是本地提交的,追根到源代码一个是AddLocal,一个是AddRemote,不管哪个都会调用addTxs。我们对添加交易的讨论就会从这个函数开始,它主要做了以下几件事:

    1. 过滤池中已经存在的交易

      if pool.all.Get(tx.Hash()) != nil {
        errs[i] = fmt.Errorf("known transaction: %x", tx.Hash())
               knownTxMeter.Mark(1)
               continue
           }
      
    2. 将交易添加到队列中

      newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
      
      进入到addTxsLocked函数中:
      replaced, err := pool.add(tx, local)
      

      进入到 pool.add函数中,这个add函数相当重要,它是将交易添加到queue中,等待后面的promote,到pending中去。如果在queue或者pending中已经存在,并且它的gas price更高时,将覆盖之前的交易。下面来拆开的分析一下add 这个函数。

      ①:看交易是否收到过,如果已经收到过就丢弃

      if pool.all.Get(hash) != nil {
           log.Trace("Discarding already known transaction", "hash", hash)
           knownTxMeter.Mark(1)
           return false, fmt.Errorf("known transaction: %x", hash)
       }
      

      ②:如果交易没通过验证也要丢弃,这里的重点是验证函数:

      validateTx: 主要做了以下几件事
      - 交易大小不能超过32kb
      - 交易金额不能为负
      - 交易gas值不能超出当前交易池设定的gaslimit
      - 交易签名必须正确
      - 如果交易为远程交易,则需验证其gasprice是否小于交易池gasprice最小值,如果是本地,优先打包,不管gasprice
      - 判断当前交易nonce值是否过低
      - 交易所需花费的转帐手续费是否大于帐户余额  cost == V + GP * GL
      - 判断交易花费gas是否小于其预估花费gas
      

      ③:如果交易池已满,丢弃价格过低的交易

      if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
           // If the new transaction is underpriced, don't accept it
           if !local && pool.priced.Underpriced(tx, pool.locals) {
               log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
               underpricedTxMeter.Mark(1)
               return false, ErrUnderpriced
           }
           // New transaction is better than our worse ones, make room for it
           drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
           for _, tx := range drop {
               log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
               underpricedTxMeter.Mark(1)
               pool.removeTx(tx.Hash(), false)
           }
       }
      

      注意这边的GlobalSlots和GlobalQueue ,就是我们说的pending和queue的最大容量,如果交易池的交易数超过两者之和,就要丢弃价格过低的交易。

    ④:判断当前交易在pending队列中是否存在nonce值相同的交易。存在则判断当前交易所设置的gasprice是否超过设置的PriceBump百分比,超过则替换覆盖已存在的交易,否则报错返回替换交易gasprice过低,并且把它扔到queue队列中(enqueueTx)。

    if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
         // Nonce already pending, check if required price bump is met
         inserted, old := list.Add(tx, pool.config.PriceBump)
         if !inserted {
             pendingDiscardMeter.Mark(1)
             return false, ErrReplaceUnderpriced
         }
         // New transaction is better, replace old one
         if old != nil {
             pool.all.Remove(old.Hash())
             pool.priced.Removed(1)
             pendingReplaceMeter.Mark(1)
         }
         pool.all.Add(tx)
         pool.priced.Put(tx)
         pool.journalTx(from, tx)
         pool.queueTxEvent(tx)
         log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
         return old != nil, nil
     }
     // New transaction isn't replacing a pending one, push into queue
     replaced, err = pool.enqueueTx(hash, tx)
    

    添加交易的流程就到此为止了。接下来就是如何把queue(暂时不可执行)中添加的交易扔到pending(可执行交易)中,速成promote。

    1. 提升交易

      提升交易主要把交易从queue扔到pending中,我们在接下来的里面重点讲

      done := pool.requestPromoteExecutables(dirtyAddrs)
      

    交易升级

    promoteExecutables将future queue中的交易移动到pending中,同时也会删除很多无效交易比如nonce低或者余额低等等,主要分以下步骤:

    ①:将所有queue中nonce低于账户当前nonce的交易从all里面删除

    forwards := list.Forward(pool.currentState.GetNonce(addr))
            for _, tx := range forwards {
                hash := tx.Hash()
                pool.all.Remove(hash)
                log.Trace("Removed old queued transaction", "hash", hash)
            }
    

    ②:将所有queue中花费大于账户余额 或者gas大于限制的交易从all里面删除

    drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
            for _, tx := range drops {
                hash := tx.Hash()
                pool.all.Remove(hash)
                log.Trace("Removed unpayable queued transaction", "hash", hash)
            }
    

    ③:将所有可执行的交易从queue里面移到pending里面(proteTx)

    注:可执行交易:将pending里面nonce值大于等于账户当前状态nonce的且nonce连续的几笔交易作为准备好的交易

    readies := list.Ready(pool.pendingNonces.get(addr))
            for _, tx := range readies {
                hash := tx.Hash()
                if pool.promoteTx(addr, hash, tx) {
                    log.Trace("Promoting queued transaction", "hash", hash)
                    promoted = append(promoted, tx)
                }
            }
    

    重点就是 promoteTx的处理,这个方法与add的不同之处在于,add是获得到的新交易插入pending,而promoteTx是将queue列表中的Txs放入pending接下来我们先看看里面是如何来处理的:

    inserted, old := list.Add(tx, pool.config.PriceBump)
        if !inserted {
            // An older transaction was better, discard this
            // 老的交易更好,删除这个交易
            pool.all.Remove(hash)
            pool.priced.Removed(1)
    
            pendingDiscardMeter.Mark(1)
            return false
        }
        // Otherwise discard any previous transaction and mark this
        // 现在这个交易更好,删除旧的交易
        if old != nil {
            pool.all.Remove(old.Hash())
            pool.priced.Removed(1)
    
            pendingReplaceMeter.Mark(1)
        } else {
            // Nothing was replaced, bump the pending counter
            pendingGauge.Inc(1)
        }
    

    主要就做了这几件事:

    1. 将交易插入pending中,如果待插入的交易nonce在pending列表中存在,那么待插入的交易gas price大于或等于原交易价值的110%(跟pricebump设定有关)时,替换原交易
    2. 如果新交易替换了某个交易,从all列表中删除老交易
    3. 最后更新一下all列表

    经过proteTx之后,要扔到pending的交易都放在了promoted []*types.Transaction中,再回到promoteExecutables中,继续下面步骤:

    ④:如果非本地账户queue大于限制(AccountQueue),从最后取出nonce较大的交易进行remove

    if !pool.locals.contains(addr) {
                caps = list.Cap(int(pool.config.AccountQueue))
                for _, tx := range caps {
                    hash := tx.Hash()
                    pool.all.Remove(hash)
                ...
            }
    

    ⑤:最后如果队列中此账户的交易为空则删除此账户

    if list.Empty() {
                delete(pool.queue, addr)
            }
    

    到此我们的升级交易要做的事情就完毕了。


    交易降级

    交易降级的几个场景:

    1. 出现了新的区块,将会从pending中移除出现在区块中的交易到queue中
    2. 或者是另外一笔交易(gas price 更高),则会从pending中移除到queue中

    关键函数:demoteUnexecutables,主要做的事情如下:

    ①:遍历pending中所有地址对应的交易列表

    for addr, list := range pool.pending {
      ...}
    

    ②:删除所有认为过旧的交易(low nonce)

    olds := list.Forward(nonce)
            for _, tx := range olds {
                hash := tx.Hash()
                pool.all.Remove(hash)
                log.Trace("Removed old pending transaction", "hash", hash)
            }
    

    ③:删除所有费用过高的交易(余额低或用尽),并将所有无效者送到queue中以备后用

    drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
            for _, tx := range drops {
                hash := tx.Hash()
                log.Trace("Removed unpayable pending transaction", "hash", hash)
                pool.all.Remove(hash)
            }
            pool.priced.Removed(len(olds) + len(drops))
            pendingNofundsMeter.Mark(int64(len(drops)))
    
            for _, tx := range invalids {
                hash := tx.Hash()
                log.Trace("Demoting pending transaction", "hash", hash)
                pool.enqueueTx(hash, tx)
            }
    

    ④:如果交易前面有间隙,将后面的交易移到queue中

    if list.Len() > 0 && list.txs.Get(nonce) == nil {
                gapped := list.Cap(0)
                for _, tx := range gapped {
                    hash := tx.Hash()
                    log.Error("Demoting invalidated transaction", "hash", hash)
                    pool.enqueueTx(hash, tx)
                }
                pendingGauge.Dec(int64(len(gapped)))
            }
    

    注:间隙的出现通常是因为交易余额问题导致的。假如原规范链A 上交易m花费10,分叉后该账户又在分叉链B发出一个交易m花费20,这就导致该账户余额本来可以支付A链上的某笔交易,但在B链上可能就不够了。这个余额不足的交易在B如果是n+3,那么在A链上n+2,n+4号交易之间就出现了空隙,这就导致从n+3开始往后所有的交易都要降级;

    到底交易降级结束。


    重置交易池


    重置交易池将检索区块链的当前状态(主要由于更新导致链状态变化),并确保交易池的内容对于链状态而言是有效的。

    流程图如下:

    image-20201015185551752

    根据上面流程图,主要功能是由于规范链的更新,重新整理交易池:

    1. 找到由于规范链更新而作废的交易
    2. 给交易池设置最新的世界状态
    3. 把旧链退回的交易重新放入交易池

    参考:

    https://github.com/blockchainGuide/
    https://mindcarver.cn/
    https://learnblockchain.cn/2019/06/03/eth-txpool/#%E6%B8%85%E7%90%86%E4%BA%A4%E6%98%93%E6%B1%A0

    https://blog.csdn.net/lj900911/article/details/84825739

    相关文章

      网友评论

        本文标题:以太坊交易池源码分析

        本文链接:https://www.haomeiwen.com/subject/ifbopktx.html