美文网首页
扩展Jedis功能实现对Redis集群Pipeline

扩展Jedis功能实现对Redis集群Pipeline

作者: 上岸大虾米 | 来源:发表于2021-09-03 10:08 被阅读0次

    在使用Jedis操作Redis集群时发现,Jedis不支持集群Pipeline操作,于是尝试扩展该功能。目前仅通过了初步验证,这里只是提供一个思路,供参考。

    实现集群Pipeline步骤

    1. 计算key所对应的slot,获取对应的jedis对象
    2. 将所有的key按照jedis分组
    3. 每个分组分别执行对应的Pipeline
    4. 汇总结果集

    同时,也可以使用这个思路解决JedisCluster不支持mget、del等多key操作的类似问题。

    定义操作接口

    import redis.clients.jedis.RedisPipeline;
    
    import java.util.List;
    
    /**
     * Pipeline command interface that adds {@link #syncAndReturnAll()} on top of
     * the commands inherited from {@link RedisPipeline}.
     *
     * @author Adimin
     */
    public interface JedisPipeline extends RedisPipeline {
    
      /**
       * Counterpart of the single-node pipeline method of the same name.
       *
       * @see redis.clients.jedis.Pipeline#syncAndReturnAll()
       * @return the operation results
       */
      List<Object> syncAndReturnAll();
    
    }
    
    

    实现类

    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import org.apache.ibatis.reflection.MetaObject;
    import org.apache.ibatis.session.Configuration;
    import redis.clients.jedis.*;
    import redis.clients.util.JedisClusterCRC16;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Pipeline support for a Redis cluster built on top of Jedis.
     *
     * <p>Steps:
     * <ol>
     *   <li>compute the slot of the key and look up the matching Jedis connection;</li>
     *   <li>group the queued commands by connection;</li>
     *   <li>run one {@link Pipeline} per connection;</li>
     *   <li>merge the replies back into the order in which the commands were queued.</li>
     * </ol>
     *
     * <p>Instances are obtained through {@link #newInstance(JedisCluster)}, which returns a
     * dynamic proxy implementing {@link JedisPipeline}. The bookkeeping below uses plain
     * {@code HashMap}s and an {@code int} counter with no synchronization.
     *
     * <p>Connections are borrowed lazily when a command is queued and are only returned to
     * their pools by {@link #syncAndReturnAll()}, so every batch must end with that call.
     *
     * @author Adimin
     */
    public class ClusterPipelineUtil extends Pipeline implements JedisPipeline {

        /** Number of commands queued so far; doubles as the result index of the next command. */
        private int size = 0;

        /**
         * Jedis connection -> pipeline.
         * A cluster consists of several Redis nodes; one pipeline is kept per node connection.
         */
        private final Map<Jedis, Pipeline> redisAndPipelineMap = Maps.newHashMap();

        /**
         * Pipeline -> positions (in the overall result list) of the commands queued on it.
         */
        private final Map<Pipeline, List<Integer>> resultMap = Maps.newHashMap();

        /** Pool -> the single connection borrowed from it for the current batch. */
        private final Map<JedisPool, Jedis> poolAndJedisMap = Maps.newHashMap();

        /**
         * Slot number -> JedisPool.
         * TODO slot re-allocation (resharding) of the cluster is not handled.
         */
        private final Map<Integer, JedisPool> slots;

        /**
         * Reads the slot -> pool table out of the given cluster client via reflection.
         *
         * @param jedisCluster cluster client whose {@code connectionHandler.cache.slots}
         *                     property is read
         */
        @SuppressWarnings("unchecked")
        private ClusterPipelineUtil(JedisCluster jedisCluster) {
            // MyBatis' MetaObject is used purely as a nested-property reflection helper.
            Configuration configuration = new Configuration();
            MetaObject metaObject = configuration.newMetaObject(jedisCluster);
            slots = (Map<Integer, JedisPool>) metaObject.getValue("connectionHandler.cache.slots");
        }

        /**
         * Resolves the pipeline a command on {@code key} must be queued on and records the
         * position that command's reply will occupy in the merged result.
         *
         * @param key the Redis key of the command about to be queued
         * @return the pipeline bound to the node connection serving the key's slot
         * @throws IllegalStateException if no pool is cached for the key's slot
         */
        private Pipeline before(String key) {
            // Compute the slot number the key hashes to.
            int slot = JedisClusterCRC16.getSlot(key);
            JedisPool jedisPool = slots.get(slot);
            if (jedisPool == null) {
                // Fail with a readable message instead of an NPE inside getResource().
                throw new IllegalStateException(
                        "No JedisPool is cached for slot " + slot + " (key: " + key + ")");
            }
            // 1. one borrowed connection per pool
            Jedis jedis = poolAndJedisMap.computeIfAbsent(jedisPool, JedisPool::getResource);
            // 2. one pipeline per connection
            Pipeline pipeline = redisAndPipelineMap.computeIfAbsent(jedis, BinaryJedis::pipelined);
            List<Integer> keyOrderList = resultMap.computeIfAbsent(pipeline, k -> new ArrayList<>());
            keyOrderList.add(size++);
            return pipeline;
        }

        /**
         * Resets the batch state and hands every borrowed connection back to its pool.
         */
        private void clearNew() {
            size = 0;
            resultMap.clear();
            redisAndPipelineMap.clear();
            // Close through poolAndJedisMap: it also holds connections that were borrowed
            // but never got a pipeline (e.g. pipelined() failed), which would otherwise leak.
            RuntimeException firstFailure = null;
            for (Jedis jedis : poolAndJedisMap.values()) {
                try {
                    jedis.close();
                } catch (RuntimeException e) {
                    // Keep going so one bad connection does not strand the others.
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            poolAndJedisMap.clear();
            if (firstFailure != null) {
                throw firstFailure;
            }
        }

        /**
         * Flushes every per-node pipeline and merges the replies.
         *
         * @return the replies, in the order the commands were queued on this object
         */
        @Override
        public List<Object> syncAndReturnAll() {
            Object[] arr = new Object[size];
            try {
                for (Map.Entry<Pipeline, List<Integer>> entry : resultMap.entrySet()) {
                    // 3. execute the pipeline of one node
                    List<Object> replies = entry.getKey().syncAndReturnAll();
                    List<Integer> positions = entry.getValue();
                    for (int i = 0; i < positions.size(); i++) {
                        // 4. put each reply back at the position its command was queued at
                        arr[positions.get(i)] = replies.get(i);
                    }
                }
            } finally {
                clearNew();
            }
            return Lists.newArrayList(arr);
        }

        /**
         * Creates a cluster-aware pipeline for the given cluster client.
         *
         * @param jedisCluster the cluster client to route commands for
         * @return a proxy implementing {@link JedisPipeline}
         */
        public static JedisPipeline newInstance(JedisCluster jedisCluster) {
            ClusterPipelineInvocationHandler handler = new ClusterPipelineInvocationHandler();
            return handler.getProxy(jedisCluster);
        }

        /**
         * Invocation handler behind the {@link JedisPipeline} proxy: calls with arguments are
         * routed to the per-node pipeline chosen from the first argument (the key); calls
         * without arguments go to the {@link ClusterPipelineUtil} itself.
         */
        public static class ClusterPipelineInvocationHandler implements InvocationHandler {

            private ClusterPipelineUtil clusterPipeline;

            /**
             * {@inheritDoc}
             *
             * @throws UnsupportedOperationException if the first argument is not a String key
             */
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                try {
                    if (args == null || args.length == 0) {
                        return method.invoke(clusterPipeline, args);
                    }
                    if (method.getDeclaringClass() == Object.class && "equals".equals(method.getName())) {
                        // equals(Object) is not a Redis command; answer with proxy identity
                        // instead of trying to treat the argument as a key.
                        return proxy == args[0];
                    }
                    if (!(args[0] instanceof String)) {
                        throw new UnsupportedOperationException(
                                "First argument of " + method.getName() + " must be a String key");
                    }
                    String key = (String) args[0];
                    Pipeline before = clusterPipeline.before(key);
                    return method.invoke(before, args);
                } catch (java.lang.reflect.InvocationTargetException e) {
                    // Surface the real exception rather than the reflection wrapper, which the
                    // proxy would otherwise turn into an UndeclaredThrowableException.
                    Throwable cause = e.getCause();
                    throw cause != null ? cause : e;
                }
            }

            /**
             * Builds the backing {@link ClusterPipelineUtil} and wraps it in a proxy.
             *
             * @param jedisCluster the cluster client to route commands for
             * @return the proxy exposing {@link JedisPipeline}
             */
            JedisPipeline getProxy(JedisCluster jedisCluster) {
                clusterPipeline = new ClusterPipelineUtil(jedisCluster);
                return (JedisPipeline) Proxy.newProxyInstance(
                        ClusterPipelineUtil.class.getClassLoader(),
                        new Class<?>[]{JedisPipeline.class},
                        this);
            }
        }

    }
    
    
    

    测试类

         // 省略jedisCluster构建
    
        /**
         * Resets keys a-d and seeds them: "a" and "b" as sets, "c" and "d" as strings.
         */
        @Before
        public void before(){
            // Drop leftovers from earlier runs, one key per call.
            for (String staleKey : new String[]{"a", "b", "c", "d"}) {
                jedisCluster.del(staleKey);
            }

            // Set fixtures.
            jedisCluster.sadd("a", "1", "2", "3");
            jedisCluster.sadd("b", "4");

            // String fixtures.
            jedisCluster.set("c", "5");
            jedisCluster.set("d", "6");
        }
    
        /**
         * Queues set-membership checks on "a" and "b" plus reads of "c" and "d" on one
         * cluster pipeline, then prints every reply in queue order.
         */
        @Test
        public void test()  {
            JedisPipeline clusterPipeline = ClusterPipelineUtil.newInstance(jedisCluster);

            for (String member : new String[]{"1", "2", "3", "4"}) {
                clusterPipeline.sismember("a", member);
            }

            for (String member : new String[]{"4", "5"}) {
                clusterPipeline.sismember("b", member);
            }

            for (String key : new String[]{"c", "d"}) {
                clusterPipeline.get(key);
            }

            List<Object> replies = clusterPipeline.syncAndReturnAll();
            for (Object reply : replies) {
                System.out.println(reply);
            }
        }
    

    相关文章

      网友评论

          本文标题:扩展Jedis功能实现对Redis集群Pipeline

          本文链接:https://www.haomeiwen.com/subject/bknhwltx.html