美文网首页我的微服务
分布式限流Sentinel

分布式限流Sentinel

作者: 五十米_深蓝 | 来源:发表于2020-05-14 22:46 被阅读0次

    为什么要限流

    众所周知,互联网电商的各类活动是越来越多,例如削减男同胞钱包厚度的双十一、618、双十二、各类秒杀活动等,几乎所有的互联网电商企业都会参与其中,冲击GMV,会电商平台带来巨大的流量与可观的利润。

    作为互联网电商中的一员,我自己所属的公司虽然远比不上淘宝、京东等,但作为社交电商领域的领头羊,我们在上述对于电商企业及其特殊的日子,流量也是不容小觑的。

    嗯,毕竟我们的注册用户数已经超过了6000W了,
    供应商已经超过1.5W家了,去年双十一单天的GMV也突破了2亿RMB,
    嘿嘿,强行给自己公司打Call
    

    好了,让我们进入这期的主题。例如在双十一、或者周年庆等这种特殊的日子,当12点刚到那一刻,巨大的用户流量涌入你们的系统,访问量突然剧增时,我们是如何保证系统的可用性、稳定性。我们的解决方案主要是通过Sentinel的限流、降级、熔断(增加服务器数量就不说了)以及消息中间件的削峰(我会专门写一期关于消息中间件的文章,到时候大家可以看看)。没错,本期的主角出现了,他就是Sentinel,阿里开源的面向分布式服务框架的轻量级流量控制框架。官网如下:https://github.com/alibaba/Sentinel

    为什么选择Sentinel

    以下是另一个开源的流量控制框架hystrix与Sentinel的对比

    Sentinel与hystrix对比
    由上图显而易见,Sentinel相比于hystrix有更加强大的功能,它支持hystrix不具有的系统负载保护与限流以及强大的监控API等。更加适配的Dubbo(我们公司用的RCP就是它)的也是Sentinel;更加重要的一点就是我们技术部的管理层都是阿里出身,技术选型也就都是阿里那一套,毕竟国际大厂,品质还是有保证的,哈哈。

    Sentinel名词解释

    资源

    分布式系统中,限流的资源可以是一个http接口,也可使是某个分布式应用中的API;一般我们针对C端的http接口进行限流,针对API进行熔断降级。

    限流

    限制请求的数量,限制某段时间内的请求总量对于超出的总量的请求,可以直接拒绝,也可以在请求的时候对请求分组,允许特殊请求进来,剩下的拒绝,也可以放入消息队列,削峰填谷。
    限流的实现方式:

    • 计数器(滑动窗口):维护一个counter,每个时间段清零,对时间段内的请求进行计数,计数前判断counter是否达到阙值,如果没有就加一,达到则拒绝
    • 漏斗算法:一个固定容量的桶,当桶为空的时候,不会漏出水滴,流入桶的水的速率是任意的,漏出水的速率是固定的,如果流入桶的水超出桶的容量,进行拒绝
      一般的实现方法是队列,队列模拟漏斗,空的时候不再出队,满的时候拒绝
    • 令牌桶算法:和漏斗算法很类似,不过除了一个队列以外,还加入了一个中间人,它会以一定的速率发放令牌(token)到桶内,队列中的的等待着只有拿到token才能通过漏斗限制了传输速率,而令牌桶在限制的同时,还允许突然的大流量,即:在大流量到来的时候,有足够空间的情况下(足够的队列和桶内有足够的令牌),就允许进入
    降级

    服务降级是从整个系统的负荷情况出发和考虑的,对某些负荷会比较高的情况,为了预防某些功能(业务场景)出现负荷过载或者响应慢的情况,在其内部暂时舍弃对一些非核心的接口和数据的请求,而直接返回一个提前准备好的fallback(退路)错误处理信息。这样,虽然提供的是一个有损的服务,但却保证了整个系统的稳定性和可用性。例如:当双11活动时,把无关交易的服务统统降级,如查看历史订单、工单等等。

    熔断

    在微服务架构中,微服务是完成一个单一的业务功能,这样做的好处是可以做到解耦,每个微服务可以独立演进。但是,一个应用可能会有多个微服务组成,微服务之间的数据交互通过远程过程调用完成。这就带来一个问题,假设微服务A调用微服务B和微服务C,微服务B和微服务C又调用其它的微服务。如果调用链路上某个微服务的调用响应时间过长或者不可用,对微服务A的调用就会占用越来越多的系统资源,进而引起系统崩溃,所谓的“雪崩效应”。
    熔断机制是应对雪崩效应的一种微服务链路保护机制。服务熔断的作用类似于我们家用的保险丝,当某服务出现不可用或响应超时的情况时,为了防止整个系统出现雪崩,暂时停止对该服务的调用。熔段解决如下几个问题:

    • 当所依赖的对象不稳定时,能够起到快速失败的目的;
    • 快速失败后,能够根据一定的算法动态试探所依赖对象是否恢复

    Sentinel源码解析

    本源码解析以限流为例,降级具体实现可自行参考源码Sentinel采用滑动窗口算法来实现限流的。限流的直接表现是在执行 Entry nodeA = SphU.entry(资源名字) 的时候抛出 FlowException 异常。FlowException 是BlockException 的子类,您可以捕捉 BlockException 来自定义被限流之后的处理逻辑。

    public static void main(String[] args) throws Exception {
            //初始化一个规则       
            initFlowRule();
            // 触发内部初始化
            Entry entry = null;
            try {
                entry = SphU.entry(KEY);
            } catch  (BlockException e){
                 //如果被限流了,那么会抛出这个异常
                 e.printStackTrace();
            } finally {
                if (entry != null) {
                    entry.exit();
                }
            } 
        }
    

    由上可知,会先初始化一个限流规则,initFlowRule方法中将创建一个限流规则FlowRule对象,主要限流参数如下

    public class FlowRule extends AbstractRule {
       /**
         * 限流阈值类型,QPS 或线程数,默认QPS
       */
        private int grade = RuleConstant.FLOW_GRADE_QPS;
    
        /**
         *  限流阈值
         */
        private double count;
    
        /**
         * 根据调用关系选择策略
         */
        private int strategy = RuleConstant.STRATEGY_DIRECT;
    
        /**
         * 资源名,即限流规则的作用对象
         * 1-预热/冷启动
         * 2-速率限制
         * 3-预热/冷启动+速率限制
         */
        private String refResource;
    
        /**
         * 限流控制行为,默认0-直接拒绝
         */
        private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
    }
    

    并设置其相应的限流规则属性,最后通过FlowRuleManager.loadRules(rules)加载限流规则。

    public class FlowRuleManager {
       /**
         * Load {@link FlowRule}s, former rules will be replaced.
         * @param rules new rules to load.
       */
        public static void loadRules(List<FlowRule> rules) {
              currentProperty.updateValue(rules);
        }
    }
    
    public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
      @Override
        public boolean updateValue(T newValue) {
            if (isEqual(value, newValue)) {
                return false;
            }
            RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);
            value = newValue;
            for (PropertyListener<T> listener : listeners) {
                //监听修改限流规则
                listener.configUpdate(newValue);
            }
            return true;
        }
    }
    

    限流规则初始化之后,通过entry= SphU.entry(resource)触发内部初始化。
    从 SphU.entry() 方法往下执行会进入到 Sph.entry() ,Sph的默认实现类是 CtSph,而最终会进入CtSph 的entry 方法

    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
       //封装一个资源对象
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }
    

    通过我们给定的资源去封装了一个 StringResourceWrapper ,然后传入自己的重载方法,继而调用 entryWithPriority(resourceWrapper, count, false, args):

    public class CtSph implements Sph {
      //......
      private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
            throws BlockException {
            Context context = ContextUtil.getContext();
            if (context instanceof NullContext) {
                return new CtEntry(resourceWrapper, null, context);
            }
            if (context == null) {
                // 使用默认上下文
                context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
            }
            // 全局开关关闭,没有规则检查,返回CtEntry对象
            if (!Constants.ON) {
                return new CtEntry(resourceWrapper, null, context);
            }
            // 获取该资源对应的 chain
            ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
            // 获取的chain对象为空,返回CtEntry对象
            if (chain == null) {
                return new CtEntry(resourceWrapper, null, context);
            }
            Entry e = new CtEntry(resourceWrapper, chain, context);
            try {
                //执行chain的 entry方法
                chain.entry(context, resourceWrapper, null, count, prioritized, args);
            } catch (BlockException e1) {
                e.exit(count, args);
                throw e1;
            } catch (Throwable e1) {
                // This should not happen, unless there are errors existing in Sentinel internal.
                RecordLog.info("Sentinel unexpected exception", e1);
            }
            return e;
        }
    }
    

    由上述方法可知,主要是为了获取该资源对应的资源处理链,让我们来看下slotChain是如何获取的

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
            //这里与dubbo(双重检查锁)中如出一辙,采用本地Map缓存机制
            ProcessorSlotChain chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                synchronized (LOCK) {
                    chain = chainMap.get(resourceWrapper);
                    if (chain == null) {
                        // SlotChain最大阈值:6000
                        if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                            return null;
                        }
                        // 构造SlotChain对象
                        chain = SlotChainProvider.newSlotChain();
                        //  资源 -->  处理链
                        Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                            chainMap.size() + 1);
                        // 放入本地map缓存
                        newMap.putAll(chainMap);
                        newMap.put(resourceWrapper, chain);
                        chainMap = newMap;
                    }
                }
            }
            return chain;
        }
    

    当Map缓存中不存在ProcessorSlotChain实例,则具体通过 SlotChainProvider 去构造处理链

    public final class SlotChainProvider {
        //  SlotChain的构造器
        private static volatile SlotChainBuilder slotChainBuilder = null;
        /**
         * 构造SlotChain对象
         */
        public static ProcessorSlotChain newSlotChain() {
            // 若slotChainBuilder存在,直接调用构造方法
            if (slotChainBuilder != null) {
                return slotChainBuilder.build();
            }
            // 通过SpiLoader加载SlotChainBuilder
            slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
            if (slotChainBuilder == null) {
                // Should not go through here.
                RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
                slotChainBuilder = new DefaultSlotChainBuilder();
            } else {
                RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
                    + slotChainBuilder.getClass().getCanonicalName());
            }
            return slotChainBuilder.build();
        }
    }
    

    继续让我们来看下slotChainBuilder的build方法中做了些什么

    public class DefaultSlotChainBuilder implements SlotChainBuilder {
        @Override
        public ProcessorSlotChain build() {
            ProcessorSlotChain chain = new DefaultProcessorSlotChain();
            // 获取ProcessorSlot实例集合
            List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
            for (ProcessorSlot slot : sortedSlotList) {
                // 过滤非AbstractLinkedProcessorSlot类型的Slot
                if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                    RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                    continue;
                }
                // 加入链路中
                chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
            }
            return chain;
        }
    }
    

    我们可以看出上述底层源码是一个标准的责任链设计模式,通过查看ProcessorSlot的具体实现类,我们可以知道该责任链中的具体节点如图所示


    责任链中的节点

    执行对应的这些节点,具有有不同的职责,例如:

    • NodeSelectorSlot :收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
    • ClusterBuilderSlot :用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
    • StatisticSlot :用于记录、统计不同纬度的 runtime 指标监控信息;
    • SystemSlot :通过系统的状态,例如 load1 等,来控制总的入口流量;
    • AuthoritySlot :根据配置的黑白名单和调用来源信息,来做黑白名单控制;
    • FlowSlot :用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
    • DegradeSlot :通过统计信息以及预设的规则,来做熔断降级;
      slot执行链路可参考如下
      slot执行框架
      上面获得的ProcessorSlotChain的实例是DefaultProcessorSlotChain,那么执行chain.entry方法,就会执行DefaultProcessorSlotChain.first的entry方法,而DefaultProcessorSlotChain.first的entry方法是这样的:
    public class DefaultProcessorSlotChain extends ProcessorSlotChain {
        AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
            @Override
            public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
                throws Throwable {
                super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
            }
            @Override
            public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
                super.fireExit(context, resourceWrapper, count, args);
            }
    
        };
    }
    
    public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
        private AbstractLinkedProcessorSlot<?> next = null;
        @Override
        public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            if (next != null) {
                next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
            }
        }
        @SuppressWarnings("unchecked")
        void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
            throws Throwable {
            T t = (T)o;
            entry(context, resourceWrapper, t, count, prioritized, args);
        }
    

    下图所示是各个slot对应的entry方法的具体实现


    entry方法具体实现

    我们以StatisticSlot为例,来看看这些具体实现类内部的逻辑是怎样的。

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            try {
                // 传播到下一个Slot.
                fireEntry(context, resourceWrapper, node, count, prioritized, args);
    
                // 执行到这里表示通过检查,不被限流
                node.increaseThreadNum();
                // 请求通过了sentinel的流控等规则,记录当次请求
                node.addPassRequest(count);
    
                if (context.getCurEntry().getOriginNode() != null) {
                    // Add count for origin node.
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                    context.getCurEntry().getOriginNode().addPassRequest(count);
                }
    
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseThreadNum();
                    Constants.ENTRY_NODE.addPassRequest(count);
                }
    
                // Handle pass event with registered entry callback handlers.
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
                }
            } catch (PriorityWaitException ex) {
                // 增加线程统计
                node.increaseThreadNum();
                if (context.getCurEntry().getOriginNode() != null) {
                    // Add count for origin node.
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                }
    
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseThreadNum();
                }
                // Handle pass event with registered entry callback handlers.
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
                }
            } catch (BlockException e) {
    
                context.getCurEntry().setBlockError(e);
    
                // 增加QPS统计
                node.increaseBlockQps(count);
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().increaseBlockQps(count);
                }
    
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseBlockQps(count);
                }
    
                // Handle block event with registered entry callback handlers.
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onBlocked(e, context, resourceWrapper, node, count, args);
                }
    
                throw e;
            } catch (Throwable e) {
                // Unexpected internal error, set error to current entry.
                context.getCurEntry().setError(e);
    
                throw e;
            }
        }
    

    请求通过了sentinel的流控等规则,再通过node.addPassRequest() 将当次请求记录下来

    public void addPassRequest(int count) {
            super.addPassRequest(count);
            this.clusterNode.addPassRequest(count);
        }
    

    addPassRequest方法如下

    public void addPassRequest(int count) {
            rollingCounterInSecond.addPass(count);
            rollingCounterInMinute.addPass(count);
        }
    

    addPass方法如下

    public void addPass(int count) {
            WindowWrap<MetricBucket> wrap = data.currentWindow();
            wrap.value().addPass(count);
        }
    

    WindowWrap主要属性如下

    public class WindowWrap<T> {
        /**
         * 时间窗口的长度
         */
        private final long windowLengthInMs;
        /**
         * 时间窗口的开始时间,单位是毫秒
         */
        private long windowStart;
        /**
         * S时间窗口的内容,在 WindowWrap 中是用泛型表示这个值的,但实际上就是 MetricBucket 类
         */
        private T value;
    }
    

    我们再看看获取当前窗口的方法 data.currentWindow()

    public WindowWrap<T> currentWindow(long timeMillis) {
            if (timeMillis < 0) {
                return null;
            }
            int idx = calculateTimeIdx(timeMillis);
            long windowStart = calculateWindowStart(timeMillis);
            while (true) {
                WindowWrap<T> old = array.get(idx);
                if (old == null) {
                    WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                    if (array.compareAndSet(idx, null, window)) {
                        // Successfully updated, return the created bucket.
                        return window;
                    } else {
                        // Contention failed, the thread will yield its time slice to wait for bucket available.
                        Thread.yield();
                    }
                } else if (windowStart == old.windowStart()) {
                    return old;
                } else if (windowStart > old.windowStart()) {
                    if (updateLock.tryLock()) {
                        try {
                            // Successfully get the update lock, now we reset the bucket.
                            return resetWindowTo(old, windowStart);
                        } finally {
                            updateLock.unlock();
                        }
                    } else {
                        // Contention failed, the thread will yield its time slice to wait for bucket available.
                        Thread.yield();
                    }
                } else if (windowStart < old.windowStart()) {
                    // Should not go through here, as the provided time is already behind.
                    return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                }
            }
        }
    

    我们再回到

    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
    

    获取到窗口以后通过 wrap.value().addPass(count)增加统计的 QPS。而这里的 wrap.value() 得到的是之前提到的 MetricBucket ,在 Sentinel 中QPS相关数据的统计结果是维护在这个类的 LongAdder[] 中,最终由这个指标来与我们实现设置好的规则进行匹配,查看是否限流,也就是 StatisticSlot的entry 方法中的。在执行StatisticSlot的entry前都要先进入到FlowSlot的entry方法进行限流过滤:

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            // 限流校验
            checkFlow(resourceWrapper, context, node, count, prioritized);
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    

    让我们进入checkFlow的内部

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                              Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
            if (ruleProvider == null || resource == null) {
                return;
            }
            Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
            if (rules != null) {
                for (FlowRule rule : rules) {
                    if (!canPassCheck(rule, context, node, count, prioritized)) {
                        throw new FlowException(rule.getLimitApp(), rule);
                    }
                }
            }
        }
    

    再此处我们拿到了设置的 FlowRule ,循环匹配资源进行限流过滤。这就是Sentinel 能做到限流的原因。

    Sentinel配置介绍

    我们可以通过Sentinel的客户端查看接入了sentinel的各个系统。可针对系统中的各个资源设置相应的限流规则,如QPS或者线程数;或者设置相应的降级规则,如平均RT,异常比例以及异常数。

    [{
        "resource": "com.aifocus.itemplatform.api.productcenter.CategoryApi",
        "count": 1000,//RT threshold or exception ratio threshold count
        "grade": 0,  //Degrade strategy (0: average RT, 1: exception ratio)
        "passCount": 0,
        "timeWindow": 10 // Degrade recover timeout (in seconds) when degradation occurs
      },
      {
        "resource": "com.aifocus.itemplatform.api.productcenter.CategoryApi",
        "count": 0.5,//RT threshold or exception ratio threshold count
        "grade": 1,// Degrade strategy (0: average RT, 1: exception ratio)
        "passCount": 0,
        "timeWindow": 10 // Degrade recover timeout (in seconds) when degradation occurs
      }]
    

    相关文章

      网友评论

        本文标题:分布式限流Sentinel

        本文链接:https://www.haomeiwen.com/subject/hnhlsqtx.html