美文网首页区块链 Ripple
Ripple中处理高并发请求的方法

Ripple中处理高并发请求的方法

作者: SwordShield | 来源:发表于2018-05-28 22:41 被阅读2次

    1.角色与限制

    下面这段代码是根据发送请求的客户端ip来判断客户端的角色,根据角色不同做出不同的处理,admin角色的用户无并发限制,而普通用户则会判断当前是否应该断开连接:

    func:ServerHandlerImp::processRequest
    ...
    Resource::Consumer usage;
    if (isUnlimited(role))
    {
        usage = m_resourceManager.newUnlimitedEndpoint(
            remoteIPAddress.to_string());
    }
    else
    {
        usage = m_resourceManager.newInboundEndpoint(remoteIPAddress);
        if (usage.disconnect())
        {
            HTTPReply(503, "Server is overloaded", output, rpcJ);
            return;
        }
    }
    

    admin用户

    admin用户可在配置文件中配置:

    [port_ws_public]
    port = 6006
    ip = 0.0.0.0
    admin = 192.168.0.133,192.168.0.144
    protocol = ws
    

    配置为admin的ip是不受并发约束的,所以如果是一个我们已知的客户端要发送大量请求到节点,可以将这个客户端的ip添加到admin列表中,以避免连接断开

    普通用户

    在Ripple中,每个客户端都是有一个并发额度限制的,每个请求根据请求类型不同,收取相应的并发费用,当并发费用超过额度(15000*32),则下次请求会断开连接。

    并发费用的计算:

    发送请求费用会增加,不发送请求时,费用会减少,减少规则:

    // A span larger than four times the window decays the
    // value to an insignificant amount so just reset it.
    //
    if (elapsed > 4 * Window)
    {
        m_value = value_type();
    }
    else
    {
        while (elapsed--)
            m_value -= (m_value + Window - 1) / Window;
    }
    

    上面代码中:

    • Window = 32,值参见 decayWindowSeconds 的定义及使用,所以最大经过2分钟,费用会重置为0,实际情况考虑衰减的情况,应该1分钟左右就会重置
    • eclapsed为上次请求到现在经过的秒数,当elapsed越大,m_value值越大,m_value减小的越快

    不同请求的收费情况(部分):

    • 普通请求:20
    • 提交交易:400
    • 无效请求:100
    • 异常请求:100
    • 查找路径:50-400
    • gateway_balances,ledger_request,path_find(create),ripple_path_find,sign,signFor : 3000

    不考虑衰减,并发较高的情况下,用户发送到 15000 * 32 / 400 = 1200 个交易就会被断开连接,考虑到衰减,这值会大一些,实测并发为18的情况下,这个值为1500-1700

    2.FeeEscalation特性

    开启方式

    开启FeeEscalation特性有两种方式,可直接在配置文件中配置:

    [features]
    FeeEscalation
    

    也可以在链启动两周时自动开启

    功能

    开启FeeEscalation特性后,当节点发现当前共识中的交易数量多了,新的交易就会排队,返回terQUEUED错误,返回这一错误后,交易有极大可能仍然会成功,所以需要 像返回tesSUCCESS一样去确认共识结果

    关于排队的参数,有一个是比较重要的:

    [transaction_queue]
    minimum_txn_in_ledger = 200
    

    这个配置标明当前区块所能共识的交易数量,也就是说当前正在共识的交易量达到200时,后面的交易都要进行排队

    实现方式:

    一个交易发过来之后会经过下面的流程:

    1. 先检查FeeEscalation特性是否开启,如果未开启,直接走正常流程的apply
    2. 计算当前的费用,代码如下:
    // We may need the base fee for multiple transactions
    // or transaction replacement, so just pull it up now.
    // TODO: Do we want to avoid doing it again during
    //   preclaim?
    auto const baseFee = calculateBaseFee(app, view, *tx, j);
    auto const feeLevelPaid = getFeeLevelPaid(*tx,
        baseLevel, baseFee, setup_);
    auto const requiredFeeLevel = [&]()
    {
        auto feeLevel = FeeMetrics::scaleFeeLevel(metricsSnapshot, view);
        if ((flags & tapPREFER_QUEUE) && byFee_.size())
        {
            return std::max(feeLevel, byFee_.begin()->feeLevel);
        }
        return feeLevel;
    }();
    

    scaleFeeLevel很关键,下面是实现:

    std::uint64_t
    TxQ::FeeMetrics::scaleFeeLevel(Snapshot const& snapshot,
        OpenView const& view, std::uint32_t txCountPadding)
    {
        // Transactions in the open ledger so far
        auto const current = view.txCount() + txCountPadding;
    
        auto const target = snapshot.txnsExpected;
        auto const multiplier = snapshot.escalationMultiplier;
    
        // Once the open ledger bypasses the target,
        // escalate the fee quickly.
        if (current > target)
        {
            // Compute escalated fee level
            // Don't care about the overflow flag
            return mulDiv(multiplier, current * current,
                target * target).second;
        }
    
        return baseLevel;
    }
    
    std::uint32_t minimumEscalationMultiplier = baseLevel * 500;
    
    

    里面的txnsExpected 的值就是我们配置的 minimum_txn_in_ledger

    这里multiplier的默认值为128000,可以看到current与target差距越大,最终的费用越高

    1. 如果交易提供的费用足够(不小于requiredFeeLevel),则继续执行,否则排队
    2. 排队涉及到两个数据结构:
    FeeMultiSet byFee_;
    AccountMap byAccount_;
    

    byFee是按费用排序的交易集

    byAccount_是按账户分隔的map

    1. 当新的区块共识通过,会创建新的OpenLedger,创建后立即应用队列中的交易:
    RCLConsensus::Adaptor::doAccept
    ...
        // Build new open ledger
        auto lock = make_lock(app_.getMasterMutex(), std::defer_lock);
        auto sl = make_lock(ledgerMaster_.peekMutex(), std::defer_lock);
        std::lock(lock, sl);
    
        auto const lastVal = ledgerMaster_.getValidatedLedger();
        boost::optional<Rules> rules;
        if (lastVal)
            rules.emplace(*lastVal, app_.config().features);
        else
            rules.emplace(app_.config().features);
        app_.openLedger().accept(
            app_,
            *rules,
            sharedLCL.ledger_,
            localTxs_.getTxSet(),
            anyDisputes,
            retriableTxs,
            tapNONE,
            "consensus",
            [&](OpenView& view, beast::Journal j) {
                // Stuff the ledger with transactions from the queue.
                return app_.getTxQ().accept(app_, view);
            });
    ...
    

    TxQ::accept中仍然有费用的限制

    TxQ::accept
    ...
        auto const requiredFeeLevel = FeeMetrics::scaleFeeLevel(
            metricSnapshot, view);
        auto const feeLevelPaid = candidateIter->feeLevel;
        if (feeLevelPaid >= requiredFeeLevel)
        {
            auto firstTxn = candidateIter->txn;
    
            JLOG(j_.trace()) << "Applying queued transaction " <<
                candidateIter->txID << " to open ledger.";
    
            TER txnResult;
            bool didApply;
            std::tie(txnResult, didApply) = candidateIter->apply(app, view);
    ...
    

    TxQ::accept中使用了互斥量

    std::lock_guard<std::mutex> lock(mutex_);
    

    与TxQ::apply是并发互斥的

    相关文章

      网友评论

        本文标题:Ripple中处理高并发请求的方法

        本文链接:https://www.haomeiwen.com/subject/fqbnjftx.html