美文网首页
JedisCluster 操作管道(基于jedis 2.9+的

JedisCluster 操作管道(基于jedis 2.9+的

作者: 失足的骏马 | 来源:发表于2020-04-22 15:52 被阅读0次

    前言

    现在很多的博客论坛,很多都是以前写的代码。殊不知,这代码不是一层不变的。特别是涉及到源码的改变。这就导致很多网上的文章几乎都是 copy 来 copy 去的。这里也只是建议大家的有看源码的习惯。不然,照抄网上的博客有时候真的不能解决问题。还得动动脑子。本人也是踩坑过来的 。好了。回到重点,

    为什么  JedisCluster 不支持直接操作管道(Pipeline)?  (如果面试这么问。你怎么回答?百思不得其姐(解)  欢迎留言^_^)

    首先我们看下 JedisCluster  源码。

    public class JedisCluster extends BinaryJedisCluster implements JedisCommands,

        MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {

    /*****************一堆方法*****************/

      }

    看到这个 JedisCluster 类 继承 BinaryJedisCluster。 好了我们在下一步看   BinaryJedisCluster 类里面到底是什么?

    public class BinaryJedisCluster implements BasicCommands, BinaryJedisClusterCommands,

        MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {

      public static final short HASHSLOTS = 16384;

      protected static final int DEFAULT_TIMEOUT = 2000;

      protected static final int DEFAULT_MAX_REDIRECTIONS = 5;

      protected int maxAttempts;

      protected JedisClusterConnectionHandler connectionHandler;

    /****************************一堆方法*****************************/

    }

    可以看到在 BinaryJedisCluster  继承一些接口。所以我我们先看下这个类下除了构造方法还剩下什么东东?

    埃!!! JedisClusterConnectionHandler connectionHandler; 这个类里面会不会有我们想要的东西呢?进去看下

    package redis.clients.jedis;

    import java.io.Closeable;

    import java.util.Map;

    import java.util.Set;

    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

    import redis.clients.jedis.exceptions.JedisConnectionException;

    public abstract class JedisClusterConnectionHandler implements Closeable {

        protected final JedisClusterInfoCache cache;

      public JedisClusterConnectionHandler(Set<HostAndPort> nodes,

                                          final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {

        this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);

        initializeSlotsCache(nodes, poolConfig, password);

      }

      abstract Jedis getConnection();

      abstract Jedis getConnectionFromSlot(int slot);

      public Jedis getConnectionFromNode(HostAndPort node) {

        return cache.setupNodeIfNotExist(node).getResource();

      }

      public Map<String, JedisPool> getNodes() {

        return cache.getNodes();

      }

      private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {

        for (HostAndPort hostAndPort : startNodes) {

          Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());

          if (password != null) {

            jedis.auth(password);

          }

          try {

            cache.discoverClusterNodesAndSlots(jedis);

            break;

          } catch (JedisConnectionException e) {

            // try next nodes

          } finally {

            if (jedis != null) {

              jedis.close();

            }

          }

        }

      }

      public void renewSlotCache() {

        cache.renewClusterSlots(null);

      }

      public void renewSlotCache(Jedis jedis) {

        cache.renewClusterSlots(jedis);

      }

      @Override

      public void close() {

        cache.reset();

      }

    }

    这是个抽象类。里面有2个抽象方法。在2.9以前版本 

      abstract Jedis getConnection();

      abstract Jedis getConnectionFromSlot(int slot);

    这2方法可有所实现。(没去看2.9以前的版本源码)

    网上很多以前博客的都是使用  getConnectionFromSlot(int slot); 来获取某个 jedis  来操作  pipeline。

    所以在此我们还能怎么办呢?

    这会儿。我们看到

    JedisClusterInfoCache cache;       这里面会不会有有我们想要的?

    public class JedisClusterInfoCache {

      private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();

      private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

      private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

      private final Lock r = rwl.readLock();

      private final Lock w = rwl.writeLock();

      private volatile boolean rediscovering;

      private final GenericObjectPoolConfig poolConfig;

      private int connectionTimeout;

      private int soTimeout;

      private String password;

      private static final int MASTER_NODE_INDEX = 2;

      public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {

        this(poolConfig, timeout, timeout, null);

      }

      public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,

          final int connectionTimeout, final int soTimeout, final String password) {

        this.poolConfig = poolConfig;

        this.connectionTimeout = connectionTimeout;

        this.soTimeout = soTimeout;

        this.password = password;

      }

    /******方法******以下方法是我重点标注的*************/

    public JedisPool getNode(String nodeKey) {

        r.lock();

        try {

          return nodes.get(nodeKey);

        } finally {

          r.unlock();

        }

      }

      public JedisPool getSlotPool(int slot) {

        r.lock();

        try {

          return slots.get(slot);

        } finally {

          r.unlock();

        }

      }

      public Map<String, JedisPool> getNodes() {

        r.lock();

        try {

          return new HashMap<String, JedisPool>(nodes);

        } finally {

          r.unlock();

        }

      }

      public List<JedisPool> getShuffledNodesPool() {

        r.lock();

        try {

          List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values());

          Collections.shuffle(pools);

          return pools;

        } finally {

          r.unlock();

        }

      }

    埃~看到了可以获取到某个jedis  

    其实这个 JedisClusterInfoCache 类 是你在初始化jedisCluster时 将所有的节点放入缓存。

    因此,这个类的方法能给我们返回相关的jedis实例

    我们要这么做呢?

    接下来是我的代码。通过java的反射机制直接获取。

    public static void main(String[] args) throws NoSuchFieldException {

            JedisPoolConfig config = new JedisPoolConfig();

            Set<HostAndPort> nodeList = new HashSet<>();

            nodeList.add(new HostAndPort("192.168.41.65", 6379));

            nodeList.add(new HostAndPort("192.168.41.70", 6379));

            nodeList.add(new HostAndPort("192.168.41.42", 6379));

            nodeList.add(new HostAndPort("192.168.41.20", 6380));

            nodeList.add(new HostAndPort("192.168.41.30", 6380));

            nodeList.add(new HostAndPort("192.168.41.40", 6380));

            JedisCluster jedisCluster = new JedisCluster(nodeList, 3000, config);

            jedisCluster.set("James", "Bond");

            //通過 java.lang.reflect.Field 反射

            Jedis jedis = getJedisFieldBySlot(jedisCluster, 0, "James");

            //通過spring 工具類  ReflectionUtils 反射

            Jedis j = getJedisBySlot(jedisCluster, 0, "James");

    // 接下来就是pipeline操作了

    if(jedis != null) {

    Pipeline pipeline = jedis.pipelined();

    pipeline.syncAndReturnAll();

    // jedis会自动将资源归还到连接池

    jedis.close();

    }else {

    System.err.println("找不到 jedis");

    }

        }

    /**

    * 集裙中根據 key对应的slot 获取槽位 或 key 返回對應的某個Jedis 實例

    * @param jedisCluster

    * @param slot

    * @param key

    * @return Jedis

    */

    public static Jedis getJedisFieldBySlot(JedisCluster jedisCluster,int slot,String key) {

    try {

    if(key !=null) {

    // 获取key对应的slot 获取槽号(0~16383)

    slot = JedisClusterCRC16.getSlot(key);

    }

    Field field = BinaryJedisCluster.class.getDeclaredField("connectionHandler");

    field.setAccessible(true);

    JedisClusterConnectionHandler connectionHandler =  (JedisClusterConnectionHandler) field.get(jedisCluster);

    Field jedisclusterinfocache =  JedisClusterConnectionHandler.class.getDeclaredField("cache");

    jedisclusterinfocache.setAccessible(true);

    JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);

    JedisPool pool = cache.getSlotPool(slot);

    Jedis jedis = pool.getResource();

    return jedis;

    } catch (Exception e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    return null;

    }

    /**

    * 集裙中根據 key对应的slot 获取槽位 或 key 返回對應的某個Jedis 實例

    * @param jedisCluster

    * @param slot

    * @param key

    * @return Jedis

    */

    public static Jedis getJedisBySlot(JedisCluster jedisCluster,int slot,String key) {

    try {

    if(key !=null) {

    slot = JedisClusterCRC16.getSlot(key);

    }

    //org.springframework.util.ReflectionUtils 工具類  BinaryJedisCluster 下的  JedisClusterConnectionHandler

    Field field = ReflectionUtils.findField(BinaryJedisCluster.class, null, JedisClusterConnectionHandler.class);

    field.setAccessible(true);

    JedisClusterConnectionHandler connectionHandler =  (JedisClusterConnectionHandler) field.get(jedisCluster);

    Field jedisclusterinfocache = ReflectionUtils.findField(JedisClusterConnectionHandler.class, null, JedisClusterInfoCache.class);

    jedisclusterinfocache.setAccessible(true);

    JedisClusterInfoCache cache = (JedisClusterInfoCache) jedisclusterinfocache.get(connectionHandler);

    JedisPool pool = cache.getSlotPool(slot);

    Jedis jedis = pool.getResource();

    return jedis;

    } catch (Exception e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    return null;

    }

    但是大家以为以上的代码就很完美了么?

    确实。在去找缓存里的jedis时,可能某个节点挂了,然后刚好程序拿到这个实例,

    这时候这里就会出现错误。因此我们 应该在原来的基础上,去刷新一遍集群。

    代码由你们来给吧。我不会写了。哈哈哈·

    最后 如果针对  JedisClusterInfoCache 源码分析的 请看   https://www.cnblogs.com/zhengzuozhanglina/p/11383035.html

    相关文章

      网友评论

          本文标题:JedisCluster 操作管道(基于jedis 2.9+的

          本文链接:https://www.haomeiwen.com/subject/bvlmyftx.html