Sentinel

作者: spilledyear | 来源:发表于2018-11-19 21:09 被阅读46次

    推荐一篇写的不错的文章:限流降级神器-哨兵

    源码地址

    使用

    有关于sentinel的使用方法和工作原理,在官方文档中都有详细的介绍,并且源码中也已经给出了一系列的demo,以下是示例:

    <dependency>
        <groupId>com.alibaba.csp</groupId>
        <artifactId>sentinel-core</artifactId>
        <version>1.4.0-SNAPSHOT</version>
    </dependency>
    
    public class AuthorityDemo {
    
        private static final String RESOURCE_NAME = "testABC";
    
        public static void main(String[] args) {
            System.out.println("========Testing for black list========");
            initBlackRules();
            testFor(RESOURCE_NAME, "appA");
            testFor(RESOURCE_NAME, "appB");
            testFor(RESOURCE_NAME, "appC");
            testFor(RESOURCE_NAME, "appE");
    
            System.out.println("========Testing for white list========");
            initWhiteRules();
            testFor(RESOURCE_NAME, "appA");
            testFor(RESOURCE_NAME, "appB");
            testFor(RESOURCE_NAME, "appC");
            testFor(RESOURCE_NAME, "appE");
        }
    
        private static void testFor(/*@NonNull*/ String resource, /*@NonNull*/ String origin) {
            ContextUtil.enter(resource, origin);
            Entry entry = null;
            try {
                entry = SphU.entry(resource);
                System.out.println(String.format("Passed for resource %s, origin is %s", resource, origin));
            } catch (BlockException ex) {
                System.err.println(String.format("Blocked for resource %s, origin is %s", resource, origin));
            } finally {
                if (entry != null) {
                    entry.exit();
                }
                ContextUtil.exit();
            }
        }
    
        private static void initWhiteRules() {
            AuthorityRule rule = new AuthorityRule();
            rule.setResource(RESOURCE_NAME);
            rule.setStrategy(RuleConstant.AUTHORITY_WHITE);
            rule.setLimitApp("appA,appE");
            AuthorityRuleManager.loadRules(Collections.singletonList(rule));
        }
    
        private static void initBlackRules() {
            AuthorityRule rule = new AuthorityRule();
            rule.setResource(RESOURCE_NAME);
            rule.setStrategy(RuleConstant.AUTHORITY_BLACK);
            rule.setLimitApp("appA,appB");
            AuthorityRuleManager.loadRules(Collections.singletonList(rule));
        }
    }
    

    原理

    SphU.entry(resource);
    
    // SphU.java
    public static Entry entry(String name) throws BlockException {
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }
    
    // Env.java
    public static final Sph sph = new CtSph();
    
    // CtSph.java
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }
    

    入口其实就是在CtSph类的entry方法中。这里引出了一个“资源”的概念,“资源”在sentinel中可以是任何东西:服务,服务里的方法,甚至是一段代码,比如上面demo中的RESOURCE_NAME就是一个资源。当然这只是我们字面上理解的“资源”,sentinel对资源做了抽象,即:ResourceWrapper。比如这里的RESOURCE_NAME是一个字符串,所以对应StringResourceWrapper,StringResourceWrapper 是ResourceWrapper的子类。

    entry的具体实现如下,前面是一些校验项目,重点关注lookProcessChain方法,其实就是ProcessorSlotChain的生成过程

    public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }
    
        if (context == null) {
            // Using default context.
            context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
        }
    
        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
    
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
    
        /*
            * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
            * so no rule checking will be done.
            */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
    
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
    

    ProcessorSlotChain生成

    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
    
    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        // 双重校验
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
    
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
    

    该方法主要就是根据资源获取到对应的ProcessorSlotChain,这里通过一个HashMap将资源和ProcessorSlotChain的关系缓存起来了,如果根据资源没有在缓存中找到ProcessorSlotChain,则创建一个新的ProcessorSlotChain。而ProcessorSlotChain则是具体限流、降级等操作的入口。在sentinel中定义了一系列的功能插槽(Solt),目前有7个:NodeSelectorSlot、ClusterBuilderSlot、LogSlot、StatisticSlot、SystemSlot、AuthoritySlot、FlowSlot、DegradeSlot。每个插槽对应不同的功能,比如FlowSlot负责流量控制、DegradeSlot用来做熔断降级,具体的可以查看官方文档,每个资源可以对应一个或多个Solt。ProcessorSlotChain主要就是针对资源调用具体插槽的逻辑,将一个或多个插槽泡拼装成一条链,在执行完当期插槽逻辑的之后,出发下一个插槽的逻辑,直到整条链调用完成。

    SlotChainProvider.newSlotChain()的具体逻辑如下:

    private static volatile SlotChainBuilder builder = null;
    
    private static final ServiceLoader<SlotChainBuilder> LOADER = ServiceLoader.load(SlotChainBuilder.class);
    
    public static ProcessorSlotChain newSlotChain() {
        if (builder != null) {
            return builder.build();
        }
    
        resolveSlotChainBuilder();
    
        if (builder == null) {
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            builder = new DefaultSlotChainBuilder();
        }
        return builder.build();
    }
    

    这里引入了SPI的概念,在sentinel-core模块的resource/META-INF/services目录下,有一个名为com.alibaba.csp.sentinel.slotchain.SlotChainBuilder的文件,文件内容如下:

    # Default slot chain builder
    com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder
    

    即,这里引用的是DefaultSlotChainBuilder,同时这也说明我们可以自定义SlotChainBuilder实现。

    public class DefaultSlotChainBuilder implements SlotChainBuilder {
    
        @Override
        public ProcessorSlotChain build() {
            ProcessorSlotChain chain = new DefaultProcessorSlotChain();
            chain.addLast(new NodeSelectorSlot());
            chain.addLast(new ClusterBuilderSlot());
            chain.addLast(new LogSlot());
            chain.addLast(new StatisticSlot());
            chain.addLast(new SystemSlot());
            chain.addLast(new AuthoritySlot());
            chain.addLast(new FlowSlot());
            chain.addLast(new DegradeSlot());
    
            return chain;
        }
    
    }
    

    DefaultProcessorSlotChain 中的部分代码

    public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    
        AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
    
            @Override
            public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
                throws Throwable {
                super.fireEntry(context, resourceWrapper, t, count, args);
            }
    
            @Override
            public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
                super.fireExit(context, resourceWrapper, count, args);
            }
    
        };
        AbstractLinkedProcessorSlot<?> end = first;
    
        @Override
        public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
            protocolProcessor.setNext(first.getNext());
            first.setNext(protocolProcessor);
            if (end == first) {
                end = protocolProcessor;
            }
        }
    
        @Override
        public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
            end.setNext(protocolProcessor);
            end = protocolProcessor;
        }
    }
    

    主要就是将不同的插槽拼装成一条链路,addFirst表示加在链表的头部,主要通过改变first的next指向来实现;addLast表示加在链表的尾部,主要通过改变end的next指向来实现,如果不是很理解,在纸上比划比划就很清楚了。

    ProcessorSlotChain执行

    以上是有关于ProcessorSlotChain的生成逻辑,接下来看看ProcessorSlotChain的执行逻辑,继续回到Ctsph中的entry方法,在上面已经粘贴过一次,这里省略部分非关键代码:

    public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        // 如果chain为空,说明资源数已经超过sentinel设置的最带值了,默认是6000
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
    
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            // ProcessorSlotChain执行入口
            chain.entry(context, resourceWrapper, null, count, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
    

    不难发现,入口在chain.entry(context, resourceWrapper, null, count, args),从上面的ProcessorSlotChain生成逻辑可以发现,生成的是DefaultProcessorSlotChain,所以主要关注DefaultProcessorSlotChain的entry方法

    // DefaultProcessorSlotChain.java
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, args);
    }
    
    // AbstractLinkedProcessorSlot.java
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, Object... args)
        throws Throwable {
        T t = (T)o;
        entry(context, resourceWrapper, t, count, args);
    }
    
    //DefaultProcessorSlotChain.java
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
        throws Throwable {
        super.fireEntry(context, resourceWrapper, t, count, args);
    }
    
    // AbstractLinkedProcessorSlot.java
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
        throws Throwable {
        // 这里的next其实就是指具体的插槽实现了
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, args);
        }
    }
    

    最关键的部分其实就在fireEntry方法中,这里的next其实就是指具体的插槽实现,比如这里以FlowSlot为例:

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
        throws Throwable {
        // FlowSlot具体的插槽逻辑
        checkFlow(resourceWrapper, context, node, count);
        
        // 通过调用AbstractLinkedProcessorSlot的fireEntry方法,用来触发下一个插槽逻辑的调用
        fireEntry(context, resourceWrapper, node, count, args);
    }
    

    其实这就是插槽链的调用,比如SpringAOP中的Intercepterl链、Mybatis中的plugin链路,虽然具体的实现方式不同,但是目的都是一样的:执行完整条链上的逻辑。

    上面的调用都是理想的情况,即:所有的请求都通过,没有被限制的情况。如果请求被拒绝,该怎么处理?这里还是以FlowSlot为例:

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
        throws Throwable {
        checkFlow(resourceWrapper, context, node, count);
    
        fireEntry(context, resourceWrapper, node, count, args);
    }
    
    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException {
        // Flow rule map cannot be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRules();
    
        List<FlowRule> rules = flowRules.get(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count)) {
                    throw new FlowException(rule.getLimitApp());
                }
            }
        }
    }
    

    可以看到,如果请求被拒绝,即:被限流了,则抛出BlockException异常,在外层如果捕获到BlockException异常,则在里面处理对应的逻辑。

    sentinel-dashboard

    sentinel-dashboard是sentinel的轻量级控制台,该控制台主要提供两个功能:监控、配置。即:针对资源的监控和针对资源的配置,比如:可以配置一些规则。

    sentinel-dashboard是基于spring-boot2,所以直接启动DashboardApplication就可以了,当然也可以以jar包的方式启动,启动之后的界面效果如下:

    没错,什么都没有,因为这时候没有可监控的应用。

    接入到sentinel-dashboard的流程也很简单,新建一个应用, 添加以下依赖

    <!-- sentinel-dashboard -->
    <dependency>
        <groupId>com.alibaba.csp</groupId>
        <artifactId>sentinel-transport-simple-http</artifactId>
        <version>1.4.0-SNAPSHOT</version>
    </dependency>
    

    以下是测试代码,其实就是源码中的demo,这里直接搬过来

    package com.hand.sxy.sentinel;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import com.alibaba.csp.sentinel.util.TimeUtil;
    import com.alibaba.csp.sentinel.Entry;
    import com.alibaba.csp.sentinel.SphU;
    import com.alibaba.csp.sentinel.slots.block.BlockException;
    import com.alibaba.csp.sentinel.slots.block.RuleConstant;
    import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
    import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
    
    public class FlowQps {
    
        private static final String KEY = "abc";
    
        private static AtomicInteger pass = new AtomicInteger();
        private static AtomicInteger block = new AtomicInteger();
        private static AtomicInteger total = new AtomicInteger();
    
        private static volatile boolean stop = false;
    
        private static final int threadCount = 32;
    
        private static int seconds = 600000 + 40;
    
        public static void main(String[] args) throws Exception {
            initFlowQpsRule();
    
            tick();
            // first make the system run on a very low condition
            simulateTraffic();
    
            System.out.println("===== begin to do flow control");
            System.out.println("only 20 requests per second can pass");
    
        }
    
        private static void initFlowQpsRule() {
            List<FlowRule> rules = new ArrayList<FlowRule>();
            FlowRule rule1 = new FlowRule();
            rule1.setResource(KEY);
            // set limit qps to 20
            rule1.setCount(20);
            rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
            rule1.setLimitApp("default");
            rules.add(rule1);
            FlowRuleManager.loadRules(rules);
        }
    
        private static void simulateTraffic() {
            for (int i = 0; i < threadCount; i++) {
                Thread t = new Thread(new RunTask());
                t.setName("simulate-traffic-Task");
                t.start();
            }
        }
    
        private static void tick() {
            Thread timer = new Thread(new TimerTask());
            timer.setName("sentinel-timer-task");
            timer.start();
        }
    
        static class TimerTask implements Runnable {
    
            @Override
            public void run() {
                long start = System.currentTimeMillis();
                System.out.println("begin to statistic!!!");
    
                long oldTotal = 0;
                long oldPass = 0;
                long oldBlock = 0;
                while (!stop) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                    }
                    long globalTotal = total.get();
                    long oneSecondTotal = globalTotal - oldTotal;
                    oldTotal = globalTotal;
    
                    long globalPass = pass.get();
                    long oneSecondPass = globalPass - oldPass;
                    oldPass = globalPass;
    
                    long globalBlock = block.get();
                    long oneSecondBlock = globalBlock - oldBlock;
                    oldBlock = globalBlock;
    
                    System.out.println(seconds + " send qps is: " + oneSecondTotal);
                    System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                            + ", pass:" + oneSecondPass
                            + ", block:" + oneSecondBlock);
    
                    if (seconds-- <= 0) {
                        stop = true;
                    }
                }
    
                long cost = System.currentTimeMillis() - start;
                System.out.println("time cost: " + cost + " ms");
                System.out.println("total:" + total.get() + ", pass:" + pass.get()
                        + ", block:" + block.get());
                System.exit(0);
            }
        }
    
        static class RunTask implements Runnable {
            @Override
            public void run() {
                while (!stop) {
                    Entry entry = null;
    
                    try {
                        entry = SphU.entry(KEY);
                        // token acquired, means pass
                        pass.addAndGet(1);
                    } catch (BlockException e1) {
                        block.incrementAndGet();
                    } catch (Exception e2) {
                        // biz exception
                    } finally {
                        total.incrementAndGet();
                        if (entry != null) {
                            entry.exit();
                        }
                    }
    
                    Random random2 = new Random();
                    try {
                        TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
                    } catch (InterruptedException e) {
                        // ignore
                    }
                }
            }
        }
    }
    

    启动的时候,添加以下参数:

    -Djava.net.preferIPv4Stack=true
    -Dcsp.sentinel.dashboard.server=localhost:8080
    -Dcsp.sentinel.api.port=8720
    -Dproject.name=我的APP
    

    启动之后,刷新界面,查看效果

    有关于控制台的一些功能就不过多介绍了,有兴趣可以自己看看。

    Dubbo适配

    看官方文档,除了dubbo适配,文档上还有与其它主流框架适配的介绍。dubbo适配主要是涉及到两个Filtter:SentinelDubboConsumerFilter、SentinelDubboProviderFilter。从名字上也可以看出来,SentinelDubboConsumerFilter主要是限制调用方请求;SentinelDubboProviderFilter主要就是限制提供方提供。有关于这两个应用场景,推荐看看dubbo的官方文档,上面有详细的说明,并且还列举了比较好的例子,例如:

    相关文章

      网友评论

        本文标题:Sentinel

        本文链接:https://www.haomeiwen.com/subject/chiafqtx.html