美文网首页
Spring Cloud Alibaba——Sentinel S

Spring Cloud Alibaba——Sentinel S

作者: 小波同学 | 来源:发表于2021-08-03 01:37 被阅读0次

    slot概述

    在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:

    • NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。

    • ClusterBuilderSlot:则用于存储资源的统计信息以及调用者信息,例如该资源的 RT、QPS、thread count 等等,这些信息将用作为多维度限流,降级的依据。

    • LogSlot:则用于记录用于记录块异常,为故障排除提供具体的日志。

    • StatisticSlot:则用于记录、统计不同纬度的 runtime 指标监控信息。

    • AuthoritySlot:则根据配置的黑白名单和调用来源信息,来做黑白名单控制。

    • SystemSlot:则通过系统的状态,例如 load1 等,来控制总的入口流量。

    • FlowSlot:则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。

    • DegradeSlot:则通过统计信息以及预设的规则,来做熔断降级。

    下面是关系结构图:


    solt的基本逻辑及代码演示

    每个Slot执行完业务逻辑处理后,会调用fireEntry()方法,该方法将会触发下一个节点的entry方法,下一个节点又会调用他的fireEntry,以此类推直到最后一个Slot,由此就形成了sentinel的责任链。

    工作流概述:


    根据slot 的基本实现processorSlot的实现类讲一下slot 的基本结构

    先看看顶层接口ProcessorSlot

    public interface ProcessorSlot<T> {
    
        //开始入口
        void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
                   Object... args) throws Throwable;
    
        //finish意味着结束
        void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
                       Object... args) throws Throwable;
    
        //退出插槽
        void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
    
        //退出插槽结束
        void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
    }
    

    这个接口有4个方法,entry,fireEntry,exit,fireExit

    ProcessorSlot 的抽象实现 AbstractLinkedProcessorSlot

    public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    
        private AbstractLinkedProcessorSlot<?> next = null;
    
        @Override
        public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            //当业务执行完毕后,如果还有下一个slot
            if (next != null) {
                next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
            }
        }
    
        @SuppressWarnings("unchecked")
        //指向下一个slot的entry,每一个slot根据自己的职责不同,有自己的实现
        void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
            throws Throwable {
            T t = (T)o;
            entry(context, resourceWrapper, t, count, prioritized, args);
        }
    
        @Override
        public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            //当一个slot的exit执行完毕后,如果还有下一个未关闭slot
            if (next != null) {
                 //指向下一个slot的exit
                next.exit(context, resourceWrapper, count, args);
            }
        }
    
        public AbstractLinkedProcessorSlot<?> getNext() {
            return next;
        }
    
        public void setNext(AbstractLinkedProcessorSlot<?> next) {
            this.next = next;
        }
    
    }
    

    DefaultProcessorSlotChain实现了上述的chain(setNext和getNext)

    public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    
        //直接实现了AbstractLinkedProcessorSlot的实例并作为first,可以理解为当前slot
        AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
            public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
                super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
            }
    
            public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
                super.fireExit(context, resourceWrapper, count, args);
            }
        };
        
        //默认的end(可以理解为当前的后一个slot)
        AbstractLinkedProcessorSlot<?> end;
    
        public DefaultProcessorSlotChain() {
            this.end = this.first;
        }
    
        public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
            protocolProcessor.setNext(this.first.getNext());
            this.first.setNext(protocolProcessor);
            //如果当前为最后一个
            if (this.end == this.first) {
                this.end = protocolProcessor;
            }
    
        }
    
        public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
            //将后一个slot放进当前slot的next
            this.end.setNext(protocolProcessor);
            //将end指向后一个slot
            this.end = protocolProcessor;
        }
    
        public void setNext(AbstractLinkedProcessorSlot<?> next) {
            this.addLast(next);
        }
    
        public AbstractLinkedProcessorSlot<?> getNext() {
            return this.first.getNext();
        }
    
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
            this.first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
        }
    
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            this.first.exit(context, resourceWrapper, count, args);
        }
    }
    

    NodeSelectorSlot

    @Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
    public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
    
        /**
         * 相同的资源但是Context不同,分别新建 DefaultNode,并以ContextName为key
         */
        private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
        
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            // 根据ContextName尝试获取DefaultNode
            DefaultNode node = map.get(context.getName());
            if (node == null) {
                synchronized (this) {
                    node = map.get(context.getName());
                    if (node == null) {
                        // 初始化resource对应的Node 类型为DefaultNode,每个Context对应一个
                        node = new DefaultNode(resourceWrapper, null);
                        //保存Node  一个resource对应一个Node
                        HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                        cacheMap.putAll(map);
                        cacheMap.put(context.getName(), node);
                        map = cacheMap;
                        // 构建 Node tree
                        // 添加到Context的Node节点,这里构造了一棵节点树
                        ((DefaultNode) context.getLastNode()).addChild(node);
                    }
    
                }
            }
            //设置当前Context的当前节点为node
            context.setCurNode(node);
            // 唤醒执行下一个插槽,调用下一个Slot
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            fireExit(context, resourceWrapper, count, args);
        }
    }
    

    NodeSelectorSlot顾名思义是用来构建Node的。

    我们可以看到NodeSelectorSlot对于不同的上下文都会生成一个DefaultNode。这里还有一个要注意的点:相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中,因此不同的上下文可以进入到同一个对象的NodeSelectorSlot.entry方法中,那么这里要怎么区分不同的上下文所创建的资源Node呢?显然可以使用上下文名称作为映射键以区分相同的资源Node。

    然后这里要考虑另一个问题。一个资源有可能创建多个DefaultNode(有多个上下文时),那么我们应该如何快速的获取总的统计数据呢?

    答案就在下一个Slot(ClusterBuilderSlot)中被解决了。

    ClusterBuilderSlot

    上面有提到一个问题,我们要如何统计不同上下文相同资源的总量数据。ClusterBuilderSlot给了很好的解决方案:具有相同资源名称的共享一个ClusterNode。

    @Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
    public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        // 相同的资源共享一个 ClusterNode
        private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
    
        private static final Object lock = new Object();
    
        private volatile ClusterNode clusterNode = null;
        
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args)
            throws Throwable {
            // 判断本资源是否已经初始化过clusterNode
            if (clusterNode == null) {
                synchronized (lock) {
                    if (clusterNode == null) {
                        // 没有初始化则初始化clusterNode
                        clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                        HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                        newMap.putAll(clusterNodeMap);
                        newMap.put(node.getId(), clusterNode);
    
                        clusterNodeMap = newMap;
                    }
                }
            }
            // 给相同资源的DefaultNode设置一样的ClusterNode
            node.setClusterNode(clusterNode);
    
            /*
             *  如果有来源则新建一个来源Node
             */
            if (!"".equals(context.getOrigin())) {
                Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
                context.getCurEntry().setOriginNode(originNode);
            }
    
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    }   
    

    上面的代码其实就是做了一件事情,为资源创建CluserNode,相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中。也就是说,能进入到同一个ClusterBuilderSlot对象的entry方法的请求都是来自同一个资源的,所以这些相同资源需要初始化一个统一的CluserNode用来做流量的汇总统计。

    LogSlot

    代码比较简单,逻辑就是打印异常日志,就不分析了。

    StatisticSlot

    StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。

    @Spi(order = Constants.ORDER_STATISTIC_SLOT)
    public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            try {
                // Do some checking.
                //next(下一个)节点调用Entry方法
                //先进行后续的check,包括规则的check,黑白名单check
                fireEntry(context, resourceWrapper, node, count, prioritized, args);
                // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
    
                // 统计默认qps 线程数, 当前线程数加1
                node.increaseThreadNum();
                //通过的请求加上count
                node.addPassRequest(count);
    
                // 元节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1
                if (context.getCurEntry().getOriginNode() != null) {
                    // 根据来源统计qps 线程数
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                    context.getCurEntry().getOriginNode().addPassRequest(count);
                }
                
                // 入口节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // 统计入口 qps 线程数
                    Constants.ENTRY_NODE.increaseThreadNum();
                    Constants.ENTRY_NODE.addPassRequest(count);
                }
    
                // Handle pass event with registered entry callback handlers.
                // 注册的扩展点的数据统计
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
                }
            } catch (PriorityWaitException ex) {
                node.increaseThreadNum();
                if (context.getCurEntry().getOriginNode() != null) {
                    // Add count for origin node.
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                }
    
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseThreadNum();
                }
                // Handle pass event with registered entry callback handlers.
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
                }
            } catch (BlockException e) {
                // Blocked, set block exception to current entry.
                context.getCurEntry().setBlockError(e);
    
                // Add block count.
                node.increaseBlockQps(count);
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().increaseBlockQps(count);
                }
    
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseBlockQps(count);
                }
    
                // Handle block event with registered entry callback handlers.
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onBlocked(e, context, resourceWrapper, node, count, args);
                }
    
                throw e;
            } catch (Throwable e) {
                // Unexpected internal error, set error to current entry.
                context.getCurEntry().setError(e);
    
                throw e;
            }
        }
    }
    

    StatisticSlot主要做了4种不同维度的流量统计

    • 1、资源在上下文维度(DefaultNode)的统计。
    • 2、clusterNode 维度的统计。
    • 3、Origin 来源维度的统计。
    • 4、入口全局流量的统计。

    SystemSlot

    SystemSlot比较简单,其实就是根据StatisticSlot所统计的全局入口流量进行限流。

    AuthoritySlot

    @Spi(order = Constants.ORDER_AUTHORITY_SLOT)
    public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
            throws Throwable {
            checkBlackWhiteAuthority(resourceWrapper, context);
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
        
        void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
            Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
    
            if (authorityRules == null) {
                return;
            }
            
            // 根据资源获取黑白名单规则
            Set<AuthorityRule> rules = authorityRules.get(resource.getName());
            if (rules == null) {
                return;
            }
    
            // 对规则进行校验,只要有一条不通过 就抛异常
            for (AuthorityRule rule : rules) {
                if (!AuthorityRuleChecker.passCheck(rule, context)) {
                    throw new AuthorityException(context.getOrigin(), rule);
                }
            }
        }
    }   
    

    AuthoritySlot会对资源的黑白名单做检查,并且只要有一条不通过就抛异常。

    FlowSlot

    @Spi(order = Constants.ORDER_FLOW_SLOT)
    public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        private final FlowRuleChecker checker;
        
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            checkFlow(resourceWrapper, context, node, count, prioritized);
    
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    
        void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
            throws BlockException {
            // 检查限流规则
            checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
        }
    }   
    

    这个slot主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止,并且同样也会根据三种不同的维度来进行限流:

    • 1、资源在上下文维度(DefaultNode)的统计。
    • 2、clusterNode 维度的统计。
    • 3、Origin 来源维度的统计。

    DegradeSlot

    这个slot主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。

    总结

    • 1、相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中。

    • 2、流控有多个维度,分别包括:

      • 1、不同上下文中的资源。
      • 2、相同资源。
      • 3、入口流量。
      • 4、相同的来源。

    Sentinel中预设的SlotChain执行的完整流程:

    参考:
    https://www.cnblogs.com/taromilk/p/11751000.html

    https://juejin.cn/post/6870671845582962702

    https://blog.csdn.net/wk52525/article/details/104439404

    相关文章

      网友评论

          本文标题:Spring Cloud Alibaba——Sentinel S

          本文链接:https://www.haomeiwen.com/subject/qwssvltx.html