美文网首页
Sentinel之Slots插槽源码分析热点参数限流(七)

Sentinel之Slots插槽源码分析热点参数限流(七)

作者: 橘子_好多灰 | 来源:发表于2019-02-12 22:50 被阅读0次

    一、引子

    该篇是插槽分析的最后一篇。
    何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如:

    • 商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
    • 用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制

    热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。

    Sentinel 利用 LRU 策略,结合底层的滑动窗口机制来实现热点参数统计。LRU 策略可以统计单位时间内,最近最常访问的热点参数,而滑动窗口机制可以帮助统计每个参数的 QPS。

    二、使用

    使用热点限流功能,需要引入以下依赖:

    <dependency>
        <groupId>com.alibaba.csp</groupId>
        <artifactId>sentinel-parameter-flow-control</artifactId>
        <version>x.y.z</version>
    </dependency>
    

    然后为对应的资源配置热点参数限流规则,并在 entry的时候传入相应的参数,即可使热点参数限流生效。

    注:若自行扩展并注册了自己实现的 SlotChainBuilder,并希望使用热点参数限流功能,则可以在 chain 里面合适的地方插入ParamFlowSlot

    public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException
    public static Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException
    

    其中最后的一串 args 就是要传入的参数,有多个就按照次序依次传入。比如要传入两个参数 paramA 和 paramB,则可以:

    // paramA in index 0, paramB in index 1.
    SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);
    

    三、热点参数规则

    热点参数规则(ParamFlowRule)类似于流量控制规则(FlowRule):

    属性 说明 默认值
    resource 资源名,必填
    count 限流阈值,必填
    grade 限流模式 QPS 模式
    paramIdx 热点参数的索引,必填,对应 SphU.entry(xxx, args) 中的参数索引位置
    paramFlowItemList 参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count 阈值的限制。仅支持基本类型

    可以通过 ParamFlowRuleManagerloadRules 方法更新热点参数规则,下面是一个示例:

    ParamFlowRule rule = new ParamFlowRule(resourceName)
        .setParamIdx(0)
        .setCount(5);
    // 针对 int 类型的参数 PARAM_B,单独设置限流 QPS 阈值为 10,而不是全局的阈值 5.
    ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
        .setClassType(int.class.getName())
        .setCount(10);
    rule.setParamFlowItemList(Collections.singletonList(item));
    
    ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
    

    四、源码分析

    1、ParamFlowSlot

        @Override
     public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
            throws Throwable {
    
            //检查该资源是否存在限流资源
            if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
                fireEntry(context, resourceWrapper, node, count, args);
                return;
            }
    
            //限流规则检测
            checkFlow(resourceWrapper, count, args);
            fireEntry(context, resourceWrapper, node, count, args);
        }
    
     void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args)
            throws BlockException {
            //若存在限流规则
            if (ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
               //获取限流规则
                List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
                if (rules == null) {
                    return;
                }
    
                for (ParamFlowRule rule : rules) {
                    // Initialize the parameter metrics.
                    //初始化参数统计
                    initHotParamMetricsFor(resourceWrapper, rule.getParamIdx());
                    //若热点参数限流符合,则进行限流处理
                    if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
    
                        // Here we add the block count.
                        addBlockCount(resourceWrapper, count, args);
    
                        String message = "";
                        if (args.length > rule.getParamIdx()) {
                            Object value = args[rule.getParamIdx()];
                            message = String.valueOf(value);
                        }
                        throw new ParamFlowException(resourceWrapper.getName(), message);
                    }
                }
            }
        }
    

    通过checkFlow进行限流规则检测:

    1. 若该资源存在限流规则,并获取该资源的限流规则;否则跳过检测;
    2. 循环判断规则,并先初始化热点参数统计窗口,如下:
      void initHotParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ int index) {
            ParameterMetric metric;
            // Assume that the resource is valid.
            if ((metric = metricsMap.get(resourceWrapper)) == null) {
                synchronized (LOCK) {
                    if ((metric = metricsMap.get(resourceWrapper)) == null) {
                        metric = new ParameterMetric();
                        metricsMap.put(resourceWrapper, metric);
                        RecordLog.info("[ParamFlowSlot] Creating parameter metric for: " + resourceWrapper.getName());
                    }
                }
            }
            metric.initializeForIndex(index);
        }
    

    通过ParameterMetric的initialiazeForIndex初始了一个时间窗口。

    1. 通过ParamFlowChecker的passCheck方法检测规则。

    2、ParamFlowChecker

      static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
                                 Object... args) {
            // 如果参数不存在直接返回
            if (args == null) {
                return true;
            }
    
            int paramIdx = rule.getParamIdx();
            //参数的个数小于规则的索引直接返回
            if (args.length <= paramIdx) {
                return true;
            }
    
            Object value = args[paramIdx];
            
            //规则调用
            return passLocalCheck(resourceWrapper, rule, count, value);
        }
    
      private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
            try {
                if (Collection.class.isAssignableFrom(value.getClass())) {
                    for (Object param : ((Collection)value)) {
                        if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                            return false;
                        }
                    }
                } else if (value.getClass().isArray()) {
                    int length = Array.getLength(value);
                    for (int i = 0; i < length; i++) {
                        Object param = Array.get(value, i);
                        if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                            return false;
                        }
                    }
                } else {
                    return passSingleValueCheck(resourceWrapper, rule, count, value);
                }
            } catch (Throwable e) {
                RecordLog.info("[ParamFlowChecker] Unexpected error", e);
            }
    
            return true;
        }
    

    在passLocalCheck方法中:
    1.依次对Collection类型,Array类型,非数组集合类型进行处理,进入到passSingleValueCheck中;

    
        static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
            Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
            if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
             //获取当前参数在该资源时间窗口内通过的数量
                double curCount = getHotParameters(resourceWrapper).getPassParamQps(rule.getParamIdx(), value);
    
                if (exclusionItems.contains(value)) {
                    // Pass check for exclusion items.
                    int itemQps = rule.getParsedHotItems().get(value);
                    return curCount + count <= itemQps;
                } else if (curCount + count > rule.getCount()) {
                    if ((curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0) {
                        return true;
                    }
                    return false;
                }
            }
    
            return true;
        }
    
    1. 获取rule的解析参数集合HotItems;
    2. 获取当前参数在该资源时间窗口内通过的数量;
    3. 如果exclusionItems(参数例外项)包含该值,判断curCount + count 大于itemQpsitemQps是该参数对应的count,若大于则说明超过限流阈值,则返回false,反之返回true;
    4. 如果exclusionItems不包含该值,并且curCount + count 大于rule.getCount()时,并判断(curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0是否满足,若满足则返回true,反之返回false。

    3、ParameterMetric

       //保存了对应资源的时间窗口数据
       private Map<Integer, HotParameterLeapArray> rollingParameters =
            new ConcurrentHashMap<Integer, HotParameterLeapArray>();
    
        public Map<Integer, HotParameterLeapArray> getRollingParameters() {
            return rollingParameters;
        }
    
        public synchronized void clear() {
            rollingParameters.clear();
        }
    
        // 初始化一个时间窗口
        public void initializeForIndex(int index) {
            if (!rollingParameters.containsKey(index)) {
                synchronized (this) {
                    // putIfAbsent
                    if (rollingParameters.get(index) == null) {
                        rollingParameters.put(index, new HotParameterLeapArray(
                            1000 / SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL));
                    }
                }
            }
        }
    

    热点参数统计

    //增加通过数
     public void addPass(int count, Object... args) {
            add(RollingParamEvent.REQUEST_PASSED, count, args);
        }
    //增加阻塞数
    public void addBlock(int count, Object... args) {
            add(RollingParamEvent.REQUEST_BLOCKED, count, args);
    }
    
    @SuppressWarnings("rawtypes")
    private void add(RollingParamEvent event, int count, Object... args) {
            if (args == null) {
                return;
            }
            try {
                for (int index = 0; index < args.length; index++) {
                    HotParameterLeapArray param = rollingParameters.get(index);
                    if (param == null) {
                        continue;
                    }
    
                    Object arg = args[index];
                    if (arg == null) {
                        continue;
                    }
                    if (Collection.class.isAssignableFrom(arg.getClass())) {
                        for (Object value : ((Collection)arg)) {
                            param.addValue(event, count, value);
                        }
                    } else if (arg.getClass().isArray()) {
                        int length = Array.getLength(arg);
                        for (int i = 0; i < length; i++) {
                            Object value = Array.get(arg, i);
                            param.addValue(event, count, value);
                        }
                    } else {
                        param.addValue(event, count, arg);
                    }
    
                }
            } catch (Throwable e) {
                RecordLog.warn("[ParameterMetric] Param exception", e);
            }
        }
    
    1. 可以发现热点参数的统计也是基于滑动时间窗口统计,这个就不具体分析了,滑动时间窗口前面有讲解,见滑动时间窗口
      2.有点不同的是:在限流规则里指标的是通过LongAdder分段统计的,而热点参数的指标是通过AtomicInteger的addAndGet方法统计的。
     public ParamMapBucket add(RollingParamEvent event, int count, Object value) {
            data[event.ordinal()].putIfAbsent(value, new AtomicInteger());
            AtomicInteger counter = data[event.ordinal()].get(value);
            counter.addAndGet(count);
            return this;
        }
    

    3.热点参数获取的实际上是统计范围内一个平均值。如下:

     public double getPassParamQps(int index, Object value) {
            try {
                HotParameterLeapArray parameter = rollingParameters.get(index);
                if (parameter == null || value == null) {
                    return -1;
                }
                return parameter.getRollingAvg(RollingParamEvent.REQUEST_PASSED, value);
            } catch (Throwable e) {
                RecordLog.info(e.getMessage(), e);
            }
    
            return -1;
        }
    
    public double getRollingAvg(RollingParamEvent event, Object value) {
            return ((double) getRollingSum(event, value)) / getIntervalInSec();
     }
    
    public long getRollingSum(RollingParamEvent event, Object value) {
            currentWindow();
    
            long sum = 0;
    
            List<ParamMapBucket> buckets = this.values();
            for (ParamMapBucket b : buckets) {
                sum += b.get(event, value);
            }
    
            return sum;
        }
    
    

    五、我的总结

    1. 本文介绍热点参数的概念、使用、及部分源码分析。
    2. 热点参数限流使用需要单独引用Jar包,并设置规则后才会启动限流效果。
    3. 目前热点参数限流只是支持QPS模式,且还支持额外参数项进行限流。
    4. 热点参数的参数限流统计也是基于滑动窗口统计的,内部使用了AtomicInteger的原子结构统计及ConcurrentLinkedHashMap结构作为数据存储。

    以上内容,如有不当之处,请指正

    相关文章

      网友评论

          本文标题:Sentinel之Slots插槽源码分析热点参数限流(七)

          本文链接:https://www.haomeiwen.com/subject/nsiqjqtx.html