美文网首页
一次线上环境应用使用jedis客户端分片功能实践

一次线上环境应用使用jedis客户端分片功能实践

作者: 星城天空 | 来源:发表于2021-08-08 20:39 被阅读0次

    背景

    线上有一个应用使用单机redis作为缓存。在一次架构巡检时发现,该单机redis在业务高峰期接收请求的QPS达到6万以上。考虑到后期业务的增长和应用的稳定性,需要调整架构设计,解决此性能问题。

    方案

    该应用的业务特点是:业务高峰期并发请求高,但缓存内存占用量较低(大约为5G)。目前应用部署在云平台上,若申请redis集群,最小容量为256G,存在严重的资源浪费。因此决定新增一台redis,在jedis客户端使用ShardedJedisPool根据key做hash分片,将请求路由到两台单机redis上,从而降低单台redis的QPS。

    具体实现

    1.加入maven依赖

    <!-- Jedis Redis client. The Java code in this article hands connections back with
         returnResource / returnBrokenResource, which the article associates with the
         2.10.2 API; re-check that code before moving this dependency to 3.x. -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.10.2</version>
    </dependency>
    

    2.代码实现

    /**
     * Callback that performs work against a {@link ShardedJedis} connection and
     * produces a result.
     *
     * @param <T> type of the value the callback produces
     */
    public interface ShardedJedisAction<T> {
        /**
         * Runs the callback with the supplied connection.
         *
         * @param paramShardedJedis sharded connection the callback operates on
         * @return whatever the callback computes
         */
        T doAction(ShardedJedis paramShardedJedis);
    }
    
    /**
     * Entry point for running a {@link ShardedJedisAction}.
     */
    public interface ShardedJedisClient {
        /**
         * Executes the given action and hands back its result.
         *
         * @param action callback to run
         * @param <T>    result type of the callback
         * @return the value produced by {@code action}
         */
        <T> T execute(ShardedJedisAction<T> action);
    }
    
    /**
     * {@link ShardedJedisClient} backed by a {@link ShardedJedisPool}.
     *
     * <p>Each call borrows a {@link ShardedJedis} from the pool, runs the action and
     * hands the connection back. {@link #init()} optionally starts a periodic task
     * that logs pool statistics; {@link #destroy()} stops that task and destroys the pool.
     */
    public class ShardedJedisClientImpl implements ShardedJedisClient {

        private static final Logger logger = LoggerFactory.getLogger(ShardedJedisClientImpl.class);

        /**
         * Single scheduler thread shared by all instances of this class. It is a daemon
         * thread so that a logging-only monitor can never keep the JVM alive when
         * {@link #destroy()} is not invoked.
         */
        private static final ScheduledExecutorService shardedJedisPoolMonitorExecutor =
                Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
                        .setNameFormat("ShardedJedisPoolMonitor-%d").setDaemon(true).build());

        private ShardedJedisPool pool;

        /**
         * Starts the pool monitor: first run after 60 seconds, then every 15 seconds.
         */
        public void init() {
            shardedJedisPoolMonitorExecutor.scheduleAtFixedRate(this::logPoolStats, 60, 15, TimeUnit.SECONDS);
        }

        /**
         * Logs the pool's active / idle / waiter counts.
         *
         * <p>Never lets an exception escape: a task scheduled with
         * {@code scheduleAtFixedRate} is silently cancelled for good the first time it throws.
         */
        private void logPoolStats() {
            try {
                ShardedJedisPool current = pool;
                if (current == null) {
                    logger.debug("ShardedJedisPoolMonitor skipped, pool is not set");
                    return;
                }
                logger.info("ShardedJedisPoolMonitor enable,NumActive:{},NumIdle:{},NumWaiters:{}",
                        current.getNumActive(), current.getNumIdle(), current.getNumWaiters());
            } catch (Exception e) {
                logger.warn("ShardedJedisPoolMonitor failed to read pool statistics.", e);
            }
        }

        /**
         * Runs the action on a pooled connection.
         *
         * <p>Best-effort: any exception is logged and {@code null} is returned instead of
         * propagating, so callers cannot distinguish a failure from a {@code null} result.
         *
         * @param action callback to run
         * @return the action's result, or {@code null} if it failed
         */
        @Override
        public <T> T execute(ShardedJedisAction<T> action) {
            T result = null;
            try {
                result = executeAction(action);
            } catch (Exception e) {
                // The exception is passed as the throwable argument; no placeholder is needed.
                logger.error("ShardedJedisClient execute error", e);
            }
            return result;
        }

        /**
         * Borrows a connection, runs the action and returns the connection to the pool.
         *
         * @param action callback to run
         * @return the action's result
         * @throws JedisConnectionException rethrown after the connection was returned as broken
         */
        private <T> T executeAction(ShardedJedisAction<T> action) {
            ShardedJedis shardedJedis = null;
            try {
                shardedJedis = pool.getResource();
                return action.doAction(shardedJedis);
            } catch (JedisConnectionException jex) {
                if (shardedJedis != null) {
                    try {
                        pool.returnBrokenResource(shardedJedis);
                    } catch (Exception ex) {
                        logger.warn("Can not return broken resource.", ex);
                    }
                    // Cleared so the finally block does not return the same connection twice.
                    shardedJedis = null;
                }
                throw jex;
            } finally {
                if (shardedJedis != null) {
                    try {
                        pool.returnResource(shardedJedis);
                    } catch (Exception ex) {
                        logger.warn("Can not return resource.", ex);
                    }
                }
            }
        }

        public ShardedJedisPool getPool() {
            return pool;
        }

        public void setPool(ShardedJedisPool pool) {
            this.pool = pool;
        }

        /**
         * Stops the monitor and destroys the pool (if one was set).
         *
         * <p>Note that the monitor executor is static, so this stops monitoring for
         * every instance of this class.
         */
        public void destroy() {
            shardedJedisPoolMonitorExecutor.shutdown();
            if (pool != null) {
                pool.destroy();
            }
        }
    }
    
    /**
     * Small template over {@link ShardedJedisClient} that converts keys and values to
     * bytes with the configured serializers before issuing Redis commands.
     */
    public class ShardedJedisTemplate {

        private static final Logger logger = LoggerFactory.getLogger(ShardedJedisTemplate.class);

        private ShardedJedisClient shardedClient;

        private RedisSerializer keySerializer;

        private RedisSerializer valueSerializer;

        /**
         * Stores a single value under the given key.
         *
         * @param key   key to write; must not be {@code null}
         * @param value value to write
         * @throws IllegalStateException if no value serializer is configured and
         *                               {@code value} is not a {@code byte[]}
         */
        public void set(final String key, final Object value) {
            final byte[] rawKey = rawKey(key);
            final byte[] rawValue = rawValue(value);
            shardedClient.execute(shardedJedis -> {
                // Guarded: JSON serialization and shard lookup are too costly to run
                // on every call when debug logging is off.
                if (logger.isDebugEnabled()) {
                    logger.debug("key:{},value:{},action:set,redis ShardInfo host:{},port:{}",
                            key, JSON.toJSONString(value),
                            shardedJedis.getShardInfo(rawKey).getHost(),
                            shardedJedis.getShardInfo(rawKey).getPort());
                }
                return shardedJedis.set(rawKey, rawValue);
            });
        }

        /**
         * Reads a single value.
         *
         * @param key key to read; must not be {@code null}
         * @param <T> type the caller expects; the cast is unchecked
         * @return the deserialized value, or the raw bytes when no value serializer is set
         */
        @SuppressWarnings("unchecked") // the stored type is only known to the caller
        public <T> T get(String key) {
            final byte[] rawKey = rawKey(key);
            byte[] result = shardedClient.execute(shardedJedis -> {
                byte[] rawValue = shardedJedis.get(rawKey);
                if (logger.isDebugEnabled()) {
                    logger.debug("key:{},value:{},action:get,redis ShardInfo host:{},port:{}",
                            key, JSON.toJSONString(rawValue),
                            shardedJedis.getShardInfo(rawKey).getHost(),
                            shardedJedis.getShardInfo(rawKey).getPort());
                }
                return rawValue;
            });
            return (T) deserializeValue(result);
        }

        /**
         * Converts a key to bytes.
         *
         * @param key key to convert; must not be {@code null}
         * @return serialized key; UTF-8 bytes when no key serializer is configured
         */
        private byte[] rawKey(String key) {
            Assert.notNull(key, "non null key required");
            if (keySerializer == null) {
                // Explicit charset: the no-arg getBytes() depends on the platform default,
                // which could route the same key to different shards on different hosts.
                return key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            }
            return keySerializer.serialize(key);
        }

        /**
         * Converts a value to bytes.
         *
         * @param value value to convert
         * @return serialized value; the array itself when no serializer is configured
         *         and {@code value} already is a {@code byte[]}
         * @throws IllegalStateException if no value serializer is configured and
         *                               {@code value} is not a {@code byte[]}
         */
        private byte[] rawValue(Object value) {
            if (valueSerializer != null) {
                return valueSerializer.serialize(value);
            }
            if (value instanceof byte[]) {
                return (byte[]) value;
            }
            // Previously this path dereferenced the null serializer and failed with a bare NPE.
            throw new IllegalStateException("no valueSerializer configured, only byte[] values are supported but got: "
                    + (value == null ? "null" : value.getClass().getName()));
        }

        /**
         * Converts stored bytes back to an object.
         *
         * @param value bytes read from Redis
         * @return the deserialized object, or {@code value} unchanged when no serializer is set
         */
        private Object deserializeValue(byte[] value) {
            if (valueSerializer == null) {
                return value;
            }
            return valueSerializer.deserialize(value);
        }

        public ShardedJedisClient getShardedClient() {
            return shardedClient;
        }

        public void setShardedClient(ShardedJedisClient shardedClient) {
            this.shardedClient = shardedClient;
        }

        public RedisSerializer getKeySerializer() {
            return keySerializer;
        }

        public void setKeySerializer(RedisSerializer keySerializer) {
            this.keySerializer = keySerializer;
        }

        public RedisSerializer getValueSerializer() {
            return valueSerializer;
        }

        public void setValueSerializer(RedisSerializer valueSerializer) {
            this.valueSerializer = valueSerializer;
        }
    }
    
    /**
     * Spring configuration that wires the Jedis pool settings, the sharded pool,
     * the {@link ShardedJedisClient} and the {@link ShardedJedisTemplate}.
     */
    @Slf4j
    @Configuration
    public class ShardedJedisConfiguration {

        /** Comma-separated shard URIs from {@code redis.uri.list}. */
        @Value("#{'${redis.uri.list}'.split(',')}")
        private List<String> uriList;

        // Was "{redis.pool.minIdle" (missing '$' and closing brace), which injects the
        // literal text and fails the String -> int conversion at startup.
        @Value("${redis.pool.minIdle}")
        private int minIdle;

        @Value("${redis.pool.maxIdle}")
        private int maxIdle;

        @Value("${redis.pool.maxTotal}")
        private int maxTotal;

        @Value("${redis.pool.maxWaitMillis}")
        private int maxWaitMillis;

        @Value("${redis.pool.testOnBorrow}")
        private Boolean testOnBorrow;

        @Value("${redis.pool.timeBetweenEvictionRunsMillis}")
        private int timeBetweenEvictionRunsMillis;

        @Value("${redis.pool.minEvictableIdleTimeMillis}")
        private int minEvictableIdleTimeMillis;

        /**
         * Builds the pool configuration from the {@code redis.pool.*} properties.
         *
         * @return pool configuration shared by the sharded pool
         */
        @Bean
        public JedisPoolConfig jedisPoolConfig() {
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMinIdle(minIdle);
            jedisPoolConfig.setMaxIdle(maxIdle);
            jedisPoolConfig.setMaxTotal(maxTotal);
            jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
            jedisPoolConfig.setTestOnBorrow(testOnBorrow);
            jedisPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
            jedisPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
            return jedisPoolConfig;
        }

        /**
         * Creates the sharded pool with one shard per configured URI.
         *
         * @param jedisPoolConfig pool settings
         * @return pool over all configured shards
         */
        @Bean
        public ShardedJedisPool shardedJedisPool(@Qualifier("jedisPoolConfig") JedisPoolConfig jedisPoolConfig) {
            List<JedisShardInfo> jedisShardInfos = uriList.stream()
                    .map(String::trim) // tolerate "uri1, uri2" style lists
                    .map(JedisShardInfo::new)
                    .collect(Collectors.toList());
            return new ShardedJedisPool(jedisPoolConfig, jedisShardInfos);
        }

        /**
         * Creates the client and registers its lifecycle callbacks: {@code init} starts
         * the pool monitor, {@code destroy} stops it and destroys the pool. Without these
         * attributes neither method is ever invoked by the container.
         *
         * @param shardedJedisPool pool the client borrows connections from
         * @return the sharded client
         */
        @Bean(name = "shardedJedisClient", initMethod = "init", destroyMethod = "destroy")
        public ShardedJedisClient shardedJedisClient(@Qualifier("shardedJedisPool") ShardedJedisPool shardedJedisPool) {
            ShardedJedisClientImpl shardedJedisClient = new ShardedJedisClientImpl();
            shardedJedisClient.setPool(shardedJedisPool);
            return shardedJedisClient;
        }

        /**
         * Creates the template with a string key serializer and a JDK value serializer.
         *
         * @param shardedJedisClient client used to execute commands
         * @return the configured template
         */
        @Bean(name = "shardedRedisTemplate")
        public ShardedJedisTemplate shardedRedisTemplate(@Qualifier("shardedJedisClient") ShardedJedisClient shardedJedisClient) {
            ShardedJedisTemplate shardedJedisTemplate = new ShardedJedisTemplate();
            shardedJedisTemplate.setShardedClient(shardedJedisClient);
            shardedJedisTemplate.setKeySerializer(new StringRedisSerializer());
            shardedJedisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
            return shardedJedisTemplate;
        }

    }
    

    踩坑经历

    jedis的2.10.2和3.x版本在释放(归还)连接到连接池的写法上不一样。若maven依赖的jedis版本是2.10.2,但代码使用了官方3.x的写法,在高并发请求下会出现连接无法归还连接池的问题。具体写法可以参考官方demo:https://github.com/redis/jedis/

    实践结果

    目前应用已稳定运行半年

    相关文章

      网友评论

          本文标题:一次线上环境应用使用jedis客户端分片功能实践

          本文链接:https://www.haomeiwen.com/subject/exnovltx.html