美文网首页
Springboot整合哨兵模式Redis集群验证

Springboot整合哨兵模式Redis集群验证

作者: 柠檬冰块 | 来源:发表于2020-09-11 08:44 被阅读0次

    按照上步搭建好的redis哨兵集群配合Jedis客户端进行Springboot整合

    1、配置

    # redis 主从哨兵配置

    spring:

        redis:

          database: 0

          password:

          pool:

            max-active: 8

            max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)

            max-idle: 8

            min-idle: 0

          timeout: 10000

          # 主节点哨兵名

          sentinel:

            master: mymaster

            # comma-separated list of host:port pairs  哨兵节点配置列表

            nodes: 127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381

    2、新建redis 配置类

    package com.example.demo.redis;

    import com.fasterxml.jackson.annotation.JsonAutoDetect;

    import com.fasterxml.jackson.annotation.PropertyAccessor;

    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.springframework.cache.annotation.EnableCaching;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;

    import org.springframework.data.redis.connection.RedisConnectionFactory;

    import org.springframework.data.redis.core.RedisTemplate;

    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

    import org.springframework.data.redis.serializer.StringRedisSerializer;

    /**

    * redis 配置类

    * @author fansongsong

    * @since 2020/09/10

    */

    @Configuration

    @EnableCaching

    public class RedisConfig {

        @Bean

        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

            RedisTemplate<String, Object> template = new RedisTemplate<>();

            template.setConnectionFactory(factory);

            // 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化

            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

            ObjectMapper om = new ObjectMapper();

            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

            jackson2JsonRedisSerializer.setObjectMapper(om);

            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

            // key采用String的序列化方式

            template.setKeySerializer(stringRedisSerializer);

            // hash的key也采用String的序列化方式

            template.setHashKeySerializer(stringRedisSerializer);

            // value序列化方式采用jackson

            template.setValueSerializer(jackson2JsonRedisSerializer);

            // hash的value序列化方式采用jackson

            template.setHashValueSerializer(jackson2JsonRedisSerializer);

            template.afterPropertiesSet();

            return template;

        }

    }

    3、编写redis服务类

    简单列举get/set方法

     /**

    * redis 服务类

    * @author fansongsong

    * @since 2020/09/10

    */

    @Component

    @Slf4j

    public class RedisService {

        @Resource

        private RedisTemplate<Object, Object> redisTemplate;

        /**

        * 普通缓存获取

        *

        * @param key 键

        * @return 值

        */

        public Object get(String key) {

            return key == null ? null : redisTemplate.opsForValue().get(key);

        }

        /**

        * 普通缓存放入

        *

        * @param key 键

        * @param value 值

        * @return true成功 false失败

        */

        public boolean set(String key, Object value) {

            try {

                redisTemplate.opsForValue().set(key, value);

                return true;

            } catch (Exception e) {

                log.error("exception when set key {}. ", key, e);

                return false;

            }

        }

    }

    3、验证数据同步

    编写测试类方法

    执行方法

    执行成功查看其它从服数据是否同步(目前主服务:6379,从1服务: 6380,从2服务: 6381)

    数据同步成功!

    4、Redis客户端是如何进行主从切换的

    上述配置只指定了哨兵节点的地址与master的名称 = mymaster,但Redis客户端最终访问操作的是master节点,那么Redis客户端是如何获取master节点的地址,并在发生故障转移时,如何自动切换master地址的呢?以Jedis连接池为例,通过源码来了解内部实现原理。

    在 JedisSentinelPool 类的构造函数中,对连接池做了初始化,如下

      public JedisSentinelPool(String masterName, Set<String> sentinels,

          final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,

          final String password, final int database, final String clientName) {

        this.poolConfig = poolConfig;

        this.connectionTimeout = connectionTimeout;

        this.soTimeout = soTimeout;

        this.password = password;

        this.database = database;

        this.clientName = clientName;

        HostAndPort master = initSentinels(sentinels, masterName);

        initPool(master);

    }

    private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {

        for (String sentinel : sentinels) {

          final HostAndPort hap = HostAndPort.parseString(sentinel);

          log.fine("Connecting to Sentinel " + hap);

          Jedis jedis = null;

          try {

            jedis = new Jedis(hap.getHost(), hap.getPort());

            List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

            // connected to sentinel...

            sentinelAvailable = true;

            if (masterAddr == null || masterAddr.size() != 2) {

              log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap

                  + ".");

              continue;

            }

            master = toHostAndPort(masterAddr);

            log.fine("Found Redis master at " + master);

            break;

          } catch (JedisException e) {

            // resolves #1036, it should handle JedisException there's another chance

            // of raising JedisDataException

            log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e

                + ". Trying next one.");

          } finally {

            if (jedis != null) {

              jedis.close();

            }

          }

        }

        //省略了非关键代码

        for (String sentinel : sentinels) {

          final HostAndPort hap = HostAndPort.parseString(sentinel);

          MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());

          // whether MasterListener threads are alive or not, process can be stopped

          masterListener.setDaemon(true);

          masterListeners.add(masterListener);

          masterListener.start();

        }

        return master;

      }

    initSentinels() 方法中主要干了两件事:

    遍历哨兵节点,通过get-master-addr-by-name命令获取master节点的地址信息,找到了就退出循环。get-master-addr-by-name命令执行结果如下所示

    [root@dev-server-1 master-slave]# redis-cli -p 26379

    127.0.0.1:26379> sentinel get-master-addr-by-name mymaster

    1) "192.168.40.201"

    2) "7001"

    127.0.0.1:26379>

        2.对每一个哨兵节点通过一个 MasterListener 进行监听(Redis的发布订阅功能),订阅哨兵节点+switch-master频道,当发生故障转移时,客户端能收到哨兵的通知,通过重新初始化连接池,完成主节点的切换。

    MasterListener.run方法中监听哨兵部分代码如下

      j.subscribe(new JedisPubSub() {

                @Override

                public void onMessage(String channel, String message) {

                  log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");

                  String[] switchMasterMsg = message.split(" ");

                  if (switchMasterMsg.length > 3) {

                    if (masterName.equals(switchMasterMsg[0])) {

                      initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));

                    } else {

                      log.fine("Ignoring message on +switch-master for master name "

                          + switchMasterMsg[0] + ", our master name is " + masterName);

                    }

                  } else {

                    log.severe("Invalid message received on Sentinel " + host + ":" + port

                        + " on channel +switch-master: " + message);

                  }

                }

              }, "+switch-master");

    initPool() 方法如下:如果发现新的master节点与当前的master不同,则重新初始化。

    private void initPool(HostAndPort master) {

        if (!master.equals(currentHostMaster)) {

          currentHostMaster = master;

          if (factory == null) {

            factory = new JedisFactory(master.getHost(), master.getPort(), connectionTimeout,

                soTimeout, password, database, clientName, false, null, null, null);

            initPool(poolConfig, factory);

          } else {

            factory.setHostAndPort(currentHostMaster);

            // although we clear the pool, we still have to check the

            // returned object

            // in getResource, this call only clears idle instances, not

            // borrowed instances

            internalPool.clear();

          }

          log.info("Created JedisPool to master at " + master);

        }

      }

    通过以上两步,Jedis客户端在只知道哨兵地址的情况下便能获得master节点的地址信息,并且当发生故障转移时能自动切换到新的master节点地址。

    相关文章

      网友评论

          本文标题:Springboot整合哨兵模式Redis集群验证

          本文链接:https://www.haomeiwen.com/subject/dclvektx.html