Sentinel

作者: 剑道_7ffc | 来源:发表于2020-07-02 07:47 被阅读0次

    主要特性

    限流,熔断,降级,监控等功能


    image.png

    基本使用

    QPS(Queries-per-second):每秒的请求次数
    资源:接口和方法

    快速入门

    整体思路是类似于锁,若成功获取资源,则执行下面的逻辑,若没有则抛出阻塞异常。
    https://github.com/alibaba/Sentinel/wiki/%E6%96%B0%E6%89%8B%E6%8C%87%E5%8D%97#%E5%85%AC%E7%BD%91-demo

    控制台

    https://github.com/alibaba/Sentinel/wiki/%E6%8E%A7%E5%88%B6%E5%8F%B0#2-%E5%90%AF%E5%8A%A8%E6%8E%A7%E5%88%B6%E5%8F%B0

    案例代码

    @Configuration
    public class AopConfiguration {
        @Bean
        public SentinelResourceAspect sentinelResourceAspect(){
            return new SentinelResourceAspect();
        }
    }
    @RestController
    public class SentinelController {
    
        @SentinelResource(value = "sayHello") //针对方法级别的限流
        @GetMapping("/say")
        public String sayHello(){
            System.out.println("hello world");
            return "hello world";
        }
    }
    @SpringBootApplication
    public class SentinelDemoApplication {
    
        public static void main(String[] args) {
            initFlowRules();
            SpringApplication.run(SentinelDemoApplication.class, args);
        }
    
        //初始化规则
        private static void initFlowRules(){
            List<FlowRule> rules=new ArrayList<>(); //限流规则的集合
            FlowRule flowRule=new FlowRule();
            flowRule.setResource("sayHello");//资源(方法名称、接口)
            flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS); //限流的阈值的类型
            flowRule.setCount(10);
            rules.add(flowRule);
            FlowRuleManager.loadRules(rules);
        }
    
    }
    

    运行结果


    image.png

    实现原理

    链路如何添加的

    责任链模式


    image.png
    image.png
    源码分析

    1 com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder#build
    通过spi来实现责任链的获取

    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    
        // Note: the instances of ProcessorSlot should be different, since they are not stateless.
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
    
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }
    
        return chain;
    }
    

    2 com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#addLast
    通过addLast构建上下级关系,第一个end是一个抽象内部类,只是起一个开头的作用。

    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }
    
    image.png

    如何实现限流

    滑动窗口(StatisticSlot)


    image.png

    sampleCount:表示1ms中滑动窗口的总个数
    intervalInMs:表示把1ms分成多少份
    windowLengthInMs:表示1个活动窗口占多少份

    源码分析

    1 com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
        // Request passed, add thread count and pass count.
        node.increaseThreadNum();
        node.addPassRequest(count);//滑动窗口
    
        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    }
    

    2 com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow

    public WindowWrap<T> currentWindow(long timeMillis) {
        // 910/200=4
        int idx = calculateTimeIdx(timeMillis);
        // 第五个窗口的开始时间:800
        long windowStart = calculateWindowStart(timeMillis);
    
        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
        }
    

    规则如何判断的

    FlowSlot

    源码分析

    1 com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Sentinel

          本文链接:https://www.haomeiwen.com/subject/pqltqktx.html