限流

作者: yzn2015 | 来源:发表于2020-10-20 00:17 被阅读0次

    在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流
    缓存:缓存的目的是提升系统访问速度和增大系统处理容量
    降级:降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开
    限流:限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理

    常见的应用:短信/邮件的提供商需要限制每秒可以发多少封短信/邮件
    当前最主要的限流方式有两种:漏桶算法和令牌桶算法

    一、漏桶算法

    漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。


    维基百科-漏桶展示

    nginx 有两个限流模块,从 github 上 clone 代码,位置在 nginx/src/http/modules 目录下:

    ngx_http_limit_req_module.c (nginx 的 limit_req 模块,用来 限制时间窗口内的平均速率)
    ngx_http_limit_conn_module.c (nginx 的 limit_conn 模块,用来限制并发连接数)
    两者都是按照 IP 或者域名限制的
    nginx Github地址:https://github.com/nginx/nginx/tree/master/src/http/modules
    nginx 文档:http://nginx.org/en/docs/http/ngx_http_limit_conn_module.html

    limit_conn_zone $binary_remote_addr zone=addr:10m;
    
    server {
        location /download/ {
            limit_conn addr 1;
        }
    
    limit_req_zone $binary_remote_addr zone=one:10m rate=1r/s;
    
    server {
        location /search/ {
            limit_req zone=one burst=5 nodelay; #delay 并发数 burst桶的大小
        } 
    
    //limit,限流策略;hash,记录key的hash值;data,记录key的数据内容;len,记录key的数据长度;ep,待处理请求数目;account,是否是最后一条限流策略
    static ngx_int_t ngx_http_limit_req_lookup(ngx_http_limit_req_limit_t *limit, ngx_uint_t hash, u_char *data, size_t len, ngx_uint_t *ep, ngx_uint_t account)
    {
        //红黑树查找指定界定,sentinel代表红黑树的NULL节点
        while (node != sentinel) {
     
            if (hash < node->key) {
                node = node->left;
                continue;
            }
     
            if (hash > node->key) {
                node = node->right;
                continue;
            }
     
            //hash值相等,比较数据是否相等
            lr = (ngx_http_limit_req_node_t *) &node->color;
     
            rc = ngx_memn2cmp(data, lr->data, len, (size_t) lr->len);
            //查找到
            if (rc == 0) {
                ngx_queue_remove(&lr->queue);
                ngx_queue_insert_head(&ctx->sh->queue, &lr->queue); //将记录移动到LRU队列头部
         
                ms = (ngx_msec_int_t) (now - lr->last); //当前时间减去上次访问时间
              
                if (ms < -60000) {
                    ms = 1;
    
                } else if (ms < 0) {
                    ms = 0;
                }
                //漏桶算法
                excess = lr->excess - ctx->rate * ms / 1000 + 1000; //待处理请求书-限流速率*时间段+1个请求(速率,请求数等都乘以1000了)
                //当前积压令牌数 = 上次积压令牌数 - 这段时间可以产生的令牌数 + 本次请求(1 个令牌)
     
                if (excess < 0) {
                    excess = 0;
                }
     
                *ep = excess;
     
                //待处理数目超过burst(等待队列大小),返回NGX_BUSY拒绝请求(没有配置burst时,值为0)
                if ((ngx_uint_t) excess > limit->burst) {
                    return NGX_BUSY;
                }
     
                if (account) {  //如果是最后一条限流策略,则更新上次访问时间,待处理请求数目,返回NGX_OK
                    lr->excess = excess;
                    lr->last = now;
                    return NGX_OK;
                }
                //访问次数递增
                lr->count++;
     
                ctx->node = lr;
     
                return NGX_AGAIN; //非最后一条限流策略,返回NGX_AGAIN,继续校验下一条限流策略
            }
     
            node = (rc < 0) ? node->left : node->right;
        }
     
        //假如没有查找到节点,需要新建一条记录
        *ep = 0;
      
        size = offsetof(ngx_rbtree_node_t, color)
                + offsetof(ngx_http_limit_req_node_t, data)
                + len;
        //尝试淘汰记录(LRU)
        ngx_http_limit_req_expire(ctx, 1);
     
        node = ngx_slab_alloc_locked(ctx->shpool, size);
        if (node == NULL) {  //空间不足,分配失败
            ngx_http_limit_req_expire(ctx, 0); //强制淘汰记录
     
            node = ngx_slab_alloc_locked(ctx->shpool, size); 
            if (node == NULL) { //分配失败,返回NGX_ERROR
                return NGX_ERROR;
            }
        }
     
        node->key = hash; 
        lr = (ngx_http_limit_req_node_t *) &node->color;
        lr->len = (u_char) len;
        lr->excess = 0;
        ngx_memcpy(lr->data, data, len);
     
        ngx_rbtree_insert(&ctx->sh->rbtree, node);  //插入记录到红黑树与LRU队列
        ngx_queue_insert_head(&ctx->sh->queue, &lr->queue);
     
        if (account) { //如果是最后一条限流策略,则更新上次访问时间,待处理请求数目,返回NGX_OK
            lr->last = now;
            lr->count = 0;
            return NGX_OK;
        }
     
        lr->last = 0;
        lr->count = 1;
     
        ctx->node = lr;
     
        return NGX_AGAIN;  //非最后一条限流策略,返回NGX_AGAIN,继续校验下一条限流策略
         
    }
    

    当一个新请求进入 Nginx 的限流流程大致如下:

    计算当前请求 IP 地址 hash 值(hash 值相等后进而使用 IP 内容判断),在存放请求 IP 的红黑树中查找对应位置
    计算当前请求和上次请求时间 (保存在红黑树节点的 value 中) 的差值 ms
    根据公式 “excess = lr->excess - ctx->rate * ms / 1000 + 1000” 计算(漏桶算法的核心)
    更新当前节点信息(上一次请求时间等),根据限流结果返回响应


    请求限流-桶限.png

    二、令牌桶算法

    令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。


    令牌桶示意图

    RateLimiter

    令牌桶比较代表的实现方法是Guava下的RateLimiter

    RateLimiter有两种实现方式,SmoothBursty(非预热)及SmoothWarmingUp(预热,冷启动),这里主要讨论的是非预热的方式,预热的方式暂时没看明白

    import com.google.common.util.concurrent.RateLimiter;
    import lombok.extern.slf4j.Slf4j;
    
    import java.time.ZonedDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Created by yangzaining on 2020-10-14.
     */
    @Slf4j
    public class TestDemo {
        /**
         * QPS:5/s 两个线程,各间隔50ms获取一次,每次获取一个令牌
         * <p>
         * 疑问:
         * 先前五个为什么不是都获取成功?
         * 刚初始化,所以没有令牌,所以获取到一个则返回一个
         */
        public static void test1() throws InterruptedException {
            testRateLimiter(2, 11, 50, 5, 0, false);
        }
    
        /**
         * QPS:5/s 两个线程,各间隔50ms获取一次,每次获取一个令牌,令牌桶初始化1s
         * <p>
         * 疑问:
         * 令牌存储的是5个,为啥前6次都成功了?
         * 头五次用的是令牌桶中的令牌,第6次用的是下一个刻度的令牌,延迟计算
         */
        public static void test2() throws InterruptedException {
            testRateLimiter(2, 11, 50, 5, 1000, false);
        }
    
        /**
         *
         * QPS:5/s 两个线程,各间隔50ms获取一次,每次获取一个令牌,令牌桶初始化1s
         * 清楚的看见后面的访问等待的时间逐步增加
         */
        public static void test3() throws InterruptedException {
            testRateLimiter(2, 11, 50, 5, 1000, true);
        }
    
    
        private static void testRateLimiter(int threadNumber, int count, int taskGapTime, int qps, int sleepTime, boolean useAcquire) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);
            ZonedDateTime start = ZonedDateTime.now();
            RateLimiter rateLimiter = RateLimiter.create(qps);
            log.info("StartTime = {}", DateTimeFormatter.ISO_INSTANT.format(start));
            if (sleepTime != 0) {
                Thread.sleep(sleepTime);
            }
            for (int i = 0; i < threadNumber; i++) {
                executorService.submit(() -> {
                    Boolean flag = null;
                    double waitTime = 0;
                    for (int j = 0; j < count; j++) {
                        if (useAcquire) {
                            waitTime = rateLimiter.acquire();
                        } else {
                            flag = rateLimiter.tryAcquire();
                        }
                        ZonedDateTime endTime = ZonedDateTime.now();
                        try {
                            Thread.sleep(taskGapTime);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        if (useAcquire) {
                            log.info("waitTime = {}, time = {}", waitTime, DateTimeFormatter.ISO_INSTANT.format(endTime));
                        } else {
                            log.info("canAcquire = {}, time = {}", flag, DateTimeFormatter.ISO_INSTANT.format(endTime));
                        }
                    }
                });
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
    //        test1();
    //        test2();
            test3();
        }
    }
    
    
      @Override
      final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
        resync(nowMicros);//重新计算当前存储的令牌数(距离上次计算的时间),nextFreeTicketMicros = nowMicros
        long returnValue = nextFreeTicketMicros; //nowMicros
        double storedPermitsToSpend = min(requiredPermits, this.storedPermits);//存储令牌数
        double freshPermits = requiredPermits - storedPermitsToSpend;//还需要的令牌数
    
        long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);//请求令牌需要等待的时间
    
        this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;//下次生成令牌的时间
        this.storedPermits -= storedPermitsToSpend;//更新剩余令牌数
        return returnValue; //nowMicros
      }
    
    时间流水图.png

    如图所示,任务7若需要请求令牌,就需要偿还任务6获取令牌所需等待时间

    Redis+lua

    RateLimiter只满足了单机的限流方式,多台机器的限流需要用到redis lua脚本的方式去实现

    1.减少网络开销:本来多次网络请求的操作,可以用一个请求完成,原先多次请求的逻辑放在lua脚本中,通过redis lua解释器去完成。使用脚本,减少了网络往返时延。
    2.原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。

        @Override
        public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"//速率
                  + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"//过期时间
                  + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",//类型
                    Collections.<Object>singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal());
        }
        private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    "local rate = redis.call('hget', KEYS[1], 'rate');"
                  + "local interval = redis.call('hget', KEYS[1], 'interval');"
                  + "local type = redis.call('hget', KEYS[1], 'type');"
                  + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
                  
                  + "local valueName = KEYS[2];"
                  + "if type == '1' then "
                      + "valueName = KEYS[3];"
                  + "end;"
                  
                  + "local currentValue = redis.call('get', valueName); "
                  + "if currentValue ~= false then "//当前存有值
                         + "if tonumber(currentValue) < tonumber(ARGV[1]) then "//若比剩余数量小,则等待下一个时间片段
                             + "return redis.call('pttl', valueName); "
                         + "else "
                             + "redis.call('decrby', valueName, ARGV[1]); "//足够的话,则减去相应的令牌
                             + "return nil; "
                         + "end; "
                  + "else "
                         + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
                         + "redis.call('set', valueName, rate, 'px', interval); "//设置对应的令牌数,同时扣除本次的令牌数量
                         + "redis.call('decrby', valueName, ARGV[1]); "
                         + "return nil; "
                  + "end;",
                    Arrays.<Object>asList(getName(), getValueName(), getClientValueName()), 
                    value, commandExecutor.getConnectionManager().getId().toString());
        }
    
    

    总结

    算法 特点 内容
    nginx req_limit_model(漏桶算法) 把请求以平均速率消费,多出桶内数量的请求则被拒绝 根据上一次消费的时间计算本次消费所堆积的数量,excess = lr->excess - ctx->rate * ms / 1000 + 1000
    Guava rateLimiter 同漏桶算法,额外支持突发的流量请求(存储的令牌数),需要注意第一次消费数量不受限 根据访问的时间计算,当前是否需要等待令牌产生,无需等待,则后续的请求补偿本次的等待时间
    Redis lua 通过lua脚本的形式保证原子性,适用集群的限流 通过以速率作为过期时间存储令牌数,若剩余的数量不足时,则返回对应的等待时间,与rateLimiter不同的是,rateLimiter是边初始化边放令牌,而redis lua是一开始就存储最大令牌数

    参考文献:https://segmentfault.com/a/1190000020272200?utm_source=tag-newest

    相关文章

      网友评论

          本文标题:限流

          本文链接:https://www.haomeiwen.com/subject/znyppktx.html