分布式限流大作战

作者: 凌云_00 | 来源:发表于2018-12-09 12:46 被阅读6次

    开篇

      一周一篇技术博文又来了,这周我们讲点什么呢?看标题就知道了,那就是分布式下的限流策略(实在不知道写些什么好呢),至于限流的用处,好处,和处理场景就不这里赘述了(Google全是)。ok,凌云小课堂正式开始啦,今天要介绍的主要是三种限流策略

    1. 计数限流  
    2. 漏桶限流  
    3. 令牌桶限流 
    

      基本上述三种限流策略可以涵盖大多数需要限流的场景。为了和广大的技术博主不一样(555),我们今天只讲分布式下的限流实现,至于什么是漏桶和令牌桶Google喽。。。

    计数限流(时间窗)

      计数限流主要有两种,一个是计数器限流,简单粗暴不值一提,另一个就是本文要讲的滑动时间窗限流法,我们粗暴的贴代码。。。

    /**
         * 简单时间窗限流策略:
         * 每X秒允许行为发生Y次
         *
         * @param key
         * @param period
         * @param maxCount
         * @return
         */
        public boolean isActionAllowed(String key, int period, int maxCount) {
            ShardedJedis jedis = pool.getResource();
            long nowTs = System.currentTimeMillis();
            try {
                ShardedJedisPipeline pipe = jedis.pipelined();
                // 移除时间窗之前所有集合
                pipe.zremrangeByScore(key, 0f, new Double(nowTs - period * 1000));
                // 获取窗口内的行为数量
                Response<Long> count = pipe.zcard(key);
                pipe.sync();
                // 是否超过限制
                if (count.get() <= maxCount) {
                    // 记录行为
                    pipe.zadd(key, nowTs, "" + nowTs);
                    // 设置zset过期时间,避免冷用户持续占用内存,过期时间等于时间窗口,再多宽限 1s
                    pipe.expire(key, period + 1);
                    pipe.sync();
                    return true;
                }
            } catch (Exception e) {
                log.error("redis timeLimit:Error", e);
            } finally {
                jedis.close();
            }
            return false;
        }
    

      这个限流策略是基于Redis实现的。主要精髓就是利用zset的score数学特性,将时间戳存于score中。处理逻辑如下

    1.每次进件先移除时间窗(给定的时间周期)外的所有key
    2.对比当前zset中key的总量与MaxCount确定是否放弃本次进件
    3.如果同意zset中新增value
    4.设置过期时间
    

      这个实现方式在并发量不大的应用中是完全可以应付的,但是一旦并发量过大在第2步和第3步之间因为不是原子操作,所以可能出现key的总量突破最大限流数。所以一定要用在合适的场景中。为什么不用lua实现原子操作呢?因为ShardedJedis不支持。。。默默哭泣中。

    漏桶限流-令牌桶限流

      漏桶和令牌桶是一对孪生兄弟,它两都是流量整形,限流中的常用算法,唯一区别呢就是令牌桶是允许突发流量的而漏桶严格限制流量速率。他们的实现甚至可能一摸一样。。。
      我们继续暴力贴代码。。555

    
    /**
     * 漏斗限流
     *
     * @author Lingyun
     * @Date 2018-12-08 21:48
     */
    public class FunnelRateLimiter {
    
        private Map<String, Funnel> funnels = new HashMap<>();
    
        public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
            String key = String.format("%s:%s", userId, actionKey);
            Funnel funnel = funnels.get(key);
    
            if (funnel == null) {
                funnel = new Funnel(capacity, leakingRate);
                funnels.put(key, funnel);
            }
    
            return funnel.watering(1); // 需要1个quota
        }
    
        static class Funnel {
    
            /**
             * 漏斗容量
             */
            int capacity;
            /**
             * 漏嘴流水速率
             */
            float leakingRate;
            /**
             * 漏斗剩余空间
             */
            int leftQuota;
            /**
             * 上一次进水时间
             */
            long leakingTs;
    
            public Funnel(int capacity, float leakingRate) {
                this.capacity = capacity;
                this.leakingRate = leakingRate;
                this.leftQuota = capacity;
                this.leakingTs = System.currentTimeMillis();
            }
    
            /**
             * 空间修正
             */
            void makeSpace() {
                long nowTs = System.currentTimeMillis();
                long deltaTs = nowTs - leakingTs;//距离上次进水时间差
                int deltaQuota = (int) (deltaTs * leakingRate);//可腾出空间
    
                if (deltaQuota < 0) { // 间隔时间过长,整数数字过大溢出
                    this.leftQuota = capacity;
                    this.leakingTs = nowTs;
                    return;
                }
                if (deltaQuota < 1) { // 可腾出空间过小,最小单位是1
                    return;
                }
    
                this.leftQuota += deltaQuota;
                this.leakingTs = nowTs;
                //判断剩余空间是否超过总容量
                if (this.leftQuota > this.capacity) {
                    this.leftQuota = this.capacity;
                }
            }
    
            boolean watering(int quota) {
                makeSpace();
                if (this.leftQuota >= quota) {//判断剩余空间是否足够
                    this.leftQuota -= quota;
                    return true;
                }
                return false;
            }
        }
    }
    

      以上就是漏桶限流的java实现,至于令牌桶限流代码,Google的Guava中有该算法的实现(RateLimiter)使用还是非常简单的(意思就是自行Google)。
      说好的分布式呢。。。

    漏桶算法的分布式实现思路

      将Funnel 对象的内容按字段存储到一个 hash 结构中,进水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。

      但是有个问题,我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到 hash 结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。

      如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码的复杂度也跟着升高很多。这真是个艰难的选择,我们该如何解决这个问题呢?救星来了!我们可以使用Redis-Cell,可惜的是Redis-Cell是一个模块,而模块特效是Redis4.0中带来的。。。而公司的Redis还是可怜的2.X版本。。。

    令牌桶分布式实现

      我们公司使用的接口限流策略就是令牌桶限流。实现方式是Guava的RateLimit+zookeeper,zookeeper中存储了网关中心的服务数量,各个服务均分接口流量,并对网关中心最高QPS也在每个服务器上做均分。这样的好处是限流令牌在本地保存,不通过网络传输,每台服务器绝对的均分限流,当集群中有服务下线也不会影响其他服务器的限流阈值,不会因为一台服务的下线流量集中导致其他服务的限流阈值上升,继而出现雪崩。缺点就是不灵活,在高并发时负载不均衡的情况肯定会出现,但是因为每台服务器的限流阈值是一个定值,这就导致某些压力较大的服务不能灵活的根据整个集群的限流情况,调整限流阈值,只能拒绝服务。
      暴力贴代码又来了。。。

    
       /**
         * 限流校验
         *
         * @param key 限流缓存对象key
         * @return 校验通过返回true 反之返回false
         */
        private boolean checkRateLimit(String key, Integer limiterConcurrency, Integer limiterTimeOut) {
            boolean valid = true;
            try {
                long startTime = System.currentTimeMillis();
                int tmpCount = serverCount == 0 ? DEFAUL_SERVER_COUNT : serverCount;
                double permitsPerSecond = BigDecimal.valueOf(limiterConcurrency).divide(BigDecimal.valueOf(tmpCount), 2, RoundingMode.FLOOR).doubleValue();
                RateLimiter rateLimiter = getRateLimiter(key, permitsPerSecond);
                if (limiterTimeOut == null || limiterTimeOut == 0) {
                    valid = rateLimiter.tryAcquire();
                } else {
                    valid = rateLimiter.tryAcquire(limiterTimeOut, TimeUnit.MILLISECONDS);
                }
                long endTime = System.currentTimeMillis();
                LOGGER.info(String.format("api接口限流校验,key=%s,valid=%s,serverCount=%s,耗时=%s", key, valid, tmpCount, endTime - startTime));
            } catch (Throwable e) {
                LOGGER.error(String.format("api接口限流校验异常,key=%s", key), e);
            }
            return valid;
        }
    
        /**
         * 获取限流对象
         *
         * @param key              限流对象缓存KEY
         * @param permitsPerSecond
         * @return
         */
        private RateLimiter getRateLimiter(String key, double permitsPerSecond) {
            RateLimiter rateLimiter = rateLimiterMap.get(key);
            if (rateLimiter == null) {
                synchronized (this) {
                    rateLimiter = rateLimiterMap.get(key);
                    if (rateLimiter == null) {
                        rateLimiter = RateLimiter.create(permitsPerSecond);
                        rateLimiterMap.put(key, rateLimiter);
                        return rateLimiter;
                    }
                }
            }
            double diff = permitsPerSecond - rateLimiter.getRate();
            if (diff >= 0.01 || diff < 0) {
                LOGGER.info(String.format("api接口限流-并发量更新,key=%s,permitsPerSecond=%s", key, permitsPerSecond));
                rateLimiter.setRate(permitsPerSecond);
            }
            return rateLimiter;
        }
    

    又要快乐的结束了.....

      写到这里时,分布式限流终于要结束了,开开心心的看了看(>﹏<)左耳朵耗子的限流设计博文,嗯,我真是渣.....

    相关文章

      网友评论

        本文标题:分布式限流大作战

        本文链接:https://www.haomeiwen.com/subject/xilchqtx.html