美文网首页面试精选Redis
[ Redis]基于Redis的zset结构实现限流

[ Redis]基于Redis的zset结构实现限流

作者: AbstractCulture | 来源:发表于2020-12-22 00:24 被阅读0次

    限流的场景

    平时项目开发中,限流的场景较为多见,这里举几个例子:

    • IO较大的请求,如:文件上传、文件下载。
    • SSO认证中心获取授权token,一般会分配有效期,如果对外开放,不适合多次请求。
    • 调用第三方接口,会有频次限制。如:阿里云、腾讯云接口等
    • 门户app的发帖、回复、点赞等行为,需要进行限流操作

    限流策略

    我们可以记录当前请求的次数,约定在指定的时间(period)内,最多允许发生的请求次数(maxRequestCount)。
    此时,我们对外暴露的接口约定可以按以下的方式:

    package com.xjm.spring.data.redis.core.limit.support;
    
    /**
     * @author jaymin<br>
     * 限流器<br>
     * 2020/12/21 23:30
     */
    public interface RateLimiter {
    
        /**
         * 本次请求是否在限流次数内
         * @param requestEvent 请求事件,作为Redis存储的key值
         * @param period 时间窗,即需要在多少时间范围内限制该行为
         * @param maxRequestCount 最大请求次数
         * @return
         */
        boolean isAllowed(String requestEvent,int period,int maxRequestCount);
    }
    

    如何限流

    使用Redis的zset结构可以帮助我们去实现一个简单的限流器。
    将请求事件作为key,当前的时间戳作为score,同时填充一个唯一值(可以用UUID,但是会耗费多一点性能,这里使用timestamp)作为value

    RateLimit

    可以看到,每次请求进来,都会往zset中增加一个记录。针对不同的事件,采用不同的key值。 然后使用redis的zremrangebyscore key minScore maxScore指令来对时间窗内的行为进行裁剪。然后通过zcard key来统计当前时间窗内发生的事件数量进而做出判断即可。

    使用pipeline来加快指令执行时间

    由于一次限流用到的指令较多,如果你熟悉lua脚本,那么可以针对这个进行lua脚本的编写,这里使用的是redis的管道进行指令加速。

    redisClient的技术选型

    对于连接Redis,可以使用Jedis,也可以使用Spring的RedisTemplate ,这里使用的是RedisTemplate。

    Code

    • yml配置
    spring:
      redis:
        lettuce:
          pool:
            max-wait: -1
            min-idle: 0
            max-idle: 200
            max-active: 100
          shutdown-timeout: 100
        host: 192.168.xx.xxx
        port: 6379
        password: xxx
        database: 0
        timeout: 3000
    
    • RedisTemplateConfig
    package com.xjm.spring.data.config;
    
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisPassword;
    import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    import java.time.Duration;
    
    /**
     * com.xjm.redis.template.config
     *
     * @author xiejiemin
     * @create 2020/12/15
     */
    @Configuration
    public class RedisTemplateConfig {
    
        @Value("${spring.redis.database}")
        private int database;
    
        @Value("${spring.redis.host}")
        private String host;
    
        @Value("${spring.redis.password}")
        private String password;
    
        @Value("${spring.redis.port}")
        private int port;
    
        @Value("${spring.redis.timeout}")
        private long timeout;
    
        @Value("${spring.redis.lettuce.shutdown-timeout}")
        private long shutDownTimeout;
    
        @Value("${spring.redis.lettuce.pool.max-idle}")
        private int maxIdle;
    
        @Value("${spring.redis.lettuce.pool.min-idle}")
        private int minIdle;
    
        @Value("${spring.redis.lettuce.pool.max-active}")
        private int maxActive;
    
        @Value("${spring.redis.lettuce.pool.max-wait}")
        private long maxWait;
    
        @Bean
        public LettuceConnectionFactory lettuceConnectionFactory() {
            GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
            genericObjectPoolConfig.setMaxIdle(maxIdle);
            genericObjectPoolConfig.setMinIdle(minIdle);
            genericObjectPoolConfig.setMaxTotal(maxActive);
            genericObjectPoolConfig.setMaxWaitMillis(maxWait);
            genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(100);
            RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
            redisStandaloneConfiguration.setDatabase(database);
            redisStandaloneConfiguration.setHostName(host);
            redisStandaloneConfiguration.setPort(port);
            redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
            LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                    .commandTimeout(Duration.ofMillis(timeout))
                    .shutdownTimeout(Duration.ofMillis(shutDownTimeout))
                    .poolConfig(genericObjectPoolConfig)
                    .build();
    
            LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfig);
            factory.setShareNativeConnection(true);
            return factory;
        }
    
        /**
         * 设置 redisTemplate 的序列化设置
         * @param redisConnectionFactory
         * @return
         */
        @Bean("myRedisTemplateConfig")
        public RedisTemplate<Object, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
            // 1.创建 redisTemplate 模版
            RedisTemplate<Object, Object> template = new RedisTemplate<>();
            // 2.关联 redisConnectionFactory
            template.setConnectionFactory(lettuceConnectionFactory);
            // 3.创建 序列化类
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            ObjectMapper om = new ObjectMapper();
            // 4.设置可见度
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            // 5.启动默认的类型
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            // 6.序列化类,对象映射设置
            jackson2JsonRedisSerializer.setObjectMapper(om);
            // 7.设置 value 的转化格式和 key 的转化格式
            template.setValueSerializer(jackson2JsonRedisSerializer);
            template.setKeySerializer(new StringRedisSerializer());
            template.afterPropertiesSet();
            template.setEnableTransactionSupport(false);
            return template;
        }
    
    }
    
    • RateLimiterLoader
    package com.xjm.spring.data.redis.core.limit.support;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.time.Instant;
    import java.util.List;
    
    /**
     * @author jaymin
     * 2020/12/20 20:28
     */
    @Component
    @Slf4j
    public class RateLimiterLoader implements RateLimiter {
    
        @Resource(name = "myRedisTemplateConfig")
        private RedisTemplate redisTemplate;
    
        /**
         * 本次请求是否在限流次数内
         *
         * @param requestEvent    请求事件,作为Redis存储的key值
         * @param period          时间窗,即需要在多少时间范围内限制该行为
         * @param maxRequestCount 最大请求次数
         * @return
         */
        @Override
        public boolean isAllowed(String requestEvent, int period, int maxRequestCount) {
            if (StringUtils.isBlank(requestEvent)) {
                throw new RuntimeException("Expect the input parameter to exist, the actual value is empty");
            }
            // 1. 获取当前的时间戳
            long now = Instant.now().toEpochMilli();
            log.info("current timestamp :{}", now);
            RedisSerializer stringSerializer = redisTemplate.getStringSerializer();
            byte[] redisKey = stringSerializer.serialize(requestEvent);
            // 2. 建立管道
            List<Object> list = redisTemplate.executePipelined((RedisCallback) redisConnection -> {
                byte[] value = stringSerializer.serialize(String.valueOf(now));
                // 3. 将当前的操作先存储下来
                redisConnection.zAdd(redisKey, now, value);
                double maxScope = now - period * 1000;
                log.info("max scope:{}", maxScope);
                // 4. 移除时间窗之外的数据
                redisConnection.zRemRangeByScore(redisKey, 0, maxScope);
                // 5. 统计剩下的key
                redisConnection.zCard(redisKey);
                // 6. 将当前key设置过期时间,过期时间为时间窗
                redisConnection.expire(redisKey, period + 1);
                return null;
            });
            Long currentRequestCount = (Long) list.get(2);
            // 8. 比较时间窗内的操作数
            log.info("current request count:{}", currentRequestCount);
            return currentRequestCount <= maxRequestCount;
        }
    }
    
    • TestController
    package com.xjm.modules;
    
    import com.xjm.spring.data.redis.core.limit.support.RateLimiterLoader;
    import com.xjm.thread.ThreadPoolUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author jaymin
     * 2020/12/21 1:15
     */
    @RestController
    @Slf4j
    @RequestMapping(value = "/redis")
    public class TestController {
        @Autowired
        private RateLimiterLoader rateLimiter;
    
        private static volatile int allowedCount = 0;
    
        @GetMapping("/test")
        public String testRateLimiter(){
            String key = "jaymin:limit:test";
            for (int i = 0; i < 200; i++) {
                if (rateLimiter.isAllowed(key,60,10)){
                    allowedCount++;
                }
            }
            return String.format("The allowed count is %s", allowedCount);
        }
    }
    
    • Result
    result

    总结

    • 整个限流方案中,scope是最关键的,使用时间戳进行窗口滑动,同时注意保持value值唯一
    • 高并发场景下,存在内存消耗的问题。因为需要记录每次的行为,所以不建议用在大并发(60S内请求次数100W)的场景下。
    • pipeline是非原子的,如果有严格要求,可以采用lua脚本。

    相关文章

      网友评论

        本文标题:[ Redis]基于Redis的zset结构实现限流

        本文链接:https://www.haomeiwen.com/subject/acdjnktx.html