美文网首页vue 好吧
sentinel热点参数限流-漏桶算法vs令牌桶算法

sentinel热点参数限流-漏桶算法vs令牌桶算法

作者: ershuai8614 | 来源:发表于2022-02-27 23:01 被阅读0次

    引子

    在上篇文章中我们介绍了sentinel中的滑动窗口算法,发现限流的准确度依赖于划分的子窗口数量。而在很多情况下,我们的限流更多的是需要限制到参数级别,比如我们需要限制每个用户调用接口的频次,这种就需要精确到接口参数级别。而如果用滑动窗口来实现,就需要统计每个窗口内不同参数对应的请求数量,这样过于复杂,对于这种热点参数的限流,sentinel根据限流效果的不同,分别使用了漏桶算法和令牌桶算法来进行实现。

    漏桶算法原理

    主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。看下漏桶算法的原理图


    漏桶算法

    请求先进入到漏桶里,漏桶以一定的速度出水,当水请求过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

    sentinel中的漏桶算法实现

    @Spi(order = -3000)
    public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode>{
        void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
            if (args == null) {
                return;
            }
            if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
                return;
            }
            //获取热点参数配置
            List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
    
            for (ParamFlowRule rule : rules) {
                applyRealParamIdx(rule, args.length);
    
                // Initialize the parameter metrics.
                //初始化热点参数
                ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
                //passCheck 校验请求是否通过
                if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
                    String triggeredParam = "";
                    if (args.length > rule.getParamIdx()) {
                        Object value = args[rule.getParamIdx()];
                        triggeredParam = String.valueOf(value);
                    }
                    throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
                }
            }
        }
    }
    
    
    public final class ParamFlowChecker{
        public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
                                 Object... args) {
            if (args == null) {
                return true;
            }
    
            int paramIdx = rule.getParamIdx();
            if (args.length <= paramIdx) {
                return true;
            }
    
            // Get parameter value.
            Object value = args[paramIdx];
    
            // Assign value with the result of paramFlowKey method
            if (value instanceof ParamFlowArgument) {
                value = ((ParamFlowArgument) value).paramFlowKey();
            }
            // If value is null, then pass
            if (value == null) {
                return true;
            }
            //集群参数流控
            if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
                return passClusterCheck(resourceWrapper, rule, count, value);
            }
            //单机参数流控
            return passLocalCheck(resourceWrapper, rule, count, value);
        }
    
        private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
                                              Object value) {
            try {
                //参数类型如果为Collection,循环校验每个参数
                if (Collection.class.isAssignableFrom(value.getClass())) {
                    for (Object param : ((Collection)value)) {
                        if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                            return false;
                        }
                    }
                }
                //参数类型如果为Collection,循环校验每个参数
                else if (value.getClass().isArray()) {
                    int length = Array.getLength(value);
                    for (int i = 0; i < length; i++) {
                        Object param = Array.get(value, i);
                        if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                            return false;
                        }
                    }
                }
                //单个元素校验
                else {
                    return passSingleValueCheck(resourceWrapper, rule, count, value);
                }
            } catch (Throwable e) {
                RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
            }
    
            return true;
        }
    

    重点来了,passThrottleLocalCheck方法就是漏桶算法的具体实现,主要原理就是:根据rule配置的每多少秒可以通过多少请求来计算出一个请求需要多少毫秒costTime,取出上次请求通过的时间+costTime,就能得到我们期望此次请求到来的时间(expectedTime),如果请求到达时间大于期望时间,则放行;如果小于期望时间,说明流量过于密集,需要限制其速率,等待一段时间后(expectedTime-now)再放行。

        /**
         * 匀速排队限流
         * @param resourceWrapper
         * @param rule
         * @param acquireCount
         * @param value
         * @return
         */
        static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
                                              Object value) {
            ParameterMetric metric = getParameterMetric(resourceWrapper);
            CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
            if (timeRecorderMap == null) {
                return true;
            }
    
            // Calculate max token count (threshold)
            Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
            long tokenCount = (long)rule.getCount();
            if (exclusionItems.contains(value)) {
                tokenCount = rule.getParsedHotItems().get(value);
            }
    
            if (tokenCount == 0) {
                return false;
            }
            //根据rule配置的每多少秒可以通过多少请求来计算出一个请求需要多少毫秒
            long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
            while (true) {
                long currentTime = TimeUtil.currentTimeMillis();
                AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
                if (timeRecorder == null) {
                    return true;
                }
                //AtomicLong timeRecorder = timeRecorderMap.get(value);
                //上次请求通过的时间
                long lastPassTime = timeRecorder.get();
                //期望的请求到达时间
                long expectedTime = lastPassTime + costTime;
    
                /**
                 * 实际请求到达时间如果大于期望的请求到达时间 并且 多出来的这部分时间大于设置的等待时间,则限流
                 * 否则请求
                 */
                if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
                    AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
                    if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
                        long waitTime = expectedTime - currentTime;
                        /**
                         * 等待时间大于0,说明请求在期望时间之前到达,流量太过密集,等待之后放行;小于0,说明请求在期望时间之后到达,直接放行
                         */
                        if (waitTime > 0) {
                            lastPastTimeRef.set(expectedTime);
                            try {
                                TimeUnit.MILLISECONDS.sleep(waitTime);
                            } catch (InterruptedException e) {
                                RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
                            }
                        }
                        return true;
                    } else {
                        Thread.yield();
                    }
                } else {
                    return false;
                }
            }
        }
    }
    

    令牌桶算法原理

    对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。


    令牌桶算法

    sentinel-令牌桶算法实现

    public final class ParamFlowChecker {
        static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
                                             Object value) {
            ParameterMetric metric = getParameterMetric(resourceWrapper);
            //令牌计数器
            CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
            //时间计数器
            CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
    
            if (tokenCounters == null || timeCounters == null) {
                return true;
            }
    
            // Calculate max token count (threshold)
            //针对参数例外项单独限流
            Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
            long tokenCount = (long)rule.getCount();
            if (exclusionItems.contains(value)) {
                tokenCount = rule.getParsedHotItems().get(value);
            }
    
            if (tokenCount == 0) {
                return false;
            }
            //最大令牌数 = 设置的阈值 + 额外允许的突发流量数
            long maxCount = tokenCount + rule.getBurstCount();
            if (acquireCount > maxCount) {
                return false;
            }
    
            while (true) {
                long currentTime = TimeUtil.currentTimeMillis();
                //获取上一次令牌更新时间
                AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
                //表示系统启动后的第一次请求,直接返回校验通过
                if (lastAddTokenTime == null) {
                    // Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
                    //计算在时间窗口内有多少次请求可以通过
                    tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
                    return true;
                }
    
                // Calculate the time duration since last token was added.
                //计算此次请求距离上一次请求(这里的上一次请求指的是一个统计窗口周期内的第一个请求)过去了多久
                long passTime = currentTime - lastAddTokenTime.get();
                // A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
                if (passTime > rule.getDurationInSec() * 1000) {
                    //一个统计时间窗口已经过去,会重新计算在此时间窗口内有多少次请求可以通过
                    AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
                    if (oldQps == null) {
                        // Might not be accurate here.
                        lastAddTokenTime.set(currentTime);
                        return true;
                    } else {
                        //获取请求通过时剩余令牌数
                        long restQps = oldQps.get();
                        /** * 举例说明,比如1s 允许通过100个请求,当前时间是1900ms,
                         * 上一次请求时间是800ms * 那么totalCount = 1100 * 100 / 1 * 1000 = 110
                         */
                        long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
                        /** 
                         * 最新的qps需要之前剩余的加上间隔时间内新增的,但是不能超过最大值
                         */
                        long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
                            : (restQps + toAddCount - acquireCount);
    
    
                        if (newQps < 0) {
                            return false;
                        }
                        if (oldQps.compareAndSet(restQps, newQps)) {
                            //这里需要更新lastAddTokenTime
                            lastAddTokenTime.set(currentTime);
                            return true;
                        }
                        Thread.yield();
                    }
                } else {
                    //未超过时间窗口,获取剩余令牌数
                    AtomicLong oldQps = tokenCounters.get(value);
                    if (oldQps != null) {
                        long oldQpsValue = oldQps.get();
                        if (oldQpsValue - acquireCount >= 0) {
                            if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
                                //这里不需要更新lastAddTokenTime,因为还在一个统计窗口时间周期内
                                return true;
                            }
                        } else {
                            return false;
                        }
                    }
                    Thread.yield();
                }
            }
        }
    }
    

    代码较为冗长,我们来看一下整体的流程图:


    令牌桶算法逻辑

    令牌桶算法最重要的就是关注令牌生成的部分。而在sentinel的实现中,令牌生成是在 当前请求时间距离上一次请求时间已经大于设置的窗口时间。那么该算法是怎么支持突发流量的呢。
    我们以一个具体示例来看:
    假定我们设置一分钟请求不能超过1000次。时间窗口durationInSec = 60s
    ,tokenCount = 1000;允许突发的流量为burstCount = 2000; maxCount = tokenCount + burstCount = 3000;
    假设第1s来了一个请求,因为首次请求,通过,剩余令牌数为 2999。在第10s-20s 流量增大陆续来了2900次请求,因为都在一个窗口时间内且令牌数足够,所有请求均通过,剩余令牌数为9。随后教长一段时间没有请求,一直到第90s请求陆续到达,这时90s距离上一次请求时间20s已经过去了一个时间窗口,我们需要计算在这 70s内我们需要新增多少个令牌数
    toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000)
    =70s * 1000ms * 1000 / 60s * 1000ms = 1166.67

    newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
    : (restQps + toAddCount - acquireCount);
    上次剩余的令牌数 restQps = 9
    newQps = 1166.67 + 9 -1 = 1174.67 < maxCount
    所有在接下来的 60s-120s 时间段内,请求总数不可以超过1174.67。
    但是随着时间的流逝,只要在一段时间内没有请求过来,令牌又会达到maxCount个,理论上只要3分钟内没有请求到达,令牌数最少又会恢复到maxCount=3000个。
    我们发现,sentinel应对突发流量,有个缓冲的效果,首次流量突发之后,如果再有流量突发的情况,必须给系统足够的缓冲时间,否则依然会被限流。

    总结:

    漏桶算法和令牌桶算法两者主要区别在于“漏桶算法”能够强行限制数据的传输速率,而“令牌桶算法”在能够限制数据的平均传输速率外,还允许某种程度的突发传输。在“令牌桶算法”中,只要令牌桶中存在令牌,那么就允许突发地传输数据直到达到用户配置的门限,所以它适合于具有突发特性的流量

    相关文章

      网友评论

        本文标题:sentinel热点参数限流-漏桶算法vs令牌桶算法

        本文链接:https://www.haomeiwen.com/subject/navhrrtx.html