美文网首页javaWeb学习架构
微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码

微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码

作者: 多氯环己烷 | 来源:发表于2022-01-31 13:11 被阅读0次

    前言

    参考资料
    《Spring Microservices in Action》
    《Spring Cloud Alibaba 微服务原理与实战》
    《B站 尚硅谷 SpringCloud 框架开发教程 周阳》
    《Sentinel GitHub 官网》
    《Sentinel 官网》

    调用链路是 Sentinel 的工作主流程,由各个 Slot 槽组成,将不同的 Slot 槽按照顺序串在一起,从而将不同的功能(限流、降级、系统保护)组合在一起;

    本篇《2. 获取 ProcessorSlot 链》将从源码级讲解如何获取调用链路,接着会以遍历链表的方式处理每一个 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分别对应本篇《3. 流控槽实施流控逻辑》、《4. 统计槽实施指标数据统计》和《5. 熔断槽实施服务熔断》;


    1. Sentinel 的自动装配

    1.2 依赖引入

    • 我们引入 Sentinel 的 starter 依赖文件,不需要太多额外操作,即可使用 Sentinel 默认自带的限流功能,原因是这些配置和功能都给我们自动装配了;
    • 在 Spring-Cloud-Alibaba-Sentinel 包下的 META-INF/spring.factories 文件里定义了会自动装配哪些类;
    Sentinel 的自动装配.png
    • SentinelWebAutoConfiguration:对 Web Servlet 环境的支持;
    • SentinelWebFluxAutoConfiguration:对 Spring WebFlux 的支持;
    • SentinelEndpointAutoConfiguration:暴露 Endpoint 信息;
    • SentinelFeignAutoConfiguration:用于适应 Feign 组件;
    • SentinelAutoConfiguration:支持对 RestTemplate 的服务调用使用 Sentinel 进行保护;

    1.3 SentinelWebAutoConfiguration 配置类

    • SentinelWebAutoConfiguration 配置类中自动装配了一个 FilterRegistrationBean,其主要作用是注册一个 CommonFilter,并且默认情况下通过 /* 规则拦截所有的请求;
    @Configuration
    @EnableConfigurationProperties(SentinelProperties.class)
    public class SentinelWebAutoConfiguration {
        
        //省略其他代码
        
        @Bean
        @ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled", matchIfMissing = true)
        public FilterRegistrationBean sentinelFilter() {
            FilterRegistrationBean<Filter> registration = new FilterRegistrationBean<>();
    
            SentinelProperties.Filter filterConfig = properties.getFilter();
    
            if (filterConfig.getUrlPatterns() == null || filterConfig.getUrlPatterns().isEmpty()) {
                List<String> defaultPatterns = new ArrayList<>();
                //默认情况下通过 /* 规则拦截所有的请求
                defaultPatterns.add("/*");
                filterConfig.setUrlPatterns(defaultPatterns);
            }
    
            registration.addUrlPatterns(filterConfig.getUrlPatterns().toArray(new String[0]));
            //【点进去】注册 CommonFilter
            Filter filter = new CommonFilter();
            registration.setFilter(filter);
            registration.setOrder(filterConfig.getOrder());
            registration.addInitParameter("HTTP_METHOD_SPECIFY", String.valueOf(properties.getHttpMethodSpecify()));
            log.info("[Sentinel Starter] register Sentinel CommonFilter with urlPatterns: {}.", filterConfig.getUrlPatterns());
            return registration;
        }
    }
    

    1.4 CommonFilter 过滤器

    • CommonFilter 过滤器的作用与源码如下:
      • 从请求中获取目标 URL;
      • 获取 Urlcleaner;
      • 对当前 URL 添加限流埋点;
    public class CommonFilter implements Filter {
        
        //省略部分代码
    
        public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
            HttpServletRequest sRequest = (HttpServletRequest)request;
            Entry urlEntry = null;
            try {
                //解析请求 URL
                String target = FilterUtil.filterTarget(sRequest);
                //URL 清洗
                UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
                if (urlCleaner != null) {
                    //如果存在,则说明配置过 URL 清洗策略,替换配置的 targer
                    target = urlCleaner.clean(target);
                }
                if (!StringUtil.isEmpty(target)) {
                    String origin = this.parseOrigin(sRequest);
                    ContextUtil.enter("sentinel_web_servlet_context", origin);
                    if (this.httpMethodSpecify) {
                        String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + ":" + target;
                        //使用 SphU.entry() 方法对 URL 添加限流埋点
                        urlEntry = SphU.entry(pathWithHttpMethod, 1, EntryType.IN);
                    } else {
                        urlEntry = SphU.entry(target, 1, EntryType.IN);
                    }
                }
                //执行过滤
                chain.doFilter(request, response);
            } catch (BlockException var14) {
                HttpServletResponse sResponse = (HttpServletResponse)response;
                WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, var14);
            } catch (ServletException | RuntimeException | IOException var15) {
                Tracer.traceEntry(var15, urlEntry);
                throw var15;
            } finally {
                if (urlEntry != null) {
                    urlEntry.exit();
                }
                ContextUtil.exit();
            }
        }
    }
    

    1.5 小结

    • 对于 Web Servlet 环境,只是通过 Filter 的方式将所有请求自动设置为 Sentinel 的资源,从而达到限流的目的;

    2. 获取 ProcessorSlot 链

    • Sentinel 的工作原理主要依靠 ProcessorSlot 链,遍历链中的每一个 Slot 槽,执行相应逻辑;

    2.1 Sentinel 源码包结构

    • 在 DeBug 之前,我们需要对 Sentinel 的源码包结构做个分析,以找到方法的入口;
    模块名 说明
    sentinel-adapter 负责针对主流开源框架进行限流适配,如:Dubbo、gRPC、Zuul 等;
    sentinel-core Sentinel 核心库,提供限流、熔断等实现;
    sentinel-dashboard 控制台模块,提供可视化监控和管理;
    sentinel-demo 官方案例;
    sentinel-extension 实现不同组件的数据源扩展,如:Nacos、ZooKeeper、Apollo 等;
    sentinel-transport 通信协议处理模块;
    • Slot 槽是 Sentinel 的核心,因此方法的入口在 sentinel-core 核心库,里面有好多个 SphU.entry() 方法,我们给方法打上断点,DeBug 进入,然后登录 Sentinel 控制台;
    首次DeBug 进入 SphU.entry() 方法.png

    2.2 获取 ProcessorSlot 链与操作 Slot 槽的入口 CtSph.entryWithPriority()

    • 一直进入最终方法的实现在 CtSph.entryWithPriority() 方法里,其主要逻辑与源码如下:
      • 校验全局上下文 context;
      • 构造 ProcessorSlot 链;
      • 遍历 ProcessorSlot 链操作 Slot 槽(遍历链表);
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            //上下文量已经超过阈值 -> 只初始化条目,不进行规则检查
            return new CtEntry(resourceWrapper, null, context);
        }
    
        if (context == null) {
            //没有指定上下文 -> 使用默认上下文 context
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
         
         if (!Constants.ON) {
            //全局开关关闭 -> 没有规则检查
            return new CtEntry(resourceWrapper, null, context);
        }
        //【断点步入 2.2.1】通过 lookProcessChain 方法获取 ProcessorSlot 链
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
    
    
        if (chain == null) {
            //表示资源量超过 Constants.MAX_SLOT_CHAIN_SIZE 常量 -> 不会进行规则检查
            return new CtEntry(resourceWrapper, null, context);
        }
    
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            //【断点步入 3./4./5.】执行 ProcessorSlot 对 ProcessorSlot 链中的 Slot 槽遍历操作(遍历链表的方式)
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            //这种情况不应该发生,除非 Sentinel 内部存在错误
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
    

    2.2.1 构造 ProcessorSlot 链 CtSph.lookProcessChain()

    • 进入 CtSph.lookProcessChain() 方法;
    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        //从缓存中获取 slot 调用链
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
                    //【断点步入】构造 Slot 链(责任链模式)
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
    
    • 最终调用 DefaultSlotChainBuilder.build() 方法构造 DefaultProcessorSlotChain;
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }
        return chain;
    }
    
    • 可以看到最后 ProcessorSlotChain 链中有 10 个 Slot 插槽:
    • 在本篇笔记中我们关注 3 个槽:
      • FlowSlot:进行流控规则校验,对应本篇《3. 流控槽实施流控逻辑》;
      • StatisticSlot:实现指标数据的统计,对应本篇《4. 统计槽实施指标数据统计》;
      • DegradeSlot:服务熔断,对应本篇《5. 熔断槽实施服务熔断》
    ProcessorSlotChain 链中有 10 个 Slot 插槽.png

    2.2.2 操作 Slot 槽的入口

    • 操作 Slot 槽的入口方法是:ProcessorSlot.entry()
    • 接着会以遍历链表的方式操作每个 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分别对应下面的《3. 流控槽实施流控逻辑》、《4. 统计槽实施指标数据统计》和《5. 熔断槽实施服务熔断》;

    3. 流控槽实施流控逻辑 FlowSlot.entry()

    • 进入 ProcessorSlot.entry() 方法,它会遍历每个 Slot 插槽,并对其进行操作,其中会经过 FlowSlot.entry() 方法(需要提前给该方法打上断点),方法的逻辑跟源码如下:
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        //【断点步入】检查流量规则
        checkFlow(resourceWrapper, context, node, count, prioritized);
        //调用下一个 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    
    • 进入 FlowSlot.checkFlow() 方法,最终调用 FlowRuleChecker.checkFlow() 方法,方法的逻辑和源码如下:
      • 遍历所有流控规则 FlowRule;
      • 针对每个规则调用 canPassCheck 进行校验;
    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        //【断点步入 3.1】获取流控规则
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            //遍历所有流控规则 FlowRule
            for (FlowRule rule : rules) {
                //【点进去 3.2】校验每条规则
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
    

    3.1 获取流控规则 FlowSlot.ruleProvider.apply()

    • 进入 FlowSlot.ruleProvider.apply() 方法,获取到 Sentinel 控制台上的流控规则;
    private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
        @Override
        public Collection<FlowRule> apply(String resource) {
            // Flow rule map should not be null.
            Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
            return flowRules.get(resource);
        }
    };
    

    3.2 校验每条规则 FlowRuleChecker.canPassCheck()

    • 进入 FlowRuleChecker.canPassCheck() 方法,分集群和单机模式校验每条规则;
    public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }
        //集群模式
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        //【点进去】单机模式
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }
    
    • 由于我们是单机模式,进入 FlowRuleChecker.passLocalCheck() 方法,其主要逻辑和源码如下:
      • 根据来源和策略获取 Node,从而拿到统计的 runtime 信息;
      • 使用流量控制器检查是否让流量通过;
    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        //【点进去 3.2.1】获取 Node
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
        //【点进去 3.2.2】获取流控的处理策略
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
    

    3.2.1 获取 Node FlowRuleChecker.selectNodeByRequesterAndStrategy()

    • 进入 FlowRuleChecker.selectNodeByRequesterAndStrategy() 方法,其根据 FlowRule 中配置的 Strategy 和 limitApp 属性,返回不同处理策略的 Node;
    static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        //limitApp 不能为空
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();
        
        //场景1:限流规则设置了具体应用,如果当前流量就是通过该应用的,则命中场景1
        if (limitApp.equals(origin) && filterOrigin(origin)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Matches limit origin, return origin statistic node.
                return context.getOriginNode();
            }
            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
        //场景2:限流规则未指定任何具体应,默认为default,则当前流量直接命中场景2
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Return the cluster node.
                return node.getClusterNode();
            }
    
            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
        //场景3:限流规则设置的是other,当前流量未命中前两种场景
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();
            }
            return selectReferenceNode(rule, context, node);
        }
        return null;
    }
    
    • 假设我们对接口 UserService 配置限流 1000 QPS,这 3 种场景分别如下:
      • 场景 1:目的是优先保障重要来源的流量。我们需要区分调用来源,将限流规则细化。对A应用配置500QPS,对B应用配置200QPS,此时会产生两条规则:A应用请求的流量限制在500,B应用请求的流量限制在200;
      • 场景 2:没有特别重要来源的流量。我们不想区分调用来源,所有入口调用 UserService 共享一个规则,所有 client 加起来总流量只能通过 1000 QPS;
      • 场景 3:配合第1种场景使用,在长尾应用多的情况下不想对每个应用进行设置,没有具体设置的应用都将命中;

    3.2.2 获取流控的处理策略 `FlowRule.getRater().canPass()

    • 进入 FlowRule.getRater().canPass() 方法,首先通过 FlowRule.getRater() 获得流控行为 TrafficShapingController,这是一个接口,有四种实现类,如下图所示:
    TrafficShapingController 的四种实现类.png
    • 有以下四种处理策略:
      • DefaultController:直接拒绝;
      • RateLimiterController:匀速排队;
      • WarmUpController:冷启动(预热);
      • WarmUpRateLimiterController:匀速+冷启动。
    • 最终调用 TrafficShapingController.canPass() 方法,执行流控行为;

    4. 统计槽实施指标数据统计 StatisticSlot.entry()

    • 限流的核心是限流算法的实现,Sentinel 默认采用滑动窗口算法来实现限流,具体的指标数据统计由 StatisticSlot 实现;
    • 我们给 StatisticSlot.entry() 方法里的语句打上断点,运行到光标处;
    • StatisticSlot.entry() 方法的核心是使用 Node 统计“增加线程数”和“请求通过数”;
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        try {
            //先执行后续 Slot 检查,再统计数据(即先调用后续所有 Slot)
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
    
            //【断点步入】使用 Node 统计“增加线程数”和“请求通过数”
            node.increaseThreadNum();
            node.addPassRequest(count);
    
            //如果存在来源节点,则对来源节点增加线程数和请求通过数
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }
            
            //如果是入口流量,则对全局节点增加线程数和请求通过数
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }
    
            //执行事件通知和回调函数
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        //处理优先级等待异常    
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            //如果有来源节点,则对来源节点增加线程数
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }
    
            //如果是入口流量,对全局节点增加线程数
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            //执行事件通知和回调函数
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        //处理限流、熔断等异常    
        } catch (BlockException e) {
            
            //省略
            
            throw e;
        //处理业务异常    
        } catch (Throwable e) {
            context.getCurEntry().setError(e);
            throw e;
        }
    }
    

    4.1 统计“增加线程数”和“请求通过数”

    • 这两个方法都是调用同一个类的,笔者以第一个为例,进入 DefaultNode.increaseThreadNum() 方法,最终调用的是 StatisticNode.increaseThreadNum(),而统计也是依靠 StatisticNode 维护的,这里放上 StatisticNode 的统计核心与源码:
      • StatisticNode 持有两个计数器 Metric 对象,统计行为是通过 Metric 完成的;
    public class StatisticNode implements Node {
    
        //省略其他代码
    
        //【断点步入】最近 1s 滑动窗口计数器(默认 1s)
        private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
            IntervalProperty.INTERVAL);
    
        //最近 1min 滑动窗口计数器(默认 1min)
        private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
        
        //增加 “请求通过数” 
        @Override
        public void addPassRequest(int count) {
            rollingCounterInSecond.addPass(count);
            rollingCounterInMinute.addPass(count);
        }
        //增加 RT 和成功数
        @Override
        public void addRtAndSuccess(long rt, int successCount) {
            rollingCounterInSecond.addSuccess(successCount);
            rollingCounterInSecond.addRT(rt);
            rollingCounterInMinute.addSuccess(successCount);
            rollingCounterInMinute.addRT(rt);
        }
    
        //增加“线程数”
        @Override
        public void increaseThreadNum() {
            curThreadNum.increment();
        }
    }
    
    • 这里还有减少请求通过数(线程数)、统计最大值等方法,由于篇幅有限,这里不放出,感兴趣的读者可以自己 DeBug 进入看看;

    4.2 数据统计的数据结构

    4.2.1 ArrayMetric 指标数组

    • ArrayMetric 的构造方法需要先给方法打上断点,重新 DeBug,在初始化时注入构造;
    public class ArrayMetric implements Metric {
        
        //省略其他代码
    
        //【点进去 4.2.2】数据存储
        private final LeapArray<MetricBucket> data;
        
        //最近 1s 滑动计数器用的是 OccupiableBucketLeapArray
        public ArrayMetric(int sampleCount, int intervalInMs) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        }
        
        //最近 1min 滑动计数器用的是 BucketLeapArray
        public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
            if (enableOccupy) {
                this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
            } else {
                this.data = new BucketLeapArray(sampleCount, intervalInMs);
            }
        }
    
        //增加成功数
        @Override
        public void addSuccess(int count) {
            WindowWrap<MetricBucket> wrap = data.currentWindow();
            wrap.value().addSuccess(count);
        }
    
        //增加通过数
        @Override
        public void addPass(int count) {
            WindowWrap<MetricBucket> wrap = data.currentWindow();
            wrap.value().addPass(count);
        }
    
        //增加 RT
        @Override
        public void addRT(long rt) {
            WindowWrap<MetricBucket> wrap = data.currentWindow();
            wrap.value().addRT(rt);
        }
    }
    

    4.2.2 LeapArray 环形数组

    • LeapArray 是处理数据的核心数据结构,采用滑动窗口算法;
    • ArrayMetric 中持有 LeapArray 对象,所有方法都是对 LeapArray 进行操作;
    • LeapArray 是环形的数据结构,为了节约内存,它存储固定个数的窗口对象 WindowWrap,只保存最近一段时间的数据,新增的时间窗口会覆盖最早的时间窗口;
    public abstract class LeapArray<T> {
    
        //省略其他代码
    
        //单个窗口的长度(1个窗口多长时间)
        protected int windowLengthInMs;
        //采样窗口个数
        protected int sampleCount;
        //全部窗口的长度(全部窗口多长时间)
        protected int intervalInMs;
        private double intervalInSecond;
        //窗口数组:存储所有窗口(支持原子读取和写入)
        protected final AtomicReferenceArray<WindowWrap<T>> array;
        //更新窗口数据时用的锁
        private final ReentrantLock updateLock = new ReentrantLock();
    
        public LeapArray(int sampleCount, int intervalInMs) {
            //计算单个窗口的长度
            this.windowLengthInMs = intervalInMs / sampleCount;
            this.intervalInMs = intervalInMs;
            this.intervalInSecond = intervalInMs / 1000.0;
            this.sampleCount = sampleCount;
            this.array = new AtomicReferenceArray<>(sampleCount);
        }
        //【点进去 4.2.3】获取当前窗口
        public WindowWrap<T> currentWindow() {
            //这里参数是当前时间
            return currentWindow(TimeUtil.currentTimeMillis());
        }
        //获取指定时间的窗口
        public WindowWrap<T> currentWindow(long timeMillis) {
            if (timeMillis < 0) {
                return null;
            }
            // 计算数组下标
            int idx = calculateTimeIdx(timeMillis);
            //计算当前请求对应的窗口开始时间
            long windowStart = calculateWindowStart(timeMillis);
    
            /*
             * 从 array 中获取窗口。有 3 种情况:
             * (1) array 中窗口不在,创建一个 CAS 并写入 array;
             * (2) array 中窗口开始时间 = 当前窗口开始时间,直接返回;
             * (3) array 中窗口开始时间 < 当前窗口开始时间,表示 o1d 窗口已过期,重置窗口数据并返回;
             */
            while (true) {
                // 取窗口
                WindowWrap<T> old = array.get(idx);
                //(1)窗口不在
                if (old == null) {
                    //创建一个窗口
                    WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                    //CAS将窗口写进 array 中并返回(CAS 操作确保只初始化一次)
                    if (array.compareAndSet(idx, null, window)) {
                        return window;
                    } else {
                        //并发写失败,释放 CPU 资源,避免有线程长时间占用 CPU,一般下次来的时候 array 中有数据了会命中第2种情况;
                        Thread.yield();
                    }
                //(2)array 中窗口开始时间 = 当前窗口开始时间
                } else if (windowStart == old.windowStart()) {
                    //直接返回
                    return old;
                //(3)array 中窗口开始时间 < 当前窗口开始时间    
                } else if (windowStart > old.windowStart()) {
                    //尝试获取更新锁
                    if (updateLock.tryLock()) {
                        try {
                            //拿到锁的线程才重置窗口
                            return resetWindowTo(old, windowStart);
                        } finally {
                            //释放锁
                            updateLock.unlock();
                        }
                    } else {
                        //并发加锁失败,释放 CPU 资源,避免有线程长时间占用 CPU,一般下次来的时候因为 old 对象时间更新了会命中第 2 种情况;
                        Thread.yield();
                    }
                //理论上不会出现    
                } else if (windowStart < old.windowStart()) {
                    // 正常情况不会进入该分支(机器时钟回拨等异常情况)
                    return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                }
            }
        }
        //计算索引
        private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
            //timeId 降低时间精度
            long timeId = timeMillis / windowLengthInMs;
            //计算当前索引,这样我们就可以将时间戳映射到 leap 数组
            return (int)(timeId % array.length());
        }
        //计算窗口开始时间
        protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
            return timeMillis - timeMillis % windowLengthInMs;
        }
    }
    

    4.2.3 WindowWrap 窗口包装类

    • WindowWrap 是一个窗口对象,它是一个包装类,包装的对象是 MetricBucket
    public class WindowWrap<T> {
        //窗口长度,与 LeapArray 的 windowLengthInMs 一致
        private final long windowLengthInMs;
        //窗口开始时间,其值是 windowLengthInMs 的整数倍
        private long windowStart;
        //窗口的数据,支持 MetricBucket 类型,存储统计数据
        private T value;
    
        //省略其他代码
    }
    

    4.2.4 MetricBucket 指标桶

    • MetricBucket 类的定义如下,可以发现指标数据存在 LongAdder[] counters中;
    • LongAdder 是 JDK1.8 中新增的类,用于在高并发场景下代替AtomicLong,以用空间换时间的方式降低了 CAS 失败的概率,从而提高性能;
    public class MetricBucket {
        /**
         * 存储指标的计数器;
         * LongAdder 是线程安全的计数器
         * counters[0]  PASS 通过数;
         * counters[1]  BLOCK 拒绝数;
         * counters[2]  EXCEPTION 异常数;
         * counters[3]  SUCCESS 成功数;
         * counters[4]  RT 响应时长;
         * counters[5]  OCCUPIED_PASS 预分配通过数;
         **/
        private final LongAdder[] counters;
    
        //最小 RT,默认值是 5000ms
        private volatile long minRt;
    
        //构造中初始化
        public MetricBucket() {
            MetricEvent[] events = MetricEvent.values();
            this.counters = new LongAdder[events.length];
            for (MetricEvent event : events) {
                counters[event.ordinal()] = new LongAdder();
            }
            initMinRt();
        }
    
        //覆盖指标
        public MetricBucket reset(MetricBucket bucket) {
            for (MetricEvent event : MetricEvent.values()) {
                counters[event.ordinal()].reset();
                counters[event.ordinal()].add(bucket.get(event));
            }
            initMinRt();
            return this;
        }
    
        private void initMinRt() {
            this.minRt = SentinelConfig.statisticMaxRt();
        }
    
        //重置指标为0
        public MetricBucket reset() {
            for (MetricEvent event : MetricEvent.values()) {
                counters[event.ordinal()].reset();
            }
            initMinRt();
            return this;
        }
        //获取指标,从 counters 中返回
        public long get(MetricEvent event) {
            return counters[event.ordinal()].sum();
        }
        //添加指标
        public MetricBucket add(MetricEvent event, long n) {
            counters[event.ordinal()].add(n);
            return this;
        }
    
        public long pass() {
            return get(MetricEvent.PASS);
        }
    
        public long block() {
            return get(MetricEvent.BLOCK);
        }
    
        public void addPass(int n) {
            add(MetricEvent.PASS, n);
        }
    
        public void addBlock(int n) {
            add(MetricEvent.BLOCK, n);
        }
    
        //省略其他代码
    }
    

    4.2.5 各数据结构的依赖关系

    各数据结构的 UML 图 结构示意图.png

    4.2.6 LeapArray 统计数据的大致思路

    • 创建一个长度为 n 的数组,数组元素就是窗口,窗口包装了 1 个指标桶,桶中存放了该窗口时间范围中对应的请求统计数据;
    • 可以想象成一个环形数组在时间轴上向右滚动,请求到达时,会命中数组中的一个窗口,那么该请求的数据就会存到命中的这个窗口包含的指标桶中;
    • 当数组转满一圈时,会回到数组的开头,而此时下标为 0 的元素需要重复使用,它里面的窗口数据过期了,需要重置,然后再使用。具体过程如下图:


      LeapArray 统计数据的大致思路.png

    5. 熔断槽实施服务熔断 DegradeSlot.entry()

    • 服务熔断是通过 DegradeSlot 来实现的,它会根据用户配置的熔断规则和系统运行时各个 Node 中的统计数据进行熔断判断;
    • 注意:熔断功能在 Sentinel-1.8.0 版本前后有较大变化;
    • 我们给 DegradeSlot.entry() 方法里的语句打上断点,运行到光标处;
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        //【断点步入】熔断检查
        performChecking(context, resourceWrapper);
        //调用下一个 Slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    
    • 进入 DegradeSlot.performChecking() 方法,其逻辑与源码如下:
      • 根据资源名称获取断路器;
      • 循环判断每个断路器;
    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        //根据 resourceName 获取断路器
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }
        //循环判断每个断路器
        for (CircuitBreaker cb : circuitBreakers) {
            //【点进去】尝试通过断路器
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }
    

    5.1 继续或取消熔断功能

    • 进入 AbstractCircuitBreaker.tryPass() 方法,当请求超时并且处于探测恢复(半开状态,HALF-OPEN 状态)失败时继续断路功能;
    @Override
    public boolean tryPass(Context context) {
        //当前断路器状态为关闭
        if (currentState.get() == State.CLOSED) {
            return true;
        }
        if (currentState.get() == State.OPEN) {
            //【点进去】对于半开状态,我们尝试通过
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }
    
    • 进入 AbstractCircuitBreaker.fromOpenToHalfOpen() 方法,实现状态的变更;
    protected boolean fromOpenToHalfOpen(Context context) {
        //尝试将状态从 OPEN 设置为 HALF_OPEN
        if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
            //状态变化通知
            notifyObservers(State.OPEN, State.HALF_OPEN, null);
            Entry entry = context.getCurEntry();
            //在 entry 添加一个 exitHandler  entry.exit() 时会调用
            entry.whenTerminate(new BiConsumer<Context, Entry>() {
                @Override
                public void accept(Context context, Entry entry) {
                    //如果有发生异常,重新将状态设置为OPEN 请求不同通过
                    if (entry.getBlockError() != null) {
                        currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                        notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                    }
                }
            });
            //此时状态已设置为HALF_OPEN正常通行
            return true;
        }
        //熔断
        return false;
    }
    
    • 上述讲解了:状态从 OPEN 变为 HALF_OPEN,HALF_OPEN 变为 OPEN;
    • 但状态从 HALF_OPEN 变为 CLOSE 需要在正常执行完请求后,由 entry.exit() 调用 DegradeSlot.exit() 方法来改变状态;

    5.2 请求失败,启动熔断

    • 状态从 HALF_OPEN 变为 CLOSE 的实现方法在 DegradeSlot.exit()
    @Override
    public void exit(Context context, ResourceWrapper r, int count, Object... args) {
        Entry curEntry = context.getCurEntry();
        //无阻塞异常
        if (curEntry.getBlockError() != null) {
            fireExit(context, r, count, args);
            return;
        }
        //通过资源名获取断路器
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        //没有配置断路器,则直接放行
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            fireExit(context, r, count, args);
            return;
        }
    
        if (curEntry.getBlockError() == null) {
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
                //【点进去】在请求完成时
                circuitBreaker.onRequestComplete(context);
            }
        }
        fireExit(context, r, count, args);
    }
    
    • 进入 ExceptionCircuitBreaker.onRequestComplete() 方法,其主要逻辑与源码如下:
      • 请求失败比例与总请求比例加 1,用于判断后续是否超过阈值;
    @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        //简单错误计数器
        SimpleErrorCounter counter = stat.currentWindow().value();
        if (error != null) {
            //异常请求数加 1
            counter.getErrorCount().add(1);
        }
        //总请求数加 1
        counter.getTotalCount().add(1);
        //【点进去】超过阈值时变更状态
        handleStateChangeWhenThresholdExceeded(error);
    }
    
    • 进入 ExceptionCircuitBreaker.handleStateChangeWhenThresholdExceeded() 方法,变更状态;
    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        //全开则直接放行
        if (currentState.get() == State.OPEN) {
            return;
        }
        //半开状态
        if (currentState.get() == State.HALF_OPEN) {
            //检查请求
            if (error == null) {
                //发生异常,将状态从半开 HALF_OPEN 转为关闭 CLOSE
                fromHalfOpenToClose();
            } else {
                //无异常,解开半开状态
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }
        
        //计算是否超过阈值
        List<SimpleErrorCounter> counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        if (totalCount < minRequestAmount) {
            return;
        }
        double curCount = errCount;
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            //熔断策略为:异常比例
            curCount = errCount * 1.0d / totalCount;
        }
        if (curCount > threshold) {
            transformToOpen(curCount);
        }
    }
    

    6. Sentinel 源码结构图小结

    • SphU.entry():核心逻辑的入口函数;
      • CtSph.entryWithPriority():获取 Slot 链,操作 Slot 槽;
        • CtSph.lookProcessChain():获取 ProcessorSlot 链;
          • DefaultSlotChainBuilder.build():构造 DefaultProcessorSlotChain 链(里面有 10 个 Slot 插槽);
        • ProcessorSlot.entry():遍历 ProcessorSlot 链;
          • FlowSlot.entry():遍历到 FlowSlot 槽,限流规则;

            • FlowSlot.checkFlow():检查流量规则;
              • FlowRuleChecker.checkFlow():使用检查器检查流量规则;
                • FlowSlot.ruleProvider.apply():获取流控规则;
                • FlowRuleChecker.canPassCheck():校验每条规则;
                  • FlowRuleChecker.passClusterCheck():集群模式;
                  • FlowRuleChecker.passLocalCheck():单机模式;
                    • FlowRuleChecker.selectNodeByRequesterAndStrategy():获取 Node;
                    • FlowRule.getRater():获得流控行为 TrafficShapingController;
                    • TrafficShapingController.canPass():执行流控行为;
          • StatisticSlot.entry:遍历到 StatisticSlot 槽,统计数据;

            • DefaultNode.increaseThreadNum():统计“增加线程数”;
              • StatisticNode.increaseThreadNum():统计“请求通过数”;
                • ArrayMetric.ArrayMetric():初始化指标数组;
                  • LeapArray:环形数组;
                    • WindowWrap:窗口包装类;
                  • MetricBucket:指标桶;
            • DefaultNode.addPassRequest():统计“增加线程数”;
              • StatisticNode.addPassRequest():同上;
          • DegradeSlot.entry():遍历到 DegradeSlot 槽,服务熔断;

            • DegradeSlot.performChecking():执行检查;
              • DegradeRuleManager.getCircuitBreakers():根据 resourceName 获取断路器;
              • AbstractCircuitBreaker.tryPass():继续或取消熔断功能;
                • AbstractCircuitBreaker.fromOpenToHalfOpen():尝试通过半开状态;
          • DegradeSlot.exit():请求失败(超时),启动熔断;

            • ExceptionCircuitBreaker.onRequestComplete():在请求完成时操作;
              • ExceptionCircuitBreaker.handleStateChangeWhenThresholdExceeded():变更状态;

    最后

    \color{blue}{\rm\small{新人制作,如有错误,欢迎指出,感激不尽!}}

    \color{blue}{\rm\small{欢迎关注我,并与我交流!}}

    \color{blue}{\rm\small{如需转载,请标注出处!}}

    相关文章

      网友评论

        本文标题:微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码

        本文链接:https://www.haomeiwen.com/subject/jghhkrtx.html