美文网首页devops
(一)redis限流

(一)redis限流

作者: 无聊之园 | 来源:发表于2019-06-24 16:00 被阅读0次

    来自于《Redis深度历险:核心原理和应用实践》,进行分析。

    业务场景:一个id,限制每秒只能访问多少次。

    下面的代码使用了redission工具类。

    不合理的方式:
    1、以下这种方式,没有考虑并发问题,一瞬间所有的请求,同时get,然后还没来得及increment,然后就判断错误了。
    2、改善方式,先increment,再get。(不过这样的话,get会得到别人increment后的值,有些业务不合适)。或者,使用下面那样的自旋的方式。

    @Override
       public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
           String ip = request.getRemoteHost();
           String key = IP_PREFIX + ip;
           Object value = redisService.get(BLACK_PREFIX + ip);
           if(value != null){
               // 在黑名单则不让访问
               return false;
           }
           Object o = redisService.get(key);
           if(o == null){
               redisService.set(key, 1, 2000L);
           } else if(o.equals(50)){
               // 2秒超过50个请求就屏蔽掉,放入黑名单,1小时候之后不能访问
               redisService.set(BLACK_PREFIX + ip, 1, 3600L);
           }else{
               redisService.increment(key, 1);
           }
           return true;
       }
    

    redis自增,必须要使用StringRedisTemplate,而且自定义好key策略。

    @Bean
        public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
            StringRedisTemplate template = new StringRedisTemplate(factory);
            //定义key序列化方式
            //RedisSerializer<String> redisSerializer = new StringRedisSerializer();//Long类型会出现异常信息;需要我们上面的自定义key生成策略,一般没必要
            //定义value的序列化方式
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
    
            // template.setKeySerializer(redisSerializer);
            template.setValueSerializer(jackson2JsonRedisSerializer);
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    

    1、自旋的方式。

    这不是书中的方式。

    缺点:无法精确的限制,每秒10次,最大的可能会到18次。
    优点:内存利用少。

     RedissonClient redisson = Redisson.create();
            Config config = new Config();
            config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    
            String id = "123";
            RBucket<Boolean> bucket = redisson.getBucket("blackId:" + id);
            // 是否是黑名单得
            if(bucket.get() == true){
                // 或者返回最近的一个请求的缓存
                return;
            }
            RAtomicLong idAccessNum = redisson.getAtomicLong(id);
            while (true){
                long num = idAccessNum.get();
                // 没有初始化,则设置过期时间
                // 这种方式,并非准确10秒10次就到了阈值,可能前10秒已经有9次,然后后10秒又请求了9次,
                // 最长可能会有18次。但是我们不需要准确的10次,所以无影响。
                if(num == 0){
                    idAccessNum.expire(10, TimeUnit.SECONDS);
                }
                // 超过了10次,则进入黑名单
                if(num > 10){
                    // 加入黑名单,30秒之后不能再访问
                    bucket.set(true, 30, TimeUnit.SECONDS);
                    // 或者返回最近的一个请求的缓存
                    return;
                }
                // 这里自旋,只针对同一个用户id,因为理论上同一个用户的请求是线性的,所以这里自旋
                // 不会对性能造成很大的影响
                boolean updateSuccess = idAccessNum.compareAndSet(num, num + 1);
                // 是否更新成功
                if(updateSuccess){
                    break;
                }
            }
    

    二、使用zset

    这是书中的方式。
    优点:能够精确的统计次数。
    缺点:比较浪费内存。每请求一次,就ad一个数。

    public class SimpleRateLimiter {
        private Jedis jedis;
    
        public SimpleRateLimiter(Jedis jedis) {
            this.jedis = jedis;
        }
    
        public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
            String key = String.format("hist:%s:%s", userId, actionKey);
            long nowTs = System.currentTimeMillis();
            Pipeline pipe = jedis.pipelined();
            pipe.multi();
            // value 没有实际的意义,保证唯一就可以
            pipe.zadd(key, nowTs, "" + nowTs);
            pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
            Response<Long> count = pipe.zcard(key);
            pipe.expire(key, period + 1);
            pipe.exec();
            pipe.close();
            return count.get() <= maxCount;
        }
    
        public static void main(String[] args) {
            Jedis jedis = new Jedis();
            SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
            for (int i = 0; i < 20; i++) {
                System.out.println(limiter.isActionAllowed("laoqian", "reply", 60, 5));
            }
        }
    }
    

    如果使用redission。
    则代码大概这么写

    public static void main(String[] args) {
            RedissonClient redisson = Redisson.create();
    
            Config config = new Config();
            config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    
            String id = "123";
            RBucket<Boolean> bucket = redisson.getBucket("blackId:" + id);
            // 是否是黑名单得
            if(bucket.get() == true){
                // 或者返回最近的一个请求的缓存
                return;
            }
    
            long nanoTime = System.nanoTime();
            RScoredSortedSet<Object> zset = redisson.getScoredSortedSet(id);
            zset.expire(10, TimeUnit.SECONDS);
            zset.add(nanoTime, nanoTime);
            zset.removeRangeByScore(0, true,
                    nanoTime - 10 * 1000 * 1000 * 1000, true);
            int size = zset.size();
            // 超过了10次,则进入黑名单
            if(size > 10){
                // 加入黑名单,30秒之后不能再访问
                bucket.set(true, 30, TimeUnit.SECONDS);
                // 或者返回最近的一个请求的缓存
                return;
            }
            // 放行
        }
    

    如果没有redis,只是单机,用java可以大概这么写:

    public class TestLimitFlow {
    
       private Lock lock = new ReentrantLock();
    
       private Map<String, Entity> map = new ConcurrentHashMap<>();
    
       public TestLimitFlow() {
           new Thread(() -> {
               while (true) {
                   System.out.println(map.size());
                   map.forEach((key, value) -> {
                       long now = System.nanoTime();
                       if (value.expireTime < now) {
                           map.remove(key);
                       }
                   });
                   try {
                       TimeUnit.SECONDS.sleep(1);
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }
    
           }).start();
       }
    
    
       class Entity<T> {
           public long expireTime;
           T o;
       }
    
       public void interceptor(String ip) throws InterruptedException {
           lock.lock();
           Entity e = map.get(ip);
           long nanoTime = System.nanoTime();
           if (e == null) {
               TreeSet<Long> treeSet = new TreeSet<>();
               e = new Entity<TreeSet<Long>>();
               e.expireTime = nanoTime + 10L * 1000 * 1000 * 1000;
               e.o = treeSet;
               map.put(ip, e);
           }
           lock.unlock();
           synchronized (e) {
               e.expireTime = nanoTime + 10L * 1000 * 1000 * 1000;
               TreeSet<Long> zset = (TreeSet<Long>) e.o;
               zset.add(nanoTime);
               List<Long> deleteKey = Lists.newArrayList();
               for(Long item : zset){
                   if(item < nanoTime - 10L * 1000 * 1000 * 1000){
                       System.out.println("true");
    //                    zset.remove(item);
                       deleteKey.add(item);
                   }else {
                       break;
                   }
               }
               deleteKey.forEach(item -> {
                   zset.remove(item);
               });
               System.out.println(zset.size());
               if (zset.size() > 10) {
                   // 加入黑名单
                   System.out.println("黑名单:" + ip + ":" + zset.size());
               }
           }
    
       }
    
       public static void main(String[] args) throws InterruptedException {
           TestLimitFlow testXL = new TestLimitFlow();
           for(int j = 0; j < 10;j++){
               int finalJ = j;
               new Thread(){
                   @Override
                   public void run() {
                       for (int i = 0; i < 12; i++) {
    //            TimeUnit.SECONDS.sleep(1);
                           try {
                               testXL.interceptor("localhost" + finalJ);
                           } catch (InterruptedException e) {
                               e.printStackTrace();
                           }
                       }
                   }
               }.start();
           }
    
    
       }
    }
    

    相关文章

      网友评论

        本文标题:(一)redis限流

        本文链接:https://www.haomeiwen.com/subject/pedoqctx.html