美文网首页
Sentinel涉及的限流算法

Sentinel涉及的限流算法

作者: 雪飘千里 | 来源:发表于2023-02-26 21:55 被阅读0次

    Sentinel基础介绍见 微服务流量控制组件Sentinel

    10.1 滑动窗口算法

    滑动窗口算法是将时间周期分为N个小周期(窗口),分别记录每个小周期内访问次数,然后根据时间将窗口往前滑动并删除过期的小时间窗口。最终只需要统计滑动窗口范围内的所有小时间窗口总的计数即可,入下图所示

    img

    将一分钟拆分为4个小时间窗口,每个小时间窗口最多能够处理25个请求。并且通过虚线框表示滑动窗口的大小(当前窗口的大小是2,也就是在这个窗口内最多能够处理50个请求)。同时滑动窗口会随着时间往前移动,比如前面15s结束之后,窗口会滑动到15s~45s这个范围,然后在新的窗口中重新统计数据。

    由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。此算法可以很好的解决固定窗口算法的临界问题。

    Sentinel底层在做统计QPS做快速失败时用的也是滑动时间窗算法。

    10.1.1 链表实现:

    假设某个服务最多只能每秒钟处理100个请求,可以设置一个1秒钟的滑动时间窗口,用LinkedList表示,该窗口分为10个格子; 每个格子100毫秒,每100毫秒移动一次,每次移动都需要记录当前服务100ms内请求的次数counter到格子中; 如果格子数大于10个,删除最前边的各自,格子数始终保留10个;

    循环整个队列,如果累加counter大于限流请求数,则被限流,否则不会被限流;

    mport java.util.LinkedList;
    import java.util.stream.Collectors;
    
    public class  SlidingWindow {
    
     //使用LinkedList来记录滑动窗口的10个格子。
     LinkedList<Metric> slots = new LinkedList<Metric>();
    
     class Metric {
     //起始时间
     long startTm ;
     //服务访问次数,可以放在Redis中,实现分布式系统的访问计数
     long counter = 0L;
     }
    
    
     private void doCheck() {
     //当前100ms内请数
     Metric metric = new Metric();
     long currentTime = System.currentTimeMillis();
     //当前时间 大于 上一窗口起始时间+ 时间间隔100ms,说明要开始新的窗口了
     if(slots.getLast().getStartTm()+100 <= System.currentTimeMillis()){
     //假如上一次窗口起始时间为1500,当前时间为 1788,则说明【1600-1700】这个时间段没有请求,
     // 那么我们需要补全空数据
     if((currentTime - slots.getLast().getStartTm())/100 > 1){
     LongStream.range(0,(currentTime - slots.getLast().getStartTm())/100 -1).forEach(item ->{
     Metric metric2 = new Metric();
     metric2.setCounter(0);
     metric2.setStartTm(slots.getLast().getStartTm()+100);
     slots.addLast(metric2);
     });
     }else {
     //创建一个新窗口
     metric.setCounter(1);
     //取整
     metric.setStartTm((slots.getLast().getStartTm() / 100) * 100);
     slots.addLast(metric);
     }
     }else {
     //旧的窗口,需要请求重置
     metric.setCounter(slots.getLast().getCounter()+1);
     slots.add(slots.size()-1,metric);
     }
    
    
     //如果linkedList长度大于10个,则删除最前面的一个,体现滑动时间窗
     if (slots.size() > 10) {
     slots.removeFirst();
     }
     //累加最近10个窗口内的访问次数
     Long countTotal = slots.stream().collect(Collectors.summingLong(s ->s.getCounter()));
    
     //比较最后一个节点和第一个节点,两者相差100以上就限流
     if (countTotal > 100) {
     System.out.println("限流了。。");
     //TODO 修改限流标记为true
    
     } else {
     //TODO 修改限流标记为false
     }
     }
    }
    

    这个简化了很多逻辑,而且性能也不太好,但是理解滑动窗口算法是没问题的。

    10.1.2 Sentinel实现:

    Sentinel中是通过环形数组实现的,环形数组可以内存复用降低GC,性能高(高性能队列Disruptor中也有用到),如下图。

    image.png

    Sentinel在做QPS统计时,滑动时间窗有两个维度

    • 毫秒级维度:初始化一个跨度为1000ms,包含两个500ms的时间窗口

    • 秒级维度:还有一个跨度为60s的,包含60个1s的时间窗口

    在毫秒级维度中,仅仅使用两个时间窗口就完成了QPS的计算,并没有做删除时间窗口节点的操作,而是清空原本节点的内容。两个时间窗口代表数组的两个下标。

    通过 (当前时间 / 500ms) % 数组长度2的取模结果,得到当前时间的请求数落在那个时间窗口内;

    然后拿 (当前时间 / 500ms) * 500ms比较时间窗口的起始位置,

    如果与之前 (比如:500ms) 一致,就把当次请求数加入到该窗口内用作统计

    如果与之前不一致,就清空之前时间窗口内统计的数据,并放入当前时间的请求数,完成滑动的操作

    核心源码如下:

    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
     long timeId = timeMillis / windowLengthInMs;
     // 计算当前时间在数组中的位置
     return (int) (timeId % array.length());
    }
    //计算当前时间 在 窗口内的 起始时间
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
     return timeMillis - timeMillis % windowLengthInMs;
    }
    
    //获取当前窗口
    public WindowWrap<T> currentWindow(long timeMillis) {
     //当前时间如果小于0,返回空
     if (timeMillis < 0) {
     return null;
     }
     //计算时间窗口的索引
     int idx = calculateTimeIdx(timeMillis);
     // 计算当前时间窗口的开始时间
     long windowStart = calculateWindowStart(timeMillis);
     while (true) {
     //在窗口数组中获得窗口
     WindowWrap<T> old = array.get(idx);
     if (old == null) {
     /*
     *     B0       B1      B2    NULL      B4
     * ||_______|_______|_______|_______|_______||___
     * 200     400     600     800     1000    1200  timestamp
     *                             ^
     *                          time=888
     * 比如当前时间是888,根据计算得到的数组窗口位置是个空,所以直接创建一个新窗口就好了
     */
     WindowWrap<T> window = new WindowWrap<T>(
     windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
     if (array.compareAndSet(idx, null, window)) {
     // Successfully updated, return the created bucket.
     return window;
     } else {
     // Contention failed, the thread will yield its time slice to
     // wait for bucket available.
     Thread.yield();
     }
     } else if (windowStart == old.windowStart()) {
     /*
     *     B0       B1      B2     B3      B4
     * ||_______|_______|_______|_______|_______||___
     * 200     400     600     800     1000    1200  timestamp
     *                             ^
     *                          time=888
     * 这个更好了,刚好等于,直接返回就行
     */
     return old;
     } else if (windowStart > old.windowStart()) {
     /*
     *     B0       B1      B2     B3      B4
     * |_______|_______|_______|_______|_______||___
     * 200     400     600     800     1000    1200  timestamp
     *             B0       B1      B2    NULL      B4
     * |_______||_______|_______|_______|_______|_______||___
     * ...    1200     1400    1600    1800    2000    2200  timestamp
     *                              ^
     *                           time=1676
     * 这个要当成圆形理解就好了,之前如果是1200一个完整的圆形,然后继续从1200开始,如果现在时间是1676,落在在B2的位置,
     * 窗口开始时间是1600,获取到的old时间其实会是600,所以肯定是过期了,直接重置窗口就可以了
     */
     if (updateLock.tryLock()) {
     try {
     // Successfully get the update lock, now we reset the
     // bucket.
     return resetWindowTo(old, windowStart);
     } finally {
     updateLock.unlock();
     }
     } else {
     Thread.yield();
     }
     } else if (windowStart < old.windowStart()) {
     // 这个不太可能出现,嗯。。时钟回拨
     return new WindowWrap<T>(
     windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
     }
     }
    }
    
    //统计时间窗口总的请求数量
    public List<T> values(long timeMillis) {
     // 把当前时间窗口中的bucket WindowWrap 都返回出去  用来统计时间窗口总的请求数量
     if (timeMillis < 0) {
     return new ArrayList<T>();
     }
     int size = array.length();
     List<T> result = new ArrayList<T>(size);
    
     for (int i = 0; i < size; i++) {
     WindowWrap<T> windowWrap = array.get(i);
     if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
     continue;
     }
     result.add(windowWrap.value());
     }
     return result;
     }
     //校验时间窗口是否过期,相当于上一个例子中的补全空数据,这里更优雅
     public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
     return time - windowWrap.windowStart() > intervalInMs;
     }
    

    10.2 令牌桶算法

    image.png

    简化demo:

    /**
     * 令牌桶限流算法
     */
    public class TokenBucket {
     public long timeStamp = System.currentTimeMillis();  // 当前时间
     public long capacity; // 桶的容量
     public long rate; // 令牌放入速度
     public long tokens; // 当前令牌数量
    
     public boolean grant() {
     long now = System.currentTimeMillis();
     // 先添加令牌
     tokens = Math.min(capacity, tokens + (now - timeStamp) * rate);
     timeStamp = now;
     if (tokens < 1) {
     // 若不到1个令牌,则拒绝
     return false;
     } else {
     // 还有令牌,领取令牌
     tokens -= 1;
     return true;
     }
     }
    }
    

    限流过程如下:

    1. 刚开始,系统的 QPS 非常低,初始化我们就直接把令牌桶塞满了;

    2. 然后这个低 QPS 的状态持续了一段时间,因为我们一直会填充最大 QPS 数量的令牌(因为取最小值,所以其实桶里令牌基本不会有变化),所以令牌桶一直处于满的状态,整个系统的限流也处于一个比较低的水平。这以上的部分一直处于警戒线之上。实际上就是叫做冷启动 / 预热的过程;

    3. 接着系统的 QPS 突然激增,令牌消耗速度太快。就算我们每次增加最大 QPS 数量的令牌任然无法维持消耗,所以桶里的令牌在不断低减少。这个时候,冷启动阶段的限制 QPS 也在不断地提高,最后直到桶里的令牌低于警戒线;

    4. 低于警戒线之后,系统就会按照最高 QPS 去限流,这个过程就是系统在逐渐达到最高限流的过程。那这样一来,实际就达到了我们处理突增流量的目的,整个系统在漫漫地适应突然飙高的 QPS,然后最终达到系统的 QPS 阈值;

    5. 最后,如果 QPS 回复正常,那么又会逐渐回到警戒线之上,就回到了最开始的过程。

    管理端配置如下(流控效果选 Warm Up):

    image.png

    公式推导如下:

    img
    • 横坐标storedPermits代表存储桶中的令牌数量;

    • 纵坐标代表获取一个令牌需要的时间,即请求通过的时间间隔;

    stableInterval(稳定区间):稳定产生令牌的时间间隔,假定限流阈值QPS为1000,stableInterval的值为1毫秒。

    coldInterval:冷启动产生令牌的最大时间隔间,等于稳定产生令牌的时间间隔乘以冷启动系数(stableInterval * coldFactor),Sentinel中coldFactor默认为3。

    warmupPeriod:预热时间,即冷启动周期,对应上图中的梯形面积,Sentinel中默认为10秒。

    thresholdPermits:从冷启动到正常的令牌桶中令牌数量的阈值,当令牌桶中的令牌数量超越该值时,则进入冷启动阶段。

    因为coldFactor默以为3,所以(coldInterval - stableInterval)是stableInterval的两倍,所以从thresholdPermits到0的时刻是从maxPermits到thresholdPermits时刻的一半,也便是冷启动周期的一半。因为梯形的面积等于warmupPeriod,所以长方形面积是梯形面积的一半,长方形的面积是warmupPeriod / 2。

    依据长方形面积公式:长 * 宽 = 面积

    count = 1000; 限流阈值

    stableInterval=1/count= 0.001

    coldInterval = 0.003s

    warmupPeriod = 10s 预热时间 10s

    可得:

    thresholdPermits=0.5*warmupPeriod/stableInterval= 0.5 * 10 / 1ms = 5000
    

    maxPermits:最大答应桶中寄存的令牌数。

    依据梯形的面积公式:(上低 + 下低)* 高 / 2

    可得:

    warmupPeriod=(stableInterval+coldInterval)*(maxPermits-thresholdPermits)/2
    

    推出:

    maxPermits=thresholdPermits+2*warmupPeriod/(stableInterval+coldInterval)
    = 5000 + 2* 10 /(1ms + 3ms)
    = 5000 + 20/0.004 = 5000 + 5000 = 10000
    

    slope:直线的斜率,即出产令牌的速率。

    依据斜率核算公式:(y2-y1) / (x2-x1),可得:

    slope=(coldInterval-stableInterval)/(maxPermits-thresholdPermits)
    = 0.002/5000
    

    10.3 漏桶算法

    漏桶算法是将访问请求放入漏桶中,当请求达到限流值,则进行丢弃(触发限流策略)。无论有多少请求,请求的速率有多大,都按照固定的速率流出,对应到系统中就是按照固定的速率处理请求。超过漏桶容量的直接抛弃。

    管理端配置:匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法。阈值必须设置为QPS。

    这种方式主要用于处理间隔性突发的流量,例如消息队列。想象一下这样的场景,在某一秒有大量的请求到来,而接下来的几秒则处于空闲状态,我们希望系统能够在接下来的空闲期间逐渐处理这些请求,而不是在第一秒直接拒绝多余的请求。

    image.png

    首先计算出当前请求平摊到 1 秒内的时间花费,然后去计算这一次请求预计时间; 如果小于当前时间的话,那么以当前时间为主,返回即可; 反之如果超过当前时间的话,这时候就要进行排队等待了。等待的时候要判断是否超过当前最大的等待时间,超过就直接丢弃; 没有超过就更新上一次的通过时间,然后再比较一次是否超时。如果还超时就重置时间,反之在等待时间范围之内的话就等待。如果都不是,那就可以通过了。

    public class RateLimiterController implements TrafficShapingController {
     //最大等待超时时间,默认500ms
     private final int maxQueueingTimeMs;
     //限流数量
     private final double count;
     //上一次的通过时间
     private final AtomicLong latestPassedTime = new AtomicLong(-1);
     @Override public boolean canPass(
     Node node, int acquireCount, boolean prioritized) {
     // Pass when acquire count is less or equal than 0.
     if (acquireCount <= 0) {
     return true;
     }
     // Reject when count is less or equal than 0.
     // Otherwise,the costTime will be max of long and waitTime will overflow
     // in some cases.
     if (count <= 0) {
     return false;
     }
     long currentTime = TimeUtil.currentTimeMillis();
     //时间平摊到1s内的花费
     long costTime = Math.round(
     1.0 * (acquireCount) / count * 1000); // 1 / 100 * 1000 = 10ms
    
    
     //计算这一次请求预计的时间
     long expectedTime = costTime + latestPassedTime.get();
    
     //花费时间小于当前时间,pass,最后通过时间 = 当前时间
     if (expectedTime <= currentTime) {
     latestPassedTime.set(currentTime);
     return true;
     }
     else {
     //预计通过的时间超过当前时间,要进行排队等待,重新获取一下,避免出现问题,差额就是需要等待的时间
     long waitTime = costTime + latestPassedTime.get()
     - TimeUtil.currentTimeMillis();
     //等待时间超过最大等待时间,丢弃
     if (waitTime > maxQueueingTimeMs) {
     return false;
     } else {
     //反之,可以更新最后一次通过时间了
     long oldTime = latestPassedTime.addAndGet(costTime);
     try {
     waitTime = oldTime - TimeUtil.currentTimeMillis();
     //更新后再判断,还是超过最大超时时间,那么就丢弃,时间重置
     if (waitTime > maxQueueingTimeMs) {
     latestPassedTime.addAndGet(-costTime);
     return false;
     }
     //在时间范围之内的话,就等待
     if (waitTime > 0) {
     Thread.sleep(waitTime);
     }
     return true;
     } catch (InterruptedException e) {
     }
     }
     }
     return false;
     }
    }
    

    相关文章

      网友评论

          本文标题:Sentinel涉及的限流算法

          本文链接:https://www.haomeiwen.com/subject/lvwcldtx.html