美文网首页微服务架构与应用Guava
限流,从令牌桶算法到RateLimiter源码

限流,从令牌桶算法到RateLimiter源码

作者: captain_fu | 来源:发表于2018-08-18 19:07 被阅读58次

    限流,是服务或者应用对自身保护的一种手段,通过限制或者拒绝调用方的流量,来保证自身的负载。

    常用的限流算法有两种:漏桶算法和令牌桶算法

    漏桶算法

        思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

    对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。

    令牌桶算法

        原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

    下面的内容主要讨论令牌桶算法。

    仔细讨论之前,先看下一下基于分布式缓存实现的令牌桶的流程图。

    以下是对流程图的讲解:

    1. key是否存在。因为流程图是基于分布式缓存做的集群限流,需要根据不同key做统计,第一次访问初始化key。

    2. 如果key不存在,初始化令牌桶,防止初始令牌数量,并且设置key过期时间为interval*2。这里的初始令牌数量一般可以设置成限流阈值,比如限流10qps,初始值可以设置成10,来应对一开始的流量。interval是间隔时间,比如限流阈值10qps,interval设置为1s。过期时间是缓存中key的时间,interval*2是为了防止key过期无法拦截流量。

    3. 如果key存在,将当前请求时间和当前key的最后放置令牌时间做比较。如果间隔超过interval,进入第4步,间隔未超过interval,进入第5步。

    4. 间隔已经超过1s,直接放置令牌到最大数量。

    5. 间隔没有超过1s,定义delta为时间差,放置令牌数=delta/(1/qps)。放入令牌时保证令牌数不超过桶的容量。同时,重置放入令牌的时间。

    6. 从桶中获取令牌,获取令牌成功,执行请求;获取令牌时间,拒绝请求。

    以上是对令牌桶算法的一种实现,接下来会具体分析guava RateLimiter的源码,RateLimiter的原理和上述实现类似,但是会有部分区别。

    最基础的使用RateLimiter的姿势如下:

    RateLimiter rateLimiter = RateLimiter.create(10.0);

    rateLimiter.acquire();

    create方法用于构建既定速度的实例,acquire方法使用阻塞方式获取令牌。

    首先进入源码,看下create方法:

    static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {

          RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);

          rateLimiter.setRate(permitsPerSecond);

          return rateLimiter;

    }

    这里我们看到,会构建一个SmoothBursty实例,并且给这个实例设置速率。

    /**

    * This implements a "bursty" RateLimiter, where storedPermits are translated to

    * zero throttling. The maximum number of permits that can be saved (when the RateLimiter is

    * unused) is defined in terms of time, in this sense: if a RateLimiter is 2qps, and this

    * time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits.

    */

    SmoothBursty的注释翻译如下:

    这是一个“突发性”的RateLimiter实现,这里存储令牌数可以被转义成“零节流”。存储的最大令牌数被存储成(如果RateLimiter实例有一段时间没有被获取令牌)一个时间形式。举例:如果一个RateLimiter实例的速率是2qps,并且maxBurstSeconds时间是10,那么最多可以存储20个令牌(令牌桶容量)。

    看一下SmoothBursty的构造函数:

    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {

          super(stopwatch);

          this.maxBurstSeconds = maxBurstSeconds;

    }

    下面简单介绍下SleepingStopwatch是什么。

    @VisibleForTesting

    abstract static class SleepingStopwatch {

      /*   * We always hold the mutex when calling this. TODO(cpovirk): Is that important? Perhaps we need

      * to guarantee that each call to reserveEarliestAvailable, etc. sees a value >= the previous?

      * Also, is it OK that we don't hold the mutex when sleeping?

      */

          abstract long readMicros();

          abstract void sleepMicrosUninterruptibly(long micros);

          static final SleepingStopwatch createFromSystemTimer() {

                return new SleepingStopwatch() {

                      final Stopwatch stopwatch = Stopwatch.createStarted();

                      @Override

                      long readMicros() {

                            return stopwatch.elapsed(MICROSECONDS);

                      }

                      @Override

                      void sleepMicrosUninterruptibly(long micros) {

                            if (micros > 0) {

                                  Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);

                            }

                      }

                };

          }

    }

    SleepingStopWatch是一个可sleep的秒表,起始时间是构建StopWatch的时间,sleepMicrosUninterruptibly方法支持不受中断的sleep,sleep是当前线程的sleep。

    以上构建方法完成,下面再来看一下acquire的源码。

    public double acquire(int permits) {

          long microsToWait = reserve(permits);

          stopwatch.sleepMicrosUninterruptibly(microsToWait);

          return 1.0 * microsToWait / SECONDS.toMicros(1L);

    }

    第一行是获取令牌需要等待时间;第二行是线程sleep时间,如果令牌足够,这里会返回0,无需sleep;第三行是返回等待时间值,单位转换成秒。

    接下来看下reserve实现。

    final long reserve(int permits) {

          checkPermits(permits);

          synchronized (mutex()) {

                return reserveAndGetWaitLength(permits, stopwatch.readMicros());

          }

    }

    获取锁之后,直接调用reserveAndGetWaitLength方法,传入参数是需要获取的令牌数、秒表的当前时间。

    final long reserveAndGetWaitLength(int permits, long nowMicros) {

          long momentAvailable = reserveEarliestAvailable(permits, nowMicros);

          return max(momentAvailable - nowMicros, 0);

    }

    首先计算获取令牌需要的时间节点,如果时间节点小于当前时间,无需等待;如果时间节点在当前时间节点之后,需要sleep线程,sleep时间是momentAvailable和nowMicros的差值。

    下面看下reserveEarliestAvailable,计算时间节点的实现。

    final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {

          resync(nowMicros);

          long returnValue = nextFreeTicketMicros;

          double storedPermitsToSpend = min(requiredPermits, this.storedPermits);

          double freshPermits = requiredPermits - storedPermitsToSpend;

          long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)       + (long) (freshPermits * stableIntervalMicros);

        try {

                this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);

          } catch (ArithmeticException e) {

                this.nextFreeTicketMicros = Long.MAX_VALUE;

          }

          this.storedPermits -= storedPermitsToSpend;

          return returnValue;

    }

    这里面有几个变量需要注意下:

        nextFreeTicketMicros 下次允许获取令牌的时间,这个时间是因为RateLimiter允许透支而存在的,比如当前令牌桶只有一个令牌,一个请求来获取5个令牌,请求会成功,但是nextFreeTicketMicros往后推4个时间片段,在当前时间推移到nextFreeTicketMicros之前,所有请求都将等待。如果长时间没有请求到来,这个值会是过去的一个时间值。

        storedPermits 当前令牌桶剩余的令牌数。

        stableIntervalMicros 时间片段值,qps为5的话,时间片段是200ms。

        maxPermits 令牌桶的容量。

    下面开始分析代码。

    resync(nowMicros);//根据当前时间和nextFreeTicketMicros,往令牌桶放置令牌,最多不超过令牌桶的maxPermits。

    long returnValue = nextFreeTicketMicros; //赋值语句

    double storedPermitsToSpend =min(requiredPermits,this.storedPermits);//请求的令牌数和令牌桶当前令牌数做比较,取较小值,storedPermitsToSpend是指需要消耗当前令牌桶的令牌数量。

    double freshPermits = requiredPermits - storedPermitsToSpend;//需要透支的令牌数量

    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)

        +(long) (freshPermits *stableIntervalMicros);//如果不透支,waitMicros为0,下次请求可以正常获取令牌;如果透支,需要将nextFreeTicketMicros往后推。

    try {

      this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);

    } catch (ArithmeticException e) {

      this.nextFreeTicketMicros = Long.MAX_VALUE;

    }

    //将nextFreeTicketMicros往后推

    this.storedPermits -= storedPermitsToSpend;//清算令牌桶的令牌。

    至此,最基础的创建RateLimiter和阻塞获取令牌的过程已经分析完毕。

    总结:

    1. 文章前端的流程图和guava RateLimiter的实现类似。

    2. guava RateLimiter支持透支,如果每次获取单个令牌,那么透支将不会生效。

    相关文章

      网友评论

        本文标题:限流,从令牌桶算法到RateLimiter源码

        本文链接:https://www.haomeiwen.com/subject/awhfbftx.html