美文网首页
Redis Cluster模式集群监听

Redis Cluster模式集群监听

作者: 匪石1941 | 来源:发表于2021-01-06 14:46 被阅读0次

    Redis实现事件监听

    redis.conf配置文件中修改配置

    notify-keyspace-events "Ex"
    

    配置详解

    字符 发送通知
    K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀,针对Key
    E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀,针对event
    g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
    $ 字符串命令的通知
    l 列表命令的通知
    s 集合命令的通知
    h 哈希命令的通知
    z 有序集合命令的通知
    x 过期事件:每当有过期键被删除时发送
    e 驱逐(evict)事件:每当有键因为 maxmemory 策略而被删除时发送
    A 参数 g$lshzxe 的别名,相当于是All

    单机版Redis监听

    代码示例

    配置类

    /**
     * Spring configuration that wires key-expiration listening for a single Redis instance.
     */
    @Configuration
    public class RedisConfiguration {

        /** Connection factory injected from the application context. */
        @Autowired
        private RedisConnectionFactory redisConnectionFactory;

        /**
         * Creates the listener container backed by the injected connection factory.
         *
         * @return the container that delivers Redis pub/sub messages to listeners
         */
        @Bean
        public RedisMessageListenerContainer redisMessageListenerContainer() {
            final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(this.redisConnectionFactory);
            return container;
        }

        /**
         * Creates the expiration listener, handing it the container from
         * {@link #redisMessageListenerContainer()}.
         *
         * @return the key-expiration listener
         */
        @Bean
        public KeyExpiredListener keyExpiredListener() {
            final RedisMessageListenerContainer container = redisMessageListenerContainer();
            return new KeyExpiredListener(container);
        }

    }
    
    

    监听类

    // Original author's note: implementing MessageListener directly worked locally but not on the
    // server (cause unknown), so KeyExpirationEventMessageListener is extended instead.
    /**
     * Listener that logs every key-expiration notification it receives from Redis.
     */
    public class KeyExpiredListener extends KeyExpirationEventMessageListener {
        private static final Logger LOGGER = LoggerFactory.getLogger(KeyExpiredListener.class);

        @Autowired
        protected RedisTemplate redisTemplate;
        @Value("${spring.redis.appStatusTTL}")
        private Long appStatusTTL;

        /**
         * @param listenerContainer the container this listener is attached to
         */
        public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }

        /**
         * Logs the pattern, channel and expired key of the notification.
         *
         * @param message the notification; its body holds the name of the expired key
         * @param pattern the subscription pattern that matched the channel, may be {@code null}
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
            // The message body is the expired key.
            String key = new String(message.getBody(), StandardCharsets.UTF_8);
            // Decode the pattern with an explicit charset (consistent with channel/body) and tolerate
            // a missing pattern instead of throwing a NullPointerException.
            String patternText = pattern == null ? null : new String(pattern, StandardCharsets.UTF_8);
            LOGGER.info("redis key 过期:pattern={},channel={},key={}", patternText, channel, key);
            // Business handling goes here.
        }
    }
    
    

    redis-cli客户端示例

    $ redis-cli config set notify-keyspace-events KEA
    $ redis-cli --csv psubscribe '__key*__:*'
    Reading messages... (press Ctrl-C to quit)
    "psubscribe","__key*__:*",1
    
    

    Redis模糊查询key

    *:通配任意多个字符

    ?:通配单个字符

    []:通配括号内的某一个字符

    主从复制版集群Redis监听

    网上目前未发现能够直接监听Cluster集群的方法,通用做法是为每一个节点单独配置一个监听器;只需监听全部主节点即可

    代码实现

    配置类

    /**
     * Spring configuration that listens for key-expiration events on three Redis nodes by giving
     * each node its own connection factory, listener container and {@link KeyExpiredListener}.
     */
    @Configuration
    public class RedisConfiguration {
        @Autowired
        private RedisConnectionFactory redisConnectionFactory;
        @Autowired
        private RedisListenerErrorHandle redisListenerErrorHandle;

        @Value("${spring.redis.host1}")
        private String host1;

        @Value("${spring.redis.host2}")
        private String host2;

        @Value("${spring.redis.host3}")
        private String host3;

        @Value("${spring.redis.port1}")
        private int port1;

        @Value("${spring.redis.port2}")
        private int port2;

        @Value("${spring.redis.port3}")
        private int port3;

        @Value("${spring.redis.password}")
        private String password;

        /**
         * @return the Jedis pool settings used by every per-node connection factory
         */
        @Bean
        JedisPoolConfig jedisPoolConfig() {
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxIdle(100);
            jedisPoolConfig.setMaxWaitMillis(1000);
            return jedisPoolConfig;
        }

        // Redis Cluster does not support key-expiration listening across the cluster, so a separate
        // connection is created for each Redis node and each node is listened to individually.

        /** @return the listener container connected to node 1 */
        @Bean
        RedisMessageListenerContainer redisMessageListenerContainer1() {
            return buildContainer(host1, port1);
        }

        /** @return the listener container connected to node 2 */
        @Bean
        RedisMessageListenerContainer redisMessageListenerContainer2() {
            return buildContainer(host2, port2);
        }

        /** @return the listener container connected to node 3 */
        @Bean
        RedisMessageListenerContainer redisMessageListenerContainer3() {
            return buildContainer(host3, port3);
        }

        /** @return the expiration listener attached to the node-1 container */
        @Bean
        KeyExpiredListener redisKeyExpirationListener1() {
            return new KeyExpiredListener(redisMessageListenerContainer1());
        }

        /** @return the expiration listener attached to the node-2 container */
        @Bean
        KeyExpiredListener redisKeyExpirationListener2() {
            return new KeyExpiredListener(redisMessageListenerContainer2());
        }

        /** @return the expiration listener attached to the node-3 container */
        @Bean
        KeyExpiredListener redisKeyExpirationListener3() {
            return new KeyExpiredListener(redisMessageListenerContainer3());
        }

        /**
         * Builds a listener container with its own Jedis connection to a single node.
         *
         * <p>No listener is added here by hand. The {@code KeyExpiredListener} beans above are built
         * with these containers, and {@code KeyExpirationEventMessageListener} registers itself with
         * its container when Spring initialises the bean. Adding a second, hand-made
         * {@code new KeyExpiredListener(container)} here would deliver every expiration twice, and
         * that extra instance would never get its {@code @Autowired} fields injected.
         *
         * @param host host name of the Redis node
         * @param port port of the Redis node
         * @return a container bound to that node, with the shared error handler installed
         */
        private RedisMessageListenerContainer buildContainer(String host, int port) {
            JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
            connectionFactory.setHostName(host);
            connectionFactory.setPort(port);
            connectionFactory.setPoolConfig(jedisPoolConfig());
            connectionFactory.setPassword(password);
            // The factory is not a Spring bean, so its initialisation callback is invoked manually.
            connectionFactory.afterPropertiesSet();

            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setErrorHandler(redisListenerErrorHandle);
            return container;
        }
    }
    
    

    监听器

    // Original author's note: at runtime this code reported that redisTemplate was null, yet the
    // business logic still ran normally; the cause was not found.
    // NOTE(review): redisTemplate is only injected into instances that Spring manages; an instance
    // created with `new` outside a @Bean method would see null here — verify how instances are built.
    /**
     * Listener that logs every key-expiration notification it receives from Redis.
     */
    public class KeyExpiredListener extends KeyExpirationEventMessageListener {
        private static final Logger logger = LoggerFactory.getLogger(KeyExpiredListener.class);

        @Autowired
        protected RedisTemplate redisTemplate;

        /**
         * @param listenerContainer the container this listener is attached to
         */
        public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }

        /**
         * Logs the pattern, channel and expired key of the notification.
         *
         * @param message the notification; its body holds the name of the expired key
         * @param pattern the subscription pattern that matched the channel, may be {@code null}
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
            // The message body is the expired key.
            String key = new String(message.getBody(), StandardCharsets.UTF_8);
            // Decode the pattern with an explicit charset (consistent with channel/body) and tolerate
            // a missing pattern instead of throwing a NullPointerException.
            String patternText = pattern == null ? null : new String(pattern, StandardCharsets.UTF_8);
            logger.warn("redis key 过期:pattern={},channel={},key={}", patternText, channel, key);
            // TODO business handling
        }
    }
    
    

    错误处理类

    /**
     * Error handler installed on the Redis listener containers; it logs failures raised while a
     * message listener is executing.
     */
    @Component
    public class RedisListenerErrorHandle implements ErrorHandler {
        private static final Logger logger = LoggerFactory.getLogger(RedisListenerErrorHandle.class);

        /**
         * Logs the failure together with its cause.
         *
         * @param throwable the error raised during message processing
         */
        @Override
        public void handleError(Throwable throwable) {
            // Pass the throwable to the logger: dropping it hides the real reason a listener failed
            // (for example a NullPointerException inside onMessage).
            logger.error("正常监听", throwable);
        }
    }
    
    

    自定义一个ErrorHandler并注入到RedisMessageListenerContainer中。Spring Data Redis源码中默认没有设置ErrorHandler,不注入的话,每当监听器执行抛出异常时都会打印日志(按下方源码为warn级别):Execution of message listener failed, and no ErrorHandler has been set.

    源码解析

    RedisMessageListenerContainer类中有这样一段代码

        /**
         * Invoke the registered ErrorHandler, if any. Log at error level otherwise.
         *
         * @param ex the uncaught error that arose during message processing.
         * @see #setErrorHandler
         */
        protected void invokeErrorHandler(Throwable ex) {
            // A configured handler receives the error and nothing is logged here.
            if (this.errorHandler != null) {
                this.errorHandler.handleError(ex);
            // No handler configured: fall back to a log entry. Note that this branch logs at WARN
            // level, although the Javadoc above says "error level".
            } else if (logger.isWarnEnabled()) {
                logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
            }
        }
        /**
         * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default
         * there will be <b>no</b> ErrorHandler so that error-level logging is the only result.
         */
        // Article author's note: a RedisMessageListenerContainer that you create yourself needs an
        // ErrorHandler to be set manually through this setter.
        public void setErrorHandler(ErrorHandler errorHandler) {
            this.errorHandler = errorHandler;
        }
    
    

    Cluster集群监听

    RedisAutoConfiguration类

    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.AutoConfigureAfter;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
    import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.jedis.JedisConnection;
    import org.springframework.data.redis.core.RedisOperations;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import redis.clients.jedis.Jedis;
    /**
     * @author wangrh
     * @title:
     * @description: 描述
     * @date: 2020/12/30
     */
    @Configuration
    @ConditionalOnClass({ JedisConnection.class, RedisOperations.class, Jedis.class, MessageListener.class })
    @AutoConfigureAfter({ JacksonAutoConfiguration.class,RedisAutoConfiguration.class })
    public class RedisAutoConfiguration {
        @Configuration
        @ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()")
        public static class RedisStandAloneAutoConfiguration {
            @Bean
            public RedisMessageListenerContainer customizeRedisListenerContainer(
                    RedisConnectionFactory redisConnectionFactory, @Qualifier("keyExpiredEventMessageListener") MessageListener messageListener) {
                RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
                redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
                redisMessageListenerContainer.addMessageListener(messageListener,new PatternTopic("__keyevent@0__:expired"));
                return redisMessageListenerContainer;
            }
        }
    
        @Configuration
        @ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()")
        public static class RedisClusterAutoConfiguration {
            @Bean
            public RedisMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory,
                                                                           RedisConnectionFactory redisConnectionFactory) {
                RedisMessageListenerFactory beans = new RedisMessageListenerFactory();
                beans.setBeanFactory(beanFactory);
                beans.setRedisConnectionFactory(redisConnectionFactory);
                return beans;
            }
        }
    }
    

    RedisMessageListenerFactory类

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.BeanFactoryAware;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.beans.factory.config.BeanDefinition;
    import org.springframework.beans.factory.support.BeanDefinitionBuilder;
    import org.springframework.beans.factory.support.DefaultListableBeanFactory;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.connection.RedisClusterConnection;
    import org.springframework.data.redis.connection.RedisClusterNode;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import redis.clients.jedis.JedisShardInfo;
    /**
     * After the application context is refreshed, registers and starts one
     * {@link RedisMessageListenerContainer} per Redis Cluster master node, each subscribed to the
     * key-expiration channel of database 0.
     *
     * @author wangrh (2020/12/30)
     */
    @Slf4j
    public class RedisMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> {
        @Value("${spring.redis.password}")
        private String password;

        private DefaultListableBeanFactory beanFactory;

        private RedisConnectionFactory redisConnectionFactory;

        @Qualifier("keyExpiredEventMessageListener")
        @Autowired
        private MessageListener messageListener;

        /**
         * @param beanFactory the owning bean factory; it is cast to {@link DefaultListableBeanFactory}
         *                    so that bean definitions can be registered at runtime
         */
        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.beanFactory = (DefaultListableBeanFactory) beanFactory;
        }

        /**
         * @param redisConnectionFactory factory used to obtain the cluster connection
         */
        public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
            this.redisConnectionFactory = redisConnectionFactory;
        }

        /**
         * Looks up the cluster nodes and registers a listener container for every master.
         *
         * @param contextRefreshedEvent the refresh event (not inspected)
         */
        @Override
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
            if (redisClusterConnection == null) {
                return;
            }
            try {
                for (RedisClusterNode node : redisClusterConnection.clusterGetNodes()) {
                    if (node.isMaster()) {
                        registerListenerContainer(node);
                    }
                }
            } finally {
                // The connection is only needed to read the topology; release it afterwards.
                redisClusterConnection.close();
            }
        }

        /**
         * Registers, wires and starts the listener container for one master node, unless a container
         * for that node already exists.
         */
        private void registerListenerContainer(RedisClusterNode node) {
            log.info("获取到redis的master节点为[{}]", node);
            String containerBeanName = "messageContainer" + node.hashCode();
            if (beanFactory.containsBean(containerBeanName)) {
                // Already registered (e.g. by an earlier refresh event). Skip this node only, so the
                // remaining masters are still processed.
                return;
            }

            // NOTE(review): this factory is not a Spring bean and afterPropertiesSet() is never
            // called on it — confirm that is acceptable for the Spring Data Redis version in use.
            JedisShardInfo jedisShardInfo = new JedisShardInfo(node.getHost(), node.getPort());
            jedisShardInfo.setPassword(password);
            JedisConnectionFactory factory = new JedisConnectionFactory(jedisShardInfo);

            BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
                    .genericBeanDefinition(RedisMessageListenerContainer.class);
            containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
            containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
            containerBeanDefinitionBuilder.setLazyInit(false);
            beanFactory.registerBeanDefinition(containerBeanName,
                    containerBeanDefinitionBuilder.getRawBeanDefinition());

            RedisMessageListenerContainer container = beanFactory.getBean(containerBeanName,
                    RedisMessageListenerContainer.class);
            container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:expired"));
            // The container is started explicitly, as in the original code.
            container.start();
        }
    }
    

    KeyExpiredEventMessageListener类

    import com.tvpartner.wechat.users.vo.AppStatusEnum;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;
    import java.nio.charset.StandardCharsets;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;
    /**
     * Business listener for Redis key-expiration events in cluster mode. When a key of the form
     * {@code <isp>:<iptvUserId>...} with isp CM, CTC or CU expires, the key is rewritten as OFFLINE
     * and the aggregated {@code appstatus:<isp>:<iptvUserId>} entry is updated.
     *
     * @author wangrh (2020/12/30)
     */
    @Slf4j
    @Component
    public class KeyExpiredEventMessageListener implements MessageListener  {
        @Autowired
        protected StringRedisTemplate stringRedisTemplate;
        @Value("${spring.redis.appStatusTTL}")
        private Long appStatusTTL;

        /**
         * Handles one expiration notification.
         *
         * @param message the notification; its body holds the name of the expired key
         * @param pattern the subscription pattern that matched the channel, may be {@code null}
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            log.debug("======接收监听===={}", message);
            String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
            // The message body is the expired key.
            String key = new String(message.getBody(), StandardCharsets.UTF_8);
            String[] split = key.split(":");
            if (split.length < 2) {
                // Every expired key of the database arrives here; keys without an "<isp>:<userId>"
                // prefix are not ours and must not fail with ArrayIndexOutOfBoundsException.
                return;
            }
            String isp = split[0];
            String iptvUserId = split[1];
            if (!("CM".equals(isp) || "CTC".equals(isp) || "CU".equals(isp))) {
                return;
            }
            if (log.isInfoEnabled()) {
                String patternText = pattern == null ? null : new String(pattern, StandardCharsets.UTF_8);
                log.info("redis key 过期:pattern={},channel={},key={}", patternText, channel, key);
            }

            // NOTE(review): the expired key is re-created with a TTL, so it will expire again and
            // trigger this listener again after appStatusTTL seconds — confirm this is intended.
            stringRedisTemplate.opsForValue().set(key, AppStatusEnum.OFFLINE.code.toString(), appStatusTTL, TimeUnit.SECONDS);

            String userKeyPattern = isp + ":" + iptvUserId + "*";
            String aggregateKey = "appstatus:" + isp + ":" + iptvUserId;
            // Only re-evaluate the aggregate when it is missing or currently IPTV_ONLINE (per the
            // original comment: 3 = has online users); when it is already 2 (no online users)
            // nothing is done.
            String aggregateStatus = stringRedisTemplate.opsForValue().get(aggregateKey);
            if (aggregateStatus != null
                    && !String.valueOf(AppStatusEnum.IPTV_ONLINE.code).equals(aggregateStatus)) {
                return;
            }

            // NOTE(review): KEYS scans the whole keyspace; consider SCAN if the key count is large.
            Set<String> userKeys = stringRedisTemplate.keys(userKeyPattern);
            if (userKeys == null || userKeys.isEmpty()) {
                return;
            }
            // Keep the aggregate online while any user is online; otherwise mark it offline.
            if (anyUserOnline(userKeys)) {
                stringRedisTemplate.opsForValue().set(aggregateKey, String.valueOf(AppStatusEnum.IPTV_ONLINE.code));
            } else {
                stringRedisTemplate.opsForValue().set(aggregateKey, String.valueOf(AppStatusEnum.IPTV_OFFLINE.code));
            }
        }

        /** Returns whether at least one of the given keys currently holds the ONLINE status. */
        private boolean anyUserOnline(Set<String> userKeys) {
            String online = String.valueOf(AppStatusEnum.ONLINE.code);
            for (String userKey : userKeys) {
                if (online.equals(stringRedisTemplate.opsForValue().get(userKey))) {
                    return true;
                }
            }
            return false;
        }
    }
    

    监听集群会出现类似广播的效果,导致消息被重复消费

    getset命令加锁

    一、redis的getset命令介绍:

    1、getset命令以原子方式将key设置为value,并返回key原来对应的value。如果key存在但是对应的value不是字符串,返回错误。

    2、getset命令返回之前的旧值,如果之前Key不存在将返回null。

    二、我们采用的是RedisTemplate操作redis,以下是封装的redisUtil中的getset方法内容:

    /**
     * Sets {@code key} to {@code value} through the value operations' {@code getAndSet} and returns
     * the value that was stored before.
     *
     * <p>NOTE(review): any exception is printed and swallowed, so a {@code null} result can mean
     * either "no previous value" or "the call failed" — callers using this as a lock should be
     * aware that the two cases are indistinguishable.
     *
     * @param key   the key to update
     * @param value the new value
     * @param <T>   the value type
     * @return the previous value, or {@code null} if none was returned or the call threw
     */
    public  <T> T getAndSet(final String key, T value) {
        try {
            ValueOperations<String, Object> valueOps = redisTemplate.opsForValue();
            return (T) valueOps.getAndSet(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    

    相关文章

      网友评论

          本文标题:Redis Cluster模式集群监听

          本文链接:https://www.haomeiwen.com/subject/tqwnoktx.html