美文网首页
Sentinel源码-入口类和SlotChain构建过程

Sentinel源码-入口类和SlotChain构建过程

作者: 分布式与微服务 | 来源:发表于2023-03-01 09:54 被阅读0次

    1. 测试用例

    我们以sentinel-demo中的sentinel-annotation-spring-aop为例,分析sentinel的源码。核心代码如下:

    DemoController:

    @RestController
    public class DemoController {
    
        @Autowired
        private TestService service;
    
        @GetMapping("/foo")
        public String apiFoo(@RequestParam(required = false) Long t) throws Exception {
            if (t == null) {
                t = System.currentTimeMillis();
            }
            service.test();
            return service.hello(t);
        }
    
        @GetMapping("/baz/{name}")
        public String apiBaz(@PathVariable("name") String name) {
            return service.helloAnother(name);
        }
    }
    
    

    TestServiceImpl:

    @Service
    public class TestServiceImpl implements TestService {
    
        @Override
        @SentinelResource(value = "test", blockHandler = "handleException", blockHandlerClass = {ExceptionUtil.class})
        public void test() {
            System.out.println("Test");
        }
    
        @Override
        @SentinelResource(value = "hello", fallback = "helloFallback")
        public String hello(long s) {
            if (s < 0) {
                throw new IllegalArgumentException("invalid arg");
            }
            return String.format("Hello at %d", s);
        }
    
        @Override
        @SentinelResource(value = "helloAnother", defaultFallback = "defaultFallback",
            exceptionsToIgnore = {IllegalStateException.class})
        public String helloAnother(String name) {
            if (name == null || "bad".equals(name)) {
                throw new IllegalArgumentException("oops");
            }
            if ("foo".equals(name)) {
                throw new IllegalStateException("oops");
            }
            return "Hello, " + name;
        }
    
        public String helloFallback(long s, Throwable ex) {
            // Do some log here.
            ex.printStackTrace();
            return "Oops, error occurred at " + s;
        }
    
        public String defaultFallback() {
            System.out.println("Go to default fallback");
            return "default_fallback";
        }
    }
    
    

    启动类DemoApplication

    @SpringBootApplication
    public class DemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    }
    
    

    在启动这个工程上增加参数:

    -Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=annotation-aspectj
    
    
    如图:

    打开http://localhost:8081/#/dashboard 地址,可以看到应用已经注册到sentinel管理后台:

    1.1 流控测试

    访问 http://localhost:19966/foo?t=188 这个链接,多访问几次,在实时监控页面可以看到:

    然后,我们先简单配置一个流控规则,如下:
    • 其中,资源名为:

    然后我们在快速刷新http://localhost:19966/foo?t=188 接口,会出现限流的情况,返回如下:

    Oops, error occurred at 188
    
    

    实时监控为:


    2. 注解版源码分析

    使用注解@SentinelResource 核心原理就是 利用AOP切入到方法中,我们直接看SentinelResourceAspect类,这是一个切面类:

    @Aspect // 切面
    public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
    
        // 指定切入点为@SentinelResource 注解
        @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
        public void sentinelResourceAnnotationPointcut() {
        }
    
        // 环绕通知
        @Around("sentinelResourceAnnotationPointcut()")
        public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
            Method originMethod = resolveMethod(pjp);
    
            SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
            if (annotation == null) {
                // Should not go through here.
                throw new IllegalStateException("Wrong state for SentinelResource annotation");
            }
            String resourceName = getResourceName(annotation.value(), originMethod);
            EntryType entryType = annotation.entryType();
            int resourceType = annotation.resourceType();
            Entry entry = null;
            try {
                // 要织入的,增强的功能
                entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
                // 调用目标方法
                return pjp.proceed();
            } catch (BlockException ex) {
                return handleBlockException(pjp, annotation, ex);
            } catch (Throwable ex) {
                Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
                // The ignore list will be checked first.
                if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                    throw ex;
                }
                if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                    traceException(ex);
                    return handleFallback(pjp, annotation, ex);
                }
    
                // No fallback function can handle the exception, so throw it out.
                throw ex;
            } finally {
                if (entry != null) {
                    entry.exit(1, pjp.getArgs());
                }
            }
        }
    }
    
    

    核心方法SphU.entry():

    public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
        throws BlockException {
        // 注意 第4个参数值为 1
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
    }
    @Override
    public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
        throws BlockException {
        // count 参数:表示当前请求可以增加多少个计数
        // 注意 第5个参数为false
        return entryWithType(name, resourceType, entryType, count, false, args);
    }
    @Override
    public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                               Object[] args) throws BlockException {
        // 将信息封装为一个资源对象
        StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
        // 返回一个资源操作对象entry
        // prioritized 为true 表示当前访问必须等待"根据其优先级计算出的时间"后才通过
        // prioritized 为 false 则当前请求无需等待
        return entryWithPriority(resource, count, prioritized, args);
    }
    
    

    我们重点看一下CtSph#entryWithPriority

    /**
     * @param resourceWrapper
     * @param count 默认为1
     * @param prioritized 默认为false
     * @param args
     * @return
     * @throws BlockException
     */
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        // 从ThreadLocal中获取Context
        // 一个请求会占用一个线程,一个线程会绑定一个context
        Context context = ContextUtil.getContext();
        // 若context是 NullContext类型,则表示当前系统中的context数量已经超过阈值
        // 即访问的请求的数量已经超出了阈值,此时直接返回一个无需做规则检测的资源操作对象
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }
    
        // 当前线程中没有绑定context,则创建一个context并将其放入到Threadlocal
        if (context == null) {
            // todo Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
    
        // Global switch is close, no rule checking will do.
        // 若全局开关是关闭的,直接返回一个无需做规则检测的资源操作对象
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
    
        // todo 查找SlotChain
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
    
        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        // 若没有知道chain,则意味着chain数量超出了阈值
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
    
        // 创建一个资源操作对象
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            // todo 对资源进行操作
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
    
    

    2.1 默认Context创建

    当前线程没有绑定Context,则创建一个context并将其放入到Threadlocal。核心方法为 InternalContextUtil.internalEnter

    public static Context enter(String name, String origin) {
        if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
            throw new ContextNameDefineException(
                "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
        }
        return trueEnter(name, origin);
    }
    
    protected static Context trueEnter(String name, String origin) {
        // 尝试从ThreadLocal中获取context
        Context context = contextHolder.get();
        // 若Threadlocal中没有,则尝试从缓存map中获取
        if (context == null) {
            // 缓存map的key为context名称,value为EntranceNode
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            // DCL 双重检测锁,防止并发创建对象
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
                // 若缓存map的size 大于 context数量的最大阈值,则直接返回NULL_CONTEXT
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    LOCK.lock();
                    try {
    
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                // 创建一个EntranceNode
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // Add entrance node.
                                // 将新建的node添加到Root
                                Constants.ROOT.addChild(node);
    
                                // 将新建的node写入到缓存map
                                // 为了防止"迭代稳定性问题"-iterate stable 对于共享集合的写操作
                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            // 将context的name与entranceNode 封装成context
            context = new Context(node, name);
            // 初始化context的来源
            context.setOrigin(origin);
            // 将context写入到ThreadLocal
            contextHolder.set(context);
        }
    
        return context;
    }
    
    

    注意:因为 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();是 HashMap结构,所以存在并发安全问题,采用 代码中方式进行添加操作。

    2.2 查找并创建SlotChain

    构建调用链lookProcessChain(resourceWrapper)

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        // 缓存map的key为资源 value为其相关的SlotChain
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        // DCL
        // 若缓存中没有相关的SlotChain 则创建一个并放入到缓存中
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    // 缓存map的size 大于 chain数量的最大阈值,则直接返回null,不在创建新的chain
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
    
                    // todo 创建新的chain
                    chain = SlotChainProvider.newSlotChain();
                    // 防止 迭代稳定性问题
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
    
    

    我们直接看核心方法SlotChainProvider.newSlotChain();

    public static ProcessorSlotChain newSlotChain() {
            // 若builder不为null,则直接使用builder构建一个chain
            // 否则先创建一个builder
            if (slotChainBuilder != null) {
                return slotChainBuilder.build();
            }
    
            // Resolve the slot chain builder SPI.
            // 通过SPI方式创建builder
            slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
    
            // 若通过SPI未能创建builder,则创建一个默认的DefaultSlotChainBuilder
            if (slotChainBuilder == null) {
                // Should not go through here.
                RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
                slotChainBuilder = new DefaultSlotChainBuilder();
            } else {
                RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                    slotChainBuilder.getClass().getCanonicalName());
            }
            // todo 构建一个chain
            return slotChainBuilder.build();
        }
    
        private SlotChainProvider() {}
    }
    
    

    2.2.1 创建slotChainBuilder

    // 通过SPI方式创建builder
    slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
    
    

    通过SPI方法创建slotChainBuilder,去项目中META-INF.service中获取:

    2.2.2 slotChainBuilder.build()

    @Spi(isDefault = true)
    public class DefaultSlotChainBuilder implements SlotChainBuilder {
    
        @Override
        public ProcessorSlotChain build() {
            ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    
            // 通过SPI方式构建Slot
            List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
            for (ProcessorSlot slot : sortedSlotList) {
                if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                    RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                    continue;
                }
    
                chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
            }
    
            return chain;
        }
    }
    
    

    通过SPI机制,去项目中META-INF.service中获取,在sentinel-core项目中:

    还有一个ParamFlowSlot,在sentinel-extension/sentinel-parameter-flow-control下:

    我们点击 NodeSelectorSlot, 类上面是有 优先级order,数字越小,优先级越高。

    @Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
    public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
    
    

    优先级常量为:

    public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
    public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
    public static final int ORDER_LOG_SLOT = -8000;
    public static final int ORDER_STATISTIC_SLOT = -7000;
    public static final int ORDER_AUTHORITY_SLOT = -6000;
    public static final int ORDER_SYSTEM_SLOT = -5000;
    public static final int ORDER_FLOW_SLOT = -2000;
    public static final int ORDER_DEGRADE_SLOT = -1000;
    
    

    我们看代码中的变量sortedSlotList,已经按照优先级排序好了:

    我们看一下构建的ProcessorSlotChain,类似一个单链表结构,如下:

    我们看一下相关的类结构:DefaultProcessorSlotChain:

    // 这是一个单向链表,默认包含一个接节点,且有两个指针first 和end同时指向这个节点
    public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    
        AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
    
            @Override
            public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
                throws Throwable {
                super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
            }
    
            @Override
            public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
                super.fireExit(context, resourceWrapper, count, args);
            }
    
        };
        AbstractLinkedProcessorSlot<?> end = first;
    
        @Override
        public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
            protocolProcessor.setNext(first.getNext());
            first.setNext(protocolProcessor);
            if (end == first) {
                end = protocolProcessor;
            }
        }
    
        @Override
        public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
            end.setNext(protocolProcessor);
            end = protocolProcessor;
        }
    }
    
    

    AbstractLinkedProcessorSlot:

    public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    
        // 声明一个同类型的变量,则可以指向下一个Slot节点
        private AbstractLinkedProcessorSlot<?> next = null;
    
        @Override
        public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            if (next != null) {
                next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
            }
        }
    
        @SuppressWarnings("unchecked")
        void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
            throws Throwable {
            T t = (T)o;
            entry(context, resourceWrapper, t, count, prioritized, args);
        }
    
        @Override
        public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            if (next != null) {
                next.exit(context, resourceWrapper, count, args);
            }
        }
    
        public AbstractLinkedProcessorSlot<?> getNext() {
            return next;
        }
    
        public void setNext(AbstractLinkedProcessorSlot<?> next) {
            this.next = next;
        }
    
    }
    
    

    构建完成后的SlotChain和工作原理图一样:

    相关文章

      网友评论

          本文标题:Sentinel源码-入口类和SlotChain构建过程

          本文链接:https://www.haomeiwen.com/subject/kpiaartx.html