Sentinel Main Flow: Source Code Walkthrough

Author: hcq0514 | Published: 2021-01-14 10:52

Main flow

  • Spring Boot integration package
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
            <version>2.2.3.RELEASE</version>
        </dependency>

The package's spring.factories registers the following components:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration,\
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration,\
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration,\
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration,\
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
com.alibaba.cloud.sentinel.custom.SentinelCircuitBreakerConfiguration

The key piece is SentinelWebAutoConfiguration, which adds an MVC interceptor:

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        if (!sentinelWebInterceptorOptional.isPresent()) {
            return;
        }
        SentinelProperties.Filter filterConfig = properties.getFilter();
        registry.addInterceptor(sentinelWebInterceptorOptional.get())
                .order(filterConfig.getOrder())
                // Intercept all paths (/*)
                .addPathPatterns(filterConfig.getUrlPatterns());
    }
  • Entry point: incoming web requests are intercepted and handled by AbstractSentinelInterceptor (an abstract class)
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
        throws Exception {
        try {
            // Resolve the resource name from the request path, e.g. for localhost:8085/area/list the resourceName is /area/list
            String resourceName = getResourceName(request);

            if (StringUtil.isEmpty(resourceName)) {
                return true;
            }
            
            if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
                return true;
            }
            
            String origin = parseOrigin(request);
            String contextName = getContextName(request);
            ContextUtil.enter(contextName, origin);
            // The core call: everything important happens inside this method
            Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
            request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
            return true;
        } catch (BlockException e) {
            try {
                // Handle the BlockException and write the response to the client; the response content can be customized
                handleBlockException(request, response, e);
            } finally {
                ContextUtil.exit();
            }
            return false;
        }
    }

  • Main interception method: following the entry method down the call chain leads to com.alibaba.csp.sentinel.CtSph#entryWithPriority
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        // ... (a batch of validation checks omitted)
        // Get the slot chain for this resource
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
        // Wrap the resource, the slot chain and the context into an Entry
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            // This is where the slot chain actually runs (explained in detail below)
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        // Get the slot chain registered for this resource
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        // The chain is null the first time a resource is seen
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
                    // Initialize the slot chain (covered separately below)
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
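
The lookup above combines double-checked locking with a copy-on-write map: reads go to the current map without taking the lock, while a writer copies the map, adds the new entry and swaps the reference under the lock. The following is a minimal stand-alone sketch of that pattern. The class name CowChainCache and its methods are hypothetical, and generic keys and values stand in for ResourceWrapper and ProcessorSlotChain; it is not Sentinel's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical copy-on-write cache that mirrors the shape of lookProcessChain. */
final class CowChainCache<K, V> {
    private final Object lock = new Object();
    private final int maxSize;                       // plays the role of MAX_SLOT_CHAIN_SIZE
    private volatile Map<K, V> map = new HashMap<>();

    CowChainCache(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Returns the cached value, creates it on first use, or returns null when the cache is full. */
    V getOrCreate(K key, Supplier<V> factory) {
        V value = map.get(key);                      // first check, no lock
        if (value == null) {
            synchronized (lock) {
                value = map.get(key);                // second check, under the lock
                if (value == null) {
                    if (map.size() >= maxSize) {
                        return null;                 // size limit reached
                    }
                    value = factory.get();
                    Map<K, V> newMap = new HashMap<>(map);
                    newMap.put(key, value);
                    map = newMap;                    // publish the new map in one step
                }
            }
        }
        return value;
    }

    int size() {
        return map.size();
    }

    public static void main(String[] args) {
        CowChainCache<String, String> cache = new CowChainCache<>(2);
        System.out.println(cache.getOrCreate("/area/list", () -> "chain-1"));
        System.out.println(cache.getOrCreate("/area/list", () -> "chain-2"));
    }
}
```

Because the published map is never mutated after the swap, readers never see a half-updated HashMap; the price is a full copy on every insert, which is acceptable when new resources appear rarely.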
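  • chain = SlotChainProvider.newSlotChain(); creates the slot chain
    This builds a chain of nine slots. The rule-checking slots correspond to the rule types configured in the Sentinel console; the ones used most often are flow-control rules and degrade rules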
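    Purpose                                        Slot
    Builds the call-path (invocation tree) node    NodeSelectorSlot
    Builds the per-resource ClusterNode            ClusterBuilderSlot
    Logging of blocked requests                    LogSlot
    Statistics (important)                         StatisticSlot
    Hot-parameter rules                            ParamFlowSlot
    System rules                                   SystemSlot
    Authority rules                                AuthoritySlot
    Flow-control rules (important)                 FlowSlot
    Degrade rules (important)                      DegradeSlot
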
    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }
        // ... (SPI loading code omitted)
    }

public class HotParamSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new ParamFlowSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        return chain;
    }
}

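Once built, the slots form a linked list in the order they were added:

NodeSelectorSlot → ClusterBuilderSlot → LogSlot → StatisticSlot → ParamFlowSlot → SystemSlot → AuthoritySlot → FlowSlot → DegradeSlot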
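  • The method that actually runs the slot chain: chain.entry(...);
    It calls first.transformEntry on the chain. The first node is the default head node of DefaultProcessorSlotChain, which does no processing of its own and simply calls next.transformEntry to pass the request down the chain of responsibility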
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }
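
The two methods above are the whole mechanism: each slot does its own work in entry and then calls fireEntry to hand over to the next slot, and a slot that throws stops everything behind it. The following is a simplified stand-alone sketch of that chain of responsibility. SlotChainSketch, Slot, named and blocking are hypothetical names, and a list of strings stands in for the Context; it is not Sentinel's code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a slot chain; not Sentinel's classes. */
final class SlotChainSketch {

    /** A slot does its own work in entry() and hands over via fireEntry(). */
    abstract static class Slot {
        private Slot next;

        abstract void entry(String resource, List<String> trace);

        void fireEntry(String resource, List<String> trace) {
            if (next != null) {
                next.entry(resource, trace);
            }
        }
    }

    // Head node: it has no logic of its own and only forwards to the next slot.
    private final Slot first = new Slot() {
        @Override
        void entry(String resource, List<String> trace) {
            fireEntry(resource, trace);
        }
    };
    private Slot end = first;

    void addLast(Slot slot) {
        end.next = slot;
        end = slot;
    }

    void entry(String resource, List<String> trace) {
        first.entry(resource, trace);
    }

    /** A slot that records its name and passes the request on. */
    static Slot named(String name) {
        return new Slot() {
            @Override
            void entry(String resource, List<String> trace) {
                trace.add(name);
                fireEntry(resource, trace);
            }
        };
    }

    /** A slot that rejects every request, modelling a rule check that fails. */
    static Slot blocking(String name) {
        return new Slot() {
            @Override
            void entry(String resource, List<String> trace) {
                trace.add(name);
                throw new IllegalStateException("blocked by " + name);
            }
        };
    }

    public static void main(String[] args) {
        SlotChainSketch chain = new SlotChainSketch();
        chain.addLast(named("NodeSelector"));
        chain.addLast(named("Statistic"));
        chain.addLast(named("Flow"));
        List<String> trace = new ArrayList<>();
        chain.entry("/area/list", trace);
        System.out.println(trace);
    }
}
```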

Key code of each slot in the chain

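  • NodeSelectorSlot (organizes resources along the request call path)
    Collects the call paths of resources and stores them as a tree, so that flow control and degradation can be applied by call path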
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        // Here context.getName() returns sentinel_spring_web_context, which keys the node created below
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }
        context.setCurNode(node);
        // Hand over to the next slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
  • ClusterBuilderSlot (builds the ClusterNode for the resource)
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        node.setClusterNode(clusterNode);
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
  • LogSlot (logging)
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        try {
            fireEntry(context, resourceWrapper, obj, count, prioritized, args);
        } catch (BlockException e) {
            // Log the details when a request is blocked
            EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
                context.getOrigin(), count);
            throw e;
        } catch (Throwable e) {
            RecordLog.warn("Unexpected entry exception", e);
        }
    }
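  • StatisticSlot (important: statistics)

Records runtime statistics for the resource and its callers, such as RT, QPS and thread count; these figures are the basis for multi-dimensional flow control and degradation.
The statistics use a sliding time window algorithm. That topic is long, so it is covered separately in "Sliding Time Window Algorithm and Sentinel in Practice".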

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
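            // Run the downstream slots (the rule checks)
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            // Reaching this point means all the checks passed
            // Increment the current thread (concurrency) count
            node.increaseThreadNum();
            // Add to the passed-request count (backed by a sliding time window)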
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected error, set error to current entry.
            context.getCurEntry().setError(e);

            // This should not happen.
            node.increaseExceptionQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps(count);
            }
            throw e;
        }
    }
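
To give a feel for what addPassRequest feeds, here is a deliberately small sliding-window counter. It is a sketch under stated assumptions: the class SlidingWindowSketch and its methods are hypothetical, time is passed in explicitly so the behaviour is deterministic, and Sentinel's real implementation (LeapArray) is more involved.

```java
import java.util.Arrays;

/** Hypothetical sliding-window pass counter; an illustration, not Sentinel's LeapArray. */
final class SlidingWindowSketch {
    private final int bucketCount;
    private final long bucketLengthMs;
    private final long[] bucketStart;   // start time of the data held in each bucket
    private final long[] passCount;

    SlidingWindowSketch(int bucketCount, long intervalMs) {
        this.bucketCount = bucketCount;
        this.bucketLengthMs = intervalMs / bucketCount;
        this.bucketStart = new long[bucketCount];
        this.passCount = new long[bucketCount];
        Arrays.fill(bucketStart, -1L);  // -1 marks a bucket that was never used
    }

    /** Records count passed requests at time nowMs (non-negative milliseconds). */
    synchronized void addPass(long nowMs, int count) {
        int idx = (int) ((nowMs / bucketLengthMs) % bucketCount);
        long start = nowMs - nowMs % bucketLengthMs;
        if (bucketStart[idx] != start) {
            // The bucket holds data from an older lap: reset it and reuse it.
            bucketStart[idx] = start;
            passCount[idx] = 0;
        }
        passCount[idx] += count;
    }

    /** Sums the buckets whose start time still lies inside the window ending at nowMs. */
    synchronized long passInWindow(long nowMs) {
        long sum = 0;
        long windowMs = bucketLengthMs * bucketCount;
        for (int i = 0; i < bucketCount; i++) {
            if (bucketStart[i] >= 0 && nowMs - bucketStart[i] < windowMs) {
                sum += passCount[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        SlidingWindowSketch window = new SlidingWindowSketch(2, 1000);
        window.addPass(100, 3);
        window.addPass(600, 2);
        System.out.println(window.passInWindow(900));
    }
}
```

Splitting the interval into several buckets is what lets old traffic expire gradually instead of the whole counter resetting at once.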
  • ParamFlowSlot (hot-parameter flow control)
    // Applies limits to resources that have hot-parameter flow rules configured
    void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        if (args == null) {
            return;
        }
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            return;
        }
        List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());

        for (ParamFlowRule rule : rules) {
            applyRealParamIdx(rule, args.length);

            // Initialize the parameter metrics.
            ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);

            if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
                String triggeredParam = "";
                if (args.length > rule.getParamIdx()) {
                    Object value = args[rule.getParamIdx()];
                    triggeredParam = String.valueOf(value);
                }
                throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
            }
        }
    }
  • SystemSlot
    When system rules are configured, requests are validated against them
    public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
 ...
        // QPS limit
        double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
        if (currentQps > qps) {
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }
        // Thread count limit
        int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }
        // RT limit
        double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
            if (!checkBbr(currentThread)) {
                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }
        // CPU usage
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }
  • AuthoritySlot (restricts requests according to the configured authority rules)
    void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
        Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

        if (authorityRules == null) {
            return;
        }

        Set<AuthorityRule> rules = authorityRules.get(resource.getName());
        if (rules == null) {
            return;
        }

        for (AuthorityRule rule : rules) {
            if (!AuthorityRuleChecker.passCheck(rule, context)) {
                throw new AuthorityException(context.getOrigin(), rule);
            }
        }
    }
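
The loop above delegates the decision to AuthorityRuleChecker.passCheck, which is not shown in the article. The following sketch illustrates how a black/white list check against a comma-separated application list can work. The class AuthoritySketch, the Strategy enum and the parameter names are hypothetical, and the exact matching rules are an assumption rather than a copy of Sentinel's implementation.

```java
import java.util.Arrays;

/** Hypothetical black/white list check; the matching rules are assumptions. */
final class AuthoritySketch {
    enum Strategy { WHITE, BLACK }

    /**
     * @param origin   name of the calling application (the request origin)
     * @param limitApp comma-separated application names that the rule lists
     * @return true if the request may pass
     */
    static boolean passCheck(String origin, String limitApp, Strategy strategy) {
        // Assumption: an empty origin or an empty list never blocks.
        if (origin == null || origin.isEmpty() || limitApp == null || limitApp.isEmpty()) {
            return true;
        }
        // Exact match against each listed name, so "app" does not match "appA".
        boolean contain = Arrays.asList(limitApp.split(",")).contains(origin);
        if (strategy == Strategy.BLACK && contain) {
            return false;   // black list: listed callers are rejected
        }
        if (strategy == Strategy.WHITE && !contain) {
            return false;   // white list: unlisted callers are rejected
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(passCheck("appA", "appA,appB", Strategy.WHITE));
        System.out.println(passCheck("appC", "appA,appB", Strategy.WHITE));
    }
}
```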
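  • FlowSlot (important: flow control. When Sentinel loads rules dynamically from Nacos, the updates are applied through com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager.FlowPropertyListener; the check itself mainly goes through the method below)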
    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        // Get all flow rules configured for the current resource
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
    // canPassCheck in turn calls
    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
        // getRater() returns a different controller depending on the configured behavior: 1. fail fast, 2. Warm Up, 3. queueing
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
  1. Fail fast
    Mainly calls com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController#canPass()
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
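        // Current usage: avgUsedTokens reads node.passQps(), which calls StatisticNode#passQps
        int curCount = avgUsedTokens(node);
        // If the threshold would be exceeded, return false
        if (curCount + acquireCount > count) {
            // On the DefaultController path shown here prioritized is false, so the body of this branch is omitted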
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
      ...
                }
            }
            return false;
        }
        return true;
    }
  2. Warm Up: calls WarmUpController#canPass(...), which is based mainly on a token bucket algorithm
    // Constructor logic
    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
        // The cold factor must be greater than 1; the default is 3
        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }
        // Set the threshold
        this.count = count;
        // Set the cold factor
        this.coldFactor = coldFactor;
        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // warningToken = 100;
        // warningToken = (warm-up period * threshold) / (cold factor - 1)
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        // / maxPermits = thresholdPermits + 2 * warmupPeriod /
        // (stableInterval + coldInterval)
        // maxToken = 200
        // maxToken = warningToken + (2 * warm-up period * threshold) / (1 + cold factor)
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        // slope
        // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
        // - thresholdPermits);
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();

        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        // Start applying the slope:
        // once the stored tokens reach the warning line, the allowed QPS is adjusted
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // The more tokens remain above the warning line, the lower the allowed QPS
            // current interval = restToken*slope+1/count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }
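
Plugging numbers into the formulas makes the warm-up curve easier to see. The sketch below reuses the arithmetic from construct and the QPS calculation from canPass, but leaves out the token refill (syncToken), so it only shows how the allowed QPS depends on the number of stored tokens. WarmUpMathSketch and allowedQps are hypothetical names. With count = 10, a 10 second warm-up period and coldFactor = 3, the formulas give warningToken = 50 and maxToken = 100, and a full bucket allows about 10 / 3 QPS.

```java
/** Hypothetical helper that replays the warm-up arithmetic; token refill is not modelled. */
final class WarmUpMathSketch {
    final double count;       // steady-state QPS threshold
    final int warningToken;
    final int maxToken;
    final double slope;

    WarmUpMathSketch(double count, int warmUpPeriodInSec, int coldFactor) {
        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }
        this.count = count;
        // Same expressions as in the construct method above.
        this.warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1);
        this.maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        this.slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    /** QPS allowed while restToken tokens are stored in the bucket. */
    double allowedQps(long restToken) {
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            return Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        }
        return count;   // below the warning line the full threshold applies
    }

    public static void main(String[] args) {
        WarmUpMathSketch w = new WarmUpMathSketch(10, 10, 3);
        for (long tokens = w.maxToken; tokens >= 0; tokens -= 25) {
            System.out.println(tokens + " tokens stored, allowed QPS " + w.allowedQps(tokens));
        }
    }
}
```

A cold system has a full bucket, so it starts at roughly count / coldFactor and the allowed rate climbs towards count as traffic drains the stored tokens down to the warning line.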
  3. Queueing
    Mainly uses RateLimiterController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // If the requested count is less than or equal to 0, pass immediately
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
        // count is the threshold configured in the console
        if (count <= 0) {
            return false;
        }
        // Current time
        long currentTime = TimeUtil.currentTimeMillis();
        // Calculate the interval between every two requests.
        // With acquireCount = 1 and count = 10: round(1.0 * 1 / 10 * 1000) = 100 ms
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // Expected pass time of this request.
        long expectedTime = costTime + latestPassedTime.get();
        // Check whether the expected pass time is no later than the current time
        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // Calculate the time to wait.
            // Wait time = cost of this request + time of the latest pass - current time
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            // If the wait would exceed the configured maximum queueing time (in milliseconds), reject immediately
            if (waitTime > maxQueueingTimeMs) {
                return false;
            } else {
                // Advance latestPassedTime; it is an atomic variable to cope with concurrency
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    // Check the timeout once more
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // in race condition waitTime may <= 0
                    // Sleep for the wait time
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }
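
The pacing logic is easier to follow with the clock taken out. The sketch below keeps the same arithmetic but receives the current time as a parameter and returns the wait time instead of sleeping, so a burst of requests can be traced by hand. PacingSketch and tryAcquire are hypothetical names, and the return convention (-1 for rejected) is an assumption made for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical pacing calculator: same arithmetic as above, but no real clock and no sleep. */
final class PacingSketch {
    private final double count;            // allowed requests per second
    private final int maxQueueingTimeMs;   // longest acceptable wait
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    PacingSketch(double count, int maxQueueingTimeMs) {
        this.count = count;
        this.maxQueueingTimeMs = maxQueueingTimeMs;
    }

    /** Returns the milliseconds the caller would wait (0 means pass at once), or -1 if rejected. */
    long tryAcquire(long nowMs, int acquireCount) {
        if (acquireCount <= 0) {
            return 0;
        }
        if (count <= 0) {
            return -1;
        }
        long costTime = Math.round(1.0 * acquireCount / count * 1000);
        long expectedTime = costTime + latestPassedTime.get();
        if (expectedTime <= nowMs) {
            latestPassedTime.set(nowMs);    // the slot is already free
            return 0;
        }
        long waitTime = expectedTime - nowMs;
        if (waitTime > maxQueueingTimeMs) {
            return -1;
        }
        long reserved = latestPassedTime.addAndGet(costTime);   // reserve the next slot
        waitTime = reserved - nowMs;
        if (waitTime > maxQueueingTimeMs) {
            latestPassedTime.addAndGet(-costTime);               // give the slot back
            return -1;
        }
        return Math.max(waitTime, 0);
    }

    public static void main(String[] args) {
        PacingSketch limiter = new PacingSketch(10, 250);
        for (int i = 0; i < 4; i++) {
            System.out.println("request " + i + " wait: " + limiter.tryAcquire(1000, 1));
        }
    }
}
```

With 10 requests per second each request costs 100 ms, so requests arriving together are spaced 100 ms apart until the queueing limit is reached.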
  • DegradeSlot (important: degradation and circuit breaking)
    @Override
    public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
        if (cut.get()) {
            return false;
        }

        ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
        if (clusterNode == null) {
            return true;
        }
        // Circuit-breaking strategy: RT (response time). The three strategies here correspond to the console configuration
        if (grade == RuleConstant.DEGRADE_GRADE_RT) {
            // Average response time
            double rt = clusterNode.avgRt();
            if (rt < this.count) {
                passCount.set(0);
                return true;
            }
            // Sentinel will degrade the service only if count exceeds.
            // Degrade only once the number of consecutive slow passes reaches rtSlowRequestAmount
            if (passCount.incrementAndGet() < rtSlowRequestAmount) {
                return true;
            }
        }
        // Circuit-breaking strategy: exception ratio
        else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
            // Exceptions per second
            double exception = clusterNode.exceptionQps();
            // Completed requests per second
            double success = clusterNode.successQps();
            // Total requests per second
            double total = clusterNode.totalQps();
            // If total amount is less than minRequestAmount, the request will pass.
            // (minRequestAmount is configured in the console)
            if (total < minRequestAmount) {
                return true;
            }

            // In the same aligned statistic time window,
            // "success" (aka. completed count) = exception count + non-exception count (realSuccess)
            // success includes both exceptional and normal completions, so realSuccess is success minus exception
            double realSuccess = success - exception;
            if (realSuccess <= 0 && exception < minRequestAmount) {
                return true;
            }
            if (exception / success < count) {
                return true;
            }
        }
        // Circuit-breaking strategy: exception count
        else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
            // Exceptions within the last minute: totalException() returns rollingCounterInMinute.exception()
            double exception = clusterNode.totalException();
            if (exception < count) {
                return true;
            }
        }

        if (cut.compareAndSet(false, true)) {
            ResetTask resetTask = new ResetTask(this);
            pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
        }

        return false;
    }
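
The exception-ratio branch is the least obvious of the three, because "success" counts every completed request, including the ones that threw. The following sketch isolates that branch as a pure function so the three early exits can be tried with concrete numbers. ExceptionRatioSketch and passes are hypothetical names; the arithmetic follows the code above.

```java
/** Hypothetical pure-function version of the exception-ratio branch shown above. */
final class ExceptionRatioSketch {

    /**
     * @param exception        exceptions per second
     * @param success          completed requests per second (exceptional ones included)
     * @param total            total requests per second
     * @param minRequestAmount minimum traffic before the rule applies
     * @param ratioThreshold   the rule's count, read as a ratio between 0 and 1
     * @return true if the request may pass, false if the circuit should open
     */
    static boolean passes(double exception, double success, double total,
                          int minRequestAmount, double ratioThreshold) {
        if (total < minRequestAmount) {
            return true;    // too little traffic to judge
        }
        double realSuccess = success - exception;
        if (realSuccess <= 0 && exception < minRequestAmount) {
            return true;    // everything failed, but there were only a few failures
        }
        return exception / success < ratioThreshold;
    }

    public static void main(String[] args) {
        System.out.println(passes(2, 10, 10, 5, 0.5));
        System.out.println(passes(6, 10, 10, 5, 0.5));
    }
}
```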
