美文网首页
Spring Cloud Alibaba——Sentinel S

Spring Cloud Alibaba——Sentinel S

作者: 小波同学 | 来源:发表于2021-08-02 02:32 被阅读0次

    前言

    Sentinel作为ali开源的一款轻量级流控框架,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。相比于Hystrix,Sentinel的设计更加简单,在 Sentinel中资源定义和规则配置是分离的,也就是说用户可以先通过Sentinel API给对应的业务逻辑定义资源(埋点),然后在需要的时候再配置规则,通过这种组合方式,极大的增加了Sentinel流控的灵活性。

    引入Sentinel带来的性能损耗非常小。只有在业务单机量级超过25W QPS的时候才会有一些显著的影响(5% - 10% 左右),单机QPS不太大的时候损耗几乎可以忽略不计。

    Sentinel提供两种埋点方式:

    • try-catch 方式(通过 SphU.entry(...)),用户在 catch 块中执行异常处理 / fallback

    • if-else 方式(通过 SphO.entry(...)),当返回 false 时执行异常处理 / fallback

    写在前面

    在此之前,需要先了解一下Sentinel的工作流程。

    在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如默认情况下会创建以下7个插槽:

    • NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。

    • ClusterBuilderSlot:则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据。

    • StatisticSlot:则用于记录、统计不同纬度的 runtime 指标监控信息。

    • SystemSlot:则通过系统的状态,例如 load1 等,来控制总的入口流量。

    • AuthoritySlot:则根据配置的黑白名单和调用来源信息,来做黑白名单控制。

    • FlowSlot:则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。

    • DegradeSlot:则通过统计信息以及预设的规则,来做熔断降级。

    注意:这里的插槽链都是一一对应资源名称的。

    每个Slot执行完业务逻辑处理后,会调用fireEntry()方法,该方法将会触发下一个节点的entry方法,下一个节点又会调用他的fireEntry,以此类推直到最后一个Slot,由此就形成了Sentinel的责任链。

    上面的所介绍的插槽(slot chain)是Sentinel非常重要的概念。同时还有一个非常重要的概念那就是Node。

    Node之间的树形结构

    在创建context会先创建DefaultNode 实际是它的父类EntranceNode,context可以相同context-name反复申明创建,但是DefaultNode同一context-name只会创建一次,DefaultNode包含了一个链路所有的资源,每一个资源对应一个ClusterNode,ClusterNode再根据来源细分为StatisticNode,它们之间的关系就是一个树形结构 如下:

    • EntranceNode:根据context-name来创建,就算同一个context-name多次创建context,entranceNode也只会创建一次, 用来统计该链路上所有的资源信息。

    • DefaultNode:根据context-name + resource-name创建,用来统计某链路上的资源信息。

    • ClusterNode:根据resource-name来创建,用来统计资源信息。

    • StatisticsNode:根据origin-name+resource-name来创建,针对请求来源统计该来源的资源信息,上面几个node都是它的子类,基于它的数据做汇总。

    一定要搞清楚这几个node之间的关系和作用,下面重点来看StatisticsNode,它用来完成信息统计,以供后续的限流规则使用, 它只统计了两个维度数据,qps和线程数。

    入门案例

    • 引入maven依赖
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
        </dependency>
    </dependencies>
    
    <dependencyManagement>
        <dependencies>
            <!--整合Spring Cloud Alibaba-->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2.2.6.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement> 
    
    • 下面举一个最简单的案例埋点来引出流控入口
    public String getOrderInfo(String orderNo) {
        ContextUtil.enter("getOrderInfo", "application-a");
        Entry entry = null;
        try {
            // name:资源名 EntryType 流量类型为入口还是出口,系统规则只针对入口流量, batchCount:当前请求流量, args:参数
            entry = SphU.entry("getOrderInfo", EntryType.IN, 1, orderNo);
            getUserInfo();
        } catch (BlockException e) {
            e.printStackTrace();
        } finally {
            entry.exit();
        }
        return "orderInfo = " + orderNo;
    }
    
    public String getUserInfo() {
        Entry entry = null;
        try {
            entry = SphU.entry("getUserInfo", EntryType.OUT, 1);
            // 通过http或feign对用户服务完成该接口调用
        } catch (BlockException e) {
            e.printStackTrace();
        } finally {
            entry.exit();
        }
        return "userInfo";
    }
    
    public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args) throws BlockException 
    

    也可以通过注解的方式引入,执行方法时SentinelResourceAspect会做拦截进行流控处理,当然什么都不配也是可以的,因为引入spring-cloud-starter-alibaba-sentinel包spring mvc和spring webflux做了适配,自动会对每一个请求做埋点

    @GetMapping("getOrderInfo")
    @SentinelResource(value = "/getOrderInfo", entryType = EntryType.IN)
    public String getOrderInfo(@RequestParam("orderNo") String orderNo) {
        return "orderInfo = " + orderNo;
    }
    

    ContextUtil.enter("getOrderInfo", "application-a") 来表示调用链的入口,可以暂时理解为上下文,一般不做声明后面会默认创建。

    第一个参数为context-name,区分不同的调用链入口,默认常量值sentinel_default_context。

    第二参数为调用来源,这个参数可以细分从不同应用来源发出的请求,授权规则白名单和黑名单会根据该参数做限制,然后通过SphU.entry()埋点进入,下面说下这个方法几个参数的含义:

    • name:当前资源名。
    • trafficType:流量类型 分别为入口流量和出口流量。入口流量和出口流量执行过程都是差不多,只是入口流量会多了一个系统规则拦截,像是上面案例从订单服务调用getUserInfo,getUserInfo是去调用用户服务,它的流量方式是出去的,压力都在用户服务那边,不用考虑当前服务器的压力,所以标为出口流量。
    • batchCount:当前流量数量,一般默认为1。
    • args:参数,后面做热点参数规则时用到。

    BlockException:当某一规则不通过时会抛出对应异常。

    SphU.entry(xxx) 需要与 entry.exit() 方法成对出现,匹配调用,如有嵌套像上面,需先退出getUserInfo的entry在退出getOrderInfo的entry

    打开打控制台,此时应该是空白的,sentinel控制台是懒加载模式,需要调用一下相关资源接口就可以看到


    可以看到sentinel规则配置主要有流控规则,降级规则,热点规则,系统规则,授权规则,先简单介绍下规则作用,其它配置也很简单 一目了然,后面通过结合源码来深入分析:

    • 流控规则:针对资源流量控制。
    • 热点规则:针对资源的热点参数做流量控制。
    • 降级规则:针对资源的调度情况来做降级处理。
    • 系统规则:针对当前服务做全局流量控制。
    • 授权规则:对访问资源的特定应用做授权处理。

    源码分析

    从上面的Sentinel使用的示例代码,我们就从这里切入开始分析

    ContextUtil.enter()

    public class ContextUtil {
    
        /**
         * Store the context in ThreadLocal for easy access.
         */
        private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
    
        /**
         * Holds all {@link EntranceNode}. Each {@link EntranceNode} is associated with a distinct context name.
         */
        private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
    
        public static Context enter(String name, String origin) {
            // 判断上下文名称是否为默认的名称(sentinel_default_context) 是的话直接抛出异常
            if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
                throw new ContextNameDefineException(
                    "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
            }
            return trueEnter(name, origin);
        }
        
        protected static Context trueEnter(String name, String origin) {
            // 先从ThreadLocal中尝试获取,获取到则直接返回
            Context context = contextHolder.get();
            if (context == null) {
                Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
                // 尝试从缓存中获取该上下文名称对应的 入口节点
                DefaultNode node = localCacheNameMap.get(name);
                if (node == null) {
                    // 判断缓存中入口节点数量是否大于2000
                    if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                        setNullContext();
                        return NULL_CONTEXT;
                    } else {
                        try {
                            // 加锁
                            LOCK.lock();
                            node = contextNameNodeMap.get(name);
                            // 双重检查锁
                            if (node == null) {
                                // 判断缓存中入口节点数量是否大于2000
                                if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                    setNullContext();
                                    return NULL_CONTEXT;
                                } else {
                                    // 根据上下文名称生成入口节点(entranceNode)
                                    node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                    // Add entrance node.
                                    // 加入至全局根节点下
                                    Constants.ROOT.addChild(node);
                                    // 加入缓存中
                                    Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                    newMap.putAll(contextNameNodeMap);
                                    newMap.put(name, node);
                                    contextNameNodeMap = newMap;
                                }
                            }
                        } finally {
                            LOCK.unlock();
                        }
                    }
                }
                // 初始化上下文对象
                context = new Context(node, name);
                context.setOrigin(origin);
                // 设置到当前线程中
                contextHolder.set(context);
            }
    
            return context;
        }   
    }
    

    主要做了2件事情:

    • 1、根据ContextName生成entranceNode,并加入缓存,每个ContextName对应一个入口节点entranceNode。

    • 2、根据ContextName和entranceNode初始化上下文对象,并将上下文对象设置到当前线程中。

    这里有几点需要注意:

    • 1、入口节点数量不能大于2000,大于会直接抛异常。
    • 2、每个ContextName对应一个入口节点entranceNode。
    • 3、每个entranceNode都有共同的父节点。也就是根节点。

    SphU.entry(xxx)执行过程分析

    public class SphU {
    
        private static final Object[] OBJECTS0 = new Object[0];
        
        public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args)
            throws BlockException {
            // 默认为 出口流量类型,单位统计数为1
            return Env.sph.entry(name, trafficType, batchCount, args);
        }   
    }
    
    public class CtSph implements Sph {
    
        @Override
        public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
            // 生成资源对象
            StringResourceWrapper resource = new StringResourceWrapper(name, type);
            return entry(resource, count, args);
        }
        
        public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
            return entryWithPriority(resourceWrapper, count, false, args);
        }   
    }
    

    上面的代码比较简单,不指定EntryType的话,则默认为出口流量类型,最终会调用entryWithPriority方法,主要业务逻辑也都在这个方法中

    entryWithPriority方法

    public class CtSph implements Sph {
    
        private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
            throws BlockException {
            // 获取当前线程上下文对象
            Context context = ContextUtil.getContext();
            // 上下文名称对应的入口节点是否已经超过阈值2000,超过则会返回空 CtEntry
            if (context instanceof NullContext) {
                return new CtEntry(resourceWrapper, null, context);
            }
    
            if (context == null) {
                // 如果没有指定上下文名称,则使用默认名称,也就是默认入口节点
                context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
            }
    
            // 全局开关
            if (!Constants.ON) {
                return new CtEntry(resourceWrapper, null, context);
            }
            
            // 生成插槽链
            ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
    
            /*
             * 表示资源(插槽链)超过6000,因此不会进行规则检查。
             */
            if (chain == null) {
                return new CtEntry(resourceWrapper, null, context);
            }
            
             // 生成 Entry 对象
             // 创建一个流量入口,将context curEntry进行指定
            Entry e = new CtEntry(resourceWrapper, chain, context);
            try {
                // 开始执行插槽链 调用逻辑
                chain.entry(context, resourceWrapper, null, count, prioritized, args);
            } catch (BlockException e1) {
                // 发生流控异常进行退出 清除上下文
                e.exit(count, args);
                // 将异常向上抛
                throw e1;
            } catch (Throwable e1) {
                // 除非Sentinel内部存在错误,否则不应发生这种情况。
                RecordLog.info("Sentinel unexpected exception", e1);
            }
            return e;
        }   
    }
    

    这个方法可以说是涵盖了整个Sentinel的核心逻辑

    • 1、获取上下文对象,如果上下文对象还未初始化,则使用默认名称初始化。初始化逻辑在上文已经分析过。
    • 2、判断全局开关。
    • 3、根据给定的资源生成插槽链,插槽链是跟资源相关的,Sentinel最关键的逻辑也都在各个插槽中。初始化的逻辑在lookProcessChain(resourceWrapper);中,下面会分析。
    • 4、依顺序执行每个插槽逻辑。

    lookProcessChain(resourceWrapper)方法

    lookProcessChain方法为指定资源生成插槽链,下面我们来看下它的初始化逻辑

    public class CtSph implements Sph {
    
        private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
            = new HashMap<ResourceWrapper, ProcessorSlotChain>();
    
        ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
            // 根据资源尝试从全局缓存中获取
            ProcessorSlotChain chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // 非常常见的双重检查锁
                synchronized (LOCK) {
                    chain = chainMap.get(resourceWrapper);
                    if (chain == null) {
                        // 判断资源数是否大于6000
                        if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                            return null;
                        }
                        // 初始化插槽链
                        chain = SlotChainProvider.newSlotChain();
                        Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                            chainMap.size() + 1);
                        newMap.putAll(chainMap);
                        newMap.put(resourceWrapper, chain);
                        chainMap = newMap;
                    }
                }
            }
            return chain;
        }
    }
    
    • 1、根据资源尝试从全局缓存中获取插槽链。每个资源对应一个插槽链(资源最多只能定义6000个)

    • 2、初始化插槽链上的插槽(SlotChainProvider.newSlotChain()方法中)

    下面我们看下初始化插槽链上的插槽的逻辑

    SlotChainProvider.newSlotChain()

    public final class SlotChainProvider {
    
        private static volatile SlotChainBuilder slotChainBuilder = null;
        
        public static ProcessorSlotChain newSlotChain() {
            // 判断是否已经初始化过
            if (slotChainBuilder != null) {
                return slotChainBuilder.build();
            }
    
            // Resolve the slot chain builder SPI.
            // SPI获取构造器
            slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
            // 加载失败则使用默认 插槽链 
            if (slotChainBuilder == null) {
                // Should not go through here.
                RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
                slotChainBuilder = new DefaultSlotChainBuilder();
            } else {
                RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                    slotChainBuilder.getClass().getCanonicalName());
            }
            // 构建完成
            return slotChainBuilder.build();
        }
    }
    
    public final class SpiLoader<S> {
    
        private static final String SPI_FILE_PREFIX = "META-INF/services/";
        
        //初始化spiLoader类加载器
        public static <T> SpiLoader<T> of(Class<T> service) {
            AssertUtil.notNull(service, "SPI class cannot be null");
            AssertUtil.isTrue(service.isInterface() || Modifier.isAbstract(service.getModifiers()),
                    "SPI class[" + service.getName() + "] must be interface or abstract class");
    
            String className = service.getName();
            SpiLoader<T> spiLoader = SPI_LOADER_MAP.get(className);
            if (spiLoader == null) {
                synchronized (SpiLoader.class) {
                    spiLoader = SPI_LOADER_MAP.get(className);
                    if (spiLoader == null) {
                        SPI_LOADER_MAP.putIfAbsent(className, new SpiLoader<>(service));
                        spiLoader = SPI_LOADER_MAP.get(className);
                    }
                }
            }
    
            return spiLoader;
        }   
        
        public S loadFirstInstanceOrDefault() {
            //通过spiLoader去 META-INF/services/目录下去加载文件
            load();
    
            for (Class<? extends S> clazz : classList) {
                if (defaultClass == null || clazz != defaultClass) {
                    return createInstance(clazz);
                }
            }
    
            return loadDefaultInstance();
        }
    }
    

    SPI获取构造器——spiLoader类加载器,通过spiLoader去 META-INF/services/目录下去加载文件DefaultSlotChainBuilder。

    下面是DefaultSlotChainBuilder的build方法:

    @Spi(isDefault = true)
    public class DefaultSlotChainBuilder implements SlotChainBuilder {
    
        @Override
        public ProcessorSlotChain build() {
            // 先创建一个default作为header
            // chain是一个单向链表 first -> next -> end    
            ProcessorSlotChain chain = new DefaultProcessorSlotChain();
            
            // SPI机制——即SpiLoader在META-INF/services/目录下去加载文件 找到所有slot的实现,并根据@SpiOrder注解排序
            List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
            for (ProcessorSlot slot : sortedSlotList) {
                // 剔除掉Abstract类
                if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                    RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                    continue;
                }
                //addLast方法往链表的最后追加Slot对象。
                chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
            }
    
            return chain;
        }
    }
    

    首先初始化一个DefaultProcessorSlotChain,DefaultProcessorSlotChain里面维护了一个Slot的单向链表。

    然后利用Java SPI机制——即SpiLoader在META-INF/services/目录下去加载文件,找到所有slot的实现,并根据@SpiOrder注解排序, 最后调用addLast方法往链表的最后追加Slot对象。

    public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    
        AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
    
            @Override
            public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
                throws Throwable {
                super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
            }
    
            @Override
            public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
                super.fireExit(context, resourceWrapper, count, args);
            }
    
        };
        AbstractLinkedProcessorSlot<?> end = first;
    
        @Override
        public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
            end.setNext(protocolProcessor);
            end = protocolProcessor;
        }
    }
    
    
    public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
        
        public void setNext(AbstractLinkedProcessorSlot<?> next) {
            this.next = next;
        }
    }
    

    至此SlotChain构造完成。

    执行SlotChain链

    SlotChain链执行的入口是 chain.entry(context, resourceWrapper, null, count, prioritized, args);这行

    com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#entry

    public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
        }
    }
    

    就是执行first的transformEntry

    public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    
        void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
            throws Throwable {
            T t = (T)o;
            entry(context, resourceWrapper, t, count, prioritized, args);
        }
    }
    

    first的entry方法

    public class DefaultProcessorSlotChain extends ProcessorSlotChain {
        AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
            public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
                super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
            }
    
            public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
                super.fireExit(context, resourceWrapper, count, args);
            }
        };
    }
    

    父类的fireEntry

    public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    
        private AbstractLinkedProcessorSlot<?> next = null;
    
        @Override
        public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            if (next != null) {
                next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
            }
        }
    }
    

    可以看到,实际上就是执行了next的transformEntry,而next的transformEntry又会调用它的entry方法,entry方法由每个Slot自己实现,只要在里面调用fireEntry,即触发next的next的entry调用。

    这样继续下去,直到next为空

    简单画个图描述一下这个过程:


    exit()方法和entry方法类似,这里就不分析了。

    至此SlotChain调用链分析完了,总结一下

    • 1、Chain中维护了一个单向链表
    • 2、通过fileEntry触发next的entry调用。
    • 3、责任链模式

    Sentinel中预设的SlotChain执行的完整流程:

    参考:
    https://www.cnblogs.com/taromilk/p/11750962.html

    https://www.cnblogs.com/zzz-blogs/p/14342608.html

    https://blog.csdn.net/qq_19414183/article/details/111035989

    https://blog.csdn.net/wk52525/article/details/104439404

    相关文章

      网友评论

          本文标题:Spring Cloud Alibaba——Sentinel S

          本文链接:https://www.haomeiwen.com/subject/ycdrvltx.html