美文网首页
Guava限流

Guava限流

作者: heyong | 来源:发表于2018-10-12 17:57 被阅读54次

    一、概述

    在高并发环境下,为了防止大量流量对于系统的冲击,需要对服务的调用进行限流,拒绝部分请求,保证系统的可用性。常见的限流方法有两种:令牌桶算法、漏斗算法

    二、算法原理

    (一) 令牌桶算法

    1、算法描述:

    • 假如用户配置的平均发送速率为r,则每隔1/r秒一个令牌被加入到桶中(每秒会有r个令牌放入桶中);

    • 假设桶中最多可以存放b个令牌。如果令牌到达时令牌桶已经满了,那么这个令牌会被丢弃;

    • 当一个n个字节的数据包到达时,就从令牌桶中删除n个令牌(不同大小的数据包,消耗的令牌数量不一样),并且数据包被发送到网络;

    • 如果令牌桶中少于n个令牌,那么不会删除令牌,并且认为这个数据包在流量限制之外(n个字节,需要n个令牌。该数据包将被缓存或丢弃);

    • 算法允许最长b个字节的突发,但从长期运行结果看,数据包的速率被限制成常量r。对于在流量限制外的数据包可以以不同的方式处理:

      1. 它们可以被丢弃;
      2. 它们可以排放在队列中以便当令牌桶中累积了足够多的令牌时再传输;
      3. 它们可以继续发送,但需要做特殊标记,网络过载的时候将这些特殊标记的包丢弃。

    2、 算法流程图


    image.png

    (二) 漏斗算法

    1、算法描述:
    漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率

    三、Guava RateLimiter使用

    (一) 如何使用RateLimiter

    在工程中导入Guava Jar包

    RateLimiter limiter = RateLimiter.create(1);
    System.out.println(limiter.acquire(2));
    System.out.println(limiter.acquire(6));
    System.out.println(limiter.acquire(2));
    

    2、输出结果

    0.0
    1.998433
    5.992647
    

    从输出结果可以看出,RateLimiter支持预消费,比如在acquire(6)时,等待时间是1.99秒,是上一个获取令牌时预消费了个2两,所以需要等待 2 * 1 秒,然后又预消费了6个令牌,acquire(2)等待时间为 6 * 1 秒。

    四、RateLimiter原理

    (一) RateLimiter关键属性

    • maxBurstSeconds 在RateLimiter未使用时,最多存储几秒的令牌

    • storedPermits 当前存储令牌数

    • maxPermits 最大存储令牌数

    • stableIntervalMicros 添加令牌时间间隔(1秒/每秒的令牌数)

    • nextFreeTicketMicros 下一次请求可以获取令牌的起始时间,由于RateLimiter允许预消费,上次请求预消费令牌后,下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌

    (二) RateLimiter创建

    1、RateLimiter 通过调用create()方法创建,具体实现如下:

    static RateLimiter create(SleepingTicker ticker, double permitsPerSecond) {
        RateLimiter rateLimiter = new Bursty(ticker, 1.0 /* maxBurstSeconds */);
        rateLimiter.setRate(permitsPerSecond);
        return rateLimiter;
    }
    

    2、SleepingTicker是Guava封装的一个时间类,maxBurstSeconds表示在ReteLimiter未使用时,最多保存几秒的令牌,默认是1;通过new Bursty创建了一个RateLimiter对象,然后调用 RateLimiter.setRate()方法进行初始化工作。具体实现如下:

    public final void setRate(double permitsPerSecond) {
        //参数判断
        boolean expression = permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond);
        String errorMessage = "rate must be positive";
        if (!expression) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
        synchronized (mutex) {
            //生成令牌以及更新下一次请求可以获取令牌的起始时间
            resync(readSafeMicros());
            //添加令牌时间间隔计算
            double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
            this.stableIntervalMicros = stableIntervalMicros;
            doSetRate(permitsPerSecond, stableIntervalMicros);
        }
    }
    

    3、上面代码先进行参数判断,然后生成令牌以及更新下一次请求可以获取令牌的起始时间,并且计算添加令牌的时间间隔,最后又调用了Bursty的doSetRate方法设置存储的令牌桶数,下面分析一下resync()和doSetRate()方法的实现。

    private void resync(long nowMicros) {
        // if nextFreeTicket is in the past, resync to now
        // 如果nextFreeTicket过去的时间,则更新为现在的时间
        if (nowMicros > nextFreeTicketMicros) {
            //更新存储的的令牌桶数
            storedPermits = Math.min(maxPermits,
                    storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
            nextFreeTicketMicros = nowMicros;
        }
    }
    

    上面判断当前的时间点是否大于上次获取令牌桶时计算出来的下一次请求可以获取令牌的起始时间,如果大于就更新令牌桶数,并且设置下次请求可以获取令牌桶的时间。

    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
        double oldMaxPermits = this.maxPermits;
        //设置最大存储令牌桶数
        maxPermits = maxBurstSeconds * permitsPerSecond;
        //设置现有令牌桶数
        storedPermits = (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
    }
    

    (三) 获取令牌

    1、通过调用acquire()方法获取令牌,如果之前预消费了令牌,那么就会阻塞,从而实现限制访问速率,下面查看acquire()方法的具体实现。

    public double acquire(int permits) {
        checkPermits(permits);
        long microsToWait;
        synchronized (mutex) {
            //计算需要阻塞的时间
            microsToWait = reserveNextTicket(permits, readSafeMicros());
        }
        //guava封装的时间类,阻塞
        ticker.sleepMicrosUninterruptibly(microsToWait);
        //返回阻塞的时间,单位为秒
        return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
    }
    

    上面的代码实现中,调用reserveNextTicket()方法计算需要阻塞的时间,下面查看方法的具体实现:

    private long reserveNextTicket(double requiredPermits, long nowMicros) {
        //生成令牌以及更新下一次请求可以获取令牌的起始时间
        resync(nowMicros);
        //计算需要等待的时间
        long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;
        double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits);
        double freshPermits = requiredPermits - storedPermitsToSpend;
        long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
                + (long) (freshPermits * stableIntervalMicros);
        //更新下次请求可以获取令牌的起始时间
        this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
        this.storedPermits -= storedPermitsToSpend;
        return microsToNextFreeTicket;
    }
    

    相关文章

      网友评论

          本文标题:Guava限流

          本文链接:https://www.haomeiwen.com/subject/rpigaftx.html