美文网首页Sentinel
Sentinel流程介绍

Sentinel流程介绍

作者: 晴天哥_王志 | 来源:发表于2021-06-12 23:34 被阅读0次

    系列

    开篇

    Sentinel是2018年由阿里贡献的开源项目,是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。

    整个Sentinel系列的介绍会按照一个系列进行介绍,中间穿插一些滑动窗口等实现的介绍。整体的大纲如下:

    • Sentinel整体流程
    • Sentinel单元介绍
    • Sentinel规则
    • Sentinel的熔断处理
    • Sentinel的数据收集
    • Sentinel的dashboard

    本篇主要是介绍Sentinel整体流程,聚焦于讲解整个执行过程以及核心对象的存储方式。

    基础概念

    • ProcessorSlotChain:Sentinel 的核心骨架,将不同的Slot按照顺序串在一起(责任链模式),从而将不同的功能(限流、降级、系统保护)组合在一起。slot chain 其实可以分为两部分:统计数据构建部分(statistic)和判断部分(rule checking)。

    • Context:Context 代表调用链路上下文,贯穿一次调用链路中的所有 Entry。Context 维持着入口节点(entranceNode)、本次调用链路的 curNode、调用来源(origin)等信息。Context 名称即为调用链路入口名称。

    • Entry:每一次资源调用都会创建一个 Entry。Entry 包含了资源名、curNode(当前统计节点)、originNode(来源统计节点)等信息。
      CtEntry 为普通的 Entry,在调用 SphU.entry(xxx) 的时候创建。特性:Linked entry within current context(内部维护着 parent 和 child)

    • Node:Sentinel 里面的各种种类的统计节点:
      StatisticNode:最为基础的统计节点,包含秒级和分钟级两个滑动窗口结构。
      DefaultNode:链路节点,用于统计调用链路上某个资源的数据,维持树状结构。
      ClusterNode:簇点,用于统计每个资源全局的数据(不区分调用链路),以及存放该资源的按来源区分的调用数据(类型为 StatisticNode)。特别地,Constants.ENTRY_NODE 节点用于统计全局的入口资源数据。
      EntranceNode:入口节点,特殊的链路节点,对应某个 Context 入口的所有调用数据。Constants.ROOT 节点也是入口节点。

    职责链

    • 从设计模式上来看,典型的的责任链模式。外部请求进来后,要经过责任链上各个节点的处理,而Sentinel的限流、熔断就是通过责任链上的这些节点实现的。
    • NodeSelectorSlot负责构建树形的调用链条。
    • ClusterBuilderSlot负责聚簇的统计。
    • StatisticSlot负责统计。
    # Sentinel default ProcessorSlots
    com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
    com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
    com.alibaba.csp.sentinel.slots.logger.LogSlot
    com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
    com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
    com.alibaba.csp.sentinel.slots.system.SystemSlot
    com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
    com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
    
    • 目前Sentinel内置的slot对象。

    源码分析

    基础用法

    public static void main(String[] args) {
        initFlowRules();
        while (true) {
            Entry entry = null;
            try {
                entry = SphU.entry("HelloWorld");
                /*您的业务逻辑 - 开始*/
                System.out.println("hello world");
                /*您的业务逻辑 - 结束*/
        } catch (BlockException e1) {
                /*流控逻辑处理 - 开始*/
            System.out.println("block!");
                /*流控逻辑处理 - 结束*/
        } finally {
           if (entry != null) {
               entry.exit();
           }
        }
        }
    }
    
    • Sentinel的一般的应用方式如上所示,通过SphU.entry()来判断是否拦截。

    整体流程

    public class SphU {
    
        public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {
            // 通过执行Env.sph#entryWithType方法来判断是否通过
            return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0);
        }
    }
    
    public class Env {
        // 通过Sph继续执行
        public static final Sph sph = new CtSph();
    
        static {
            InitExecutor.doInit();
        }
    }
    
    public class CtSph implements Sph {
    
        public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                                   Object[] args) throws BlockException {
            // 根据资源名创建资源的对象resource
            StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
            return entryWithPriority(resource, count, prioritized, args);
        }
    
        private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
            throws BlockException {
            // 1、Context是线程维度的,通过ThreadLocal进行保存
            Context context = ContextUtil.getContext();
            if (context instanceof NullContext) {
                return new CtEntry(resourceWrapper, null, context);
            }
    
            if (context == null) {
                // 2、创建名为sentinel_default_context的Context对象
                // 内部创建EntranceNode并保存到contextNameNodeMap
                // contextNameNodeMap是静态全局变量
                // EntranceNode添加到全局的ROOT节点
                context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
            }
    
            if (!Constants.ON) {
                return new CtEntry(resourceWrapper, null, context);
            }
            // 3、构建职责链,职责链的前两个核心节点为DefaultProcessorSlotChain和 NodeSelectorSlot
            // DefaultProcessorSlotChain的作用为职责链的头部维护职责链的关系
            // NodeSelectorSlot在初始化过程中创建DefaultNode用以统计信息
            // NodeSelectorSlot的DefaultNode添加到context的EntranceNode节点后
            ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
    
            if (chain == null) {
                return new CtEntry(resourceWrapper, null, context);
            }
            // 4、创建CtEntry对象
            Entry e = new CtEntry(resourceWrapper, chain, context);
            try {
                // 5、执行职责链的整个流程操作
                chain.entry(context, resourceWrapper, null, count, prioritized, args);
            } catch (BlockException e1) {
                e.exit(count, args);
                throw e1;
            } catch (Throwable e1) {
            }
            return e;
        }
    
        private final static class InternalContextUtil extends ContextUtil {
            static Context internalEnter(String name) {
                return trueEnter(name, "");
            }
    
            static Context internalEnter(String name, String origin) {
                return trueEnter(name, origin);
            }
        }
    }
    
    • 1、从线程的ThreadLocal获取Context对象,为空通过InternalContextUtil.internalEnter创建Context对象。
    • 2、通过lookProcessChain构建Sentinel的职责链。
    • 3、使用Context和lookProcessChain创建CtEntry对象。
    • 4、通过chain.entry触发整体的职责链执行。

    构建Context

    public class ContextUtil {
        // 通过ThreadLocal保存Context对象
        private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
        // 通过Context的名称作为key保存EntranceNode
        private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
    
        private static final ReentrantLock LOCK = new ReentrantLock();
        private static final Context NULL_CONTEXT = new NullContext();
    
        static {
            initDefaultContext();
        }
    
        private static void initDefaultContext() {
            String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
            // 创建EntranceNode
            EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
            // 加入到全局的ROOT节点
            Constants.ROOT.addChild(node);
            // 以Context名称作为key保存EntranceNode
            contextNameNodeMap.put(defaultContextName, node);
        }
    
        public static Context getContext() {
            return contextHolder.get();
        }
    
        protected static Context trueEnter(String name, String origin) {
            Context context = contextHolder.get();
            if (context == null) {
                Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
                DefaultNode node = localCacheNameMap.get(name);
                if (node == null) {
                    if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                        setNullContext();
                        return NULL_CONTEXT;
                    } else {
                        LOCK.lock();
                        try {
                            node = contextNameNodeMap.get(name);
                            if (node == null) {
                                if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                    setNullContext();
                                    return NULL_CONTEXT;
                                } else {
                                    node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                    // 保存EntranceNode到ROOT节点当中
                                    Constants.ROOT.addChild(node);
    
                                    Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                    newMap.putAll(contextNameNodeMap);
                                    newMap.put(name, node);
                                    contextNameNodeMap = newMap;
                                }
                            }
                        } finally {
                            LOCK.unlock();
                        }
                    }
                }
                context = new Context(node, name);
                context.setOrigin(origin);
                contextHolder.set(context);
            }
    
            return context;
        }
    }
    
    • contextHolder通过ThreadLocal保存Context对象。
    • contextNameNodeMap通过Context的名称作为key保存EntranceNode
    • EntranceNode以资源对象StringResourceWrapper作为参数进行构建。
    • EntranceNode添加到全局的Constants.ROOT的节点下。

    构建职责链

    public class CtSph implements Sph {
        // 以全局静态变量chainMap保存资源对象作为key的职责链
        private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
            = new HashMap<ResourceWrapper, ProcessorSlotChain>();
    
        ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
            ProcessorSlotChain chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                synchronized (LOCK) {
                    chain = chainMap.get(resourceWrapper);
                    if (chain == null) {
                        if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                            return null;
                        }
                        // 构建职责链
                        chain = SlotChainProvider.newSlotChain();
                        Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                            chainMap.size() + 1);
                        newMap.putAll(chainMap);
                        // 以资源对象作为key保存职责链对象chain
                        newMap.put(resourceWrapper, chain);
                        chainMap = newMap;
                    }
                }
            }
            return chain;
        }
    }
    
    • chainMap作为全局静态变量以资源对象作为key保存职责链。
    • CtSph的chainMap以资源对象作为key保存职责链
    # com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
    com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder
    
    
    public final class SlotChainProvider {
        private static volatile SlotChainBuilder slotChainBuilder = null;
    
        public static ProcessorSlotChain newSlotChain() {
            if (slotChainBuilder != null) {
                return slotChainBuilder.build();
            }
    
            // 通过SPI机制加载
            slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
    
            if (slotChainBuilder == null) {
                slotChainBuilder = new DefaultSlotChainBuilder();
            } 
            // 创建职责链对象
            return slotChainBuilder.build();
        }
    }
    
    @Spi(isDefault = true)
    public class DefaultSlotChainBuilder implements SlotChainBuilder {
    
        @Override
        public ProcessorSlotChain build() {
            ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    
            List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
            for (ProcessorSlot slot : sortedSlotList) {
                if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                    RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                    continue;
                }
    
                chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
            }
    
            return chain;
        }
    }
    
    com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
    com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
    com.alibaba.csp.sentinel.slots.logger.LogSlot
    com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
    com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
    com.alibaba.csp.sentinel.slots.system.SystemSlot
    com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
    com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
    
    • 通过SPI机制加载SlotChainBuilder对象DefaultSlotChainBuilder。
    • 通过DefaultSlotChainBuilder的build来创建职责链对象。
    • 职责链的头部对象为DefaultProcessorSlotChain,然后通过SPI机制加载所有的ProcessorSlot并通过尾加法构建职责链。
    • 目前内置的ProcessorSlot对象如上图所示,职责链的前三个元素为DefaultProcessorSlotChain、NodeSelectorSlot、ClusterBuilderSlot

    执行职责链

    public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    
        AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
    
            @Override
            public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
                throws Throwable {
                super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
            }
    
            @Override
            public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
                super.fireExit(context, resourceWrapper, count, args);
            }
    
        };
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
        }
    }
    
    
    public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    
        private AbstractLinkedProcessorSlot<?> next = null;
    
        @Override
        public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            if (next != null) {
                next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
            }
        }
    
        @SuppressWarnings("unchecked")
        void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
            throws Throwable {
            T t = (T)o;
            entry(context, resourceWrapper, t, count, prioritized, args);
        }
    }
    
    • DefaultProcessorSlotChain作为职责链的第一个元素,通过调用slot的entry+transformEntry+ fireEntry的循环触发完成职责链的调用。
    @Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
    public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
        // 以上下文的名字作为key保存DefaultNode
        private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
    
            DefaultNode node = map.get(context.getName());
            if (node == null) {
                synchronized (this) {
                    node = map.get(context.getName());
                    if (node == null) {
                        node = new DefaultNode(resourceWrapper, null);
                        HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                        cacheMap.putAll(map);
                        cacheMap.put(context.getName(), node);
                        map = cacheMap;
                        // Build invocation tree
                        ((DefaultNode) context.getLastNode()).addChild(node);
                    }
    
                }
            }
    
            context.setCurNode(node);
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    }
    
    • NodeSelectorSlot以资源对象作为参数创建DefaultNode对象。
    • NodeSelectorSlot的map以Context的名称作为key保存DefaultNode
    • 通过context.getLastNode()).addChild(node)添加到EntranceNode尾部。
    @Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
    public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
        // 以资源对象作为key保存ClusterNode的全局静态变量
        private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
        private static final Object lock = new Object();
        private volatile ClusterNode clusterNode = null;
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
    
            if (clusterNode == null) {
                synchronized (lock) {
                    if (clusterNode == null) {
                        // Create the cluster node.
                        clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                        HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                        newMap.putAll(clusterNodeMap);
                        // 以资源对象作为key保存集群节点
                        newMap.put(node.getId(), clusterNode);
    
                        clusterNodeMap = newMap;
                    }
                }
            }
            node.setClusterNode(clusterNode);
    
            if (!"".equals(context.getOrigin())) {
                Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
                context.getCurEntry().setOriginNode(originNode);
            }
    
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    }
    
    • 根据资源对象创建ClusterNode,并以资源对象作为key保存ClusterNode。
    • ClusterBuilderSlot的clusterNodeMap以资源对象作为key保存ClusterNode
     * ContextUtil.enter("entrance1", "appA");
     * Entry nodeA = SphU.entry("nodeA");
     * if (nodeA != null) {
     *     nodeA.exit();
     * }
     * ContextUtil.exit();
     *
     * Above code will generate the following invocation structure in memory:
     *
     *
     *              machine-root
     *                  /
     *                 /
     *           EntranceNode1
     *               /
     *              /
     *        DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);
     *
     *-------------------------------------------------------------------------------
     *
     *
     *    ContextUtil.enter("entrance1", "appA");
     *    Entry nodeA = SphU.entry("nodeA");
     *    if (nodeA != null) {
     *        nodeA.exit();
     *    }
     *    ContextUtil.exit();
     *
     *    ContextUtil.enter("entrance2", "appA");
     *    nodeA = SphU.entry("nodeA");
     *    if (nodeA != null) {
     *        nodeA.exit();
     *    }
     *    ContextUtil.exit();
     * 
     *
     * Above code will generate the following invocation structure in memory:
     *
     *
     *                  machine-root
     *                  /         \
     *                 /           \
     *         EntranceNode1   EntranceNode2
     *               /               \
     *              /                 \
     *      DefaultNode(nodeA)   DefaultNode(nodeA)
     *             |                    |
     *             +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
     *
    
    • 在构建Context对象的过程中建立root和EntranceNode的关系。
    • 在构建NodeSelectorSlot对象的过程中建立EntranceNode和DefaultNode的关系。
    • 在构建ClusterBuilderSlot对象的过程中建立DefaultNode和ClusterNode的关系。
    • EntranceNode以Context的名称作为key进行保存
    • DefaultNode以Context的名称作为key进行保存
    • ClusterNode以资源对象的名称作为key进行保存
    • CtSph的chainMap以资源对象作为key保存职责链

    总结

    • 三大组件Context、Entry、Node,是Sentinel的核心组件,各类信息及资源调用情况都由这三大类持有;
    • 采用责任链模式完成Sentinel的信息统计、熔断、限流等操作;
    • 责任链中NodeSelectSlot负责选择当前资源对应的Node,同时构建node调用树;
    • 责任链中ClusterBuilderSlot负责构建当前Node对应的ClusterNode,用于聚合同一资源对应不同Context的Node;
    • 责任链中的StatisticSlot用于统计当前资源的调用情况,更新Node与其对用的ClusterNode的各种统计数据;
    • 责任链中的FlowSlot根据当前Node对应的ClusterNode(默认)的统计信息进行限流;
    • 资源调用统计数据(例如PassQps)使用滑动时间窗口进行统计;
    • 所有工作执行完毕后,执行退出流程,补充一些统计数据,清理Context。

    相关文章

      网友评论

        本文标题:Sentinel流程介绍

        本文链接:https://www.haomeiwen.com/subject/vteieltx.html