redis过期监听

作者: szgl_lucifer | 来源:发表于2018-12-28 09:42 被阅读32次

    我的应用场景:因为业务需求,我们会每10分钟从kafka得到数据开始处理,这时就会存在一种情况,如果kafka数据没有传递过来,我们是不是要通过一种方式知道数据没传递通知我们。在这里我选用的模式是redis提供的观察者模式。

    原理:从kafka得到的数据保存到redis中,key的失效时间可以自定义配置(我定义15分钟失效),每次从kafka得到数据都去刷新redis,这样如果kafka每次都传递数据,redis就不会失效,如果不传递数据,redis就会失效,然后通过redis的监听器得到这个失效的redis再进行后续处理(我们这边是进行邮件报警)

    1:配置redis的失效监听,需要修改redis.conf配置文件

    增加:notify-keyspace-events "Ex"

    配置文件中找到notify-keyspace-events,修改成notify-keyspace-events "Ex"

    Ex 的解释如下:

    2:配置文件修改好后,重新启动redis,我的redis是用docker启动的。重新启动了容器。

    3:验证redis失效监听是否好用。

    进入redis容器:

    docker exec -it redis /bin/sh

    运行redis客户端:

    redis-cli

    运行监听命令:

    psubscribe __keyevent@0__:expired

    再启动一个redis-cli

    创建一个10秒后失效的reids:

    setex test 10 test

    10秒后,可以看到监听端口可以接收到失效的redis的key.

    4:java代码编写,pom.xml引入

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-data-redis</artifactId>

    </dependency>

    5:配置redis的config,配置了两种方式,第一种方式是支持@Cacheable创建redis.第二种方式是直接引用redis提供的StringRedisTemplate类来调用redis的set和get方法。

    第一种方式的配置文件写法:RedisCacheConfig.java

    @Configuration

    public class RedisCacheConfig{

    @Bean

        public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {

            return new RedisCacheManager(

                    RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory),

                    this.getRedisCacheConfigurationBase(), // 默认策略

                    this.getRedisCacheConfigurationMap() // 指定 key 策略

            );

    }

    private RedisSerializer<String> keySerializer() {

            return new StringRedisSerializer();

    }

    private RedisSerializer<Object> valueSerializer() {

            return new GenericJackson2JsonRedisSerializer();

    }

    private Map<String, RedisCacheConfiguration> getRedisCacheConfigurationMap() {

            Map<String, RedisCacheConfiguration> redisCacheConfigurationMap = new HashMap<>();

            redisCacheConfigurationMap.put("NoDataCache0", this.getRedisCacheConfigurationWithTtl(60));

            return redisCacheConfigurationMap;

        }

    private RedisCacheConfiguration getRedisCacheConfigurationBase() {

    RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()

                    //.entryTtl(this.timeToLive)  --定义到期时间

                    .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(keySerializer()))

                    .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(valueSerializer()));

                    //.disableCachingNullValues(); //保存的数据不能为null

            return config;

        }

    private RedisCacheConfiguration getRedisCacheConfigurationWithTtl(Integer seconds) {

            RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()

                    .entryTtl(Duration.ofSeconds(seconds))  //定义到期时间

                    .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(keySerializer()))

                    .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(valueSerializer()));

                    //.disableCachingNullValues(); //保存的数据不能为null

            return config;

        }

    }

    用RedisCacheManager的构造方法来实现,我定义了一个60秒失效的策略,redis的写法如下:

    @Cacheable(value = "NoDataCache0" key="'rc_alarmrule_2_'+#tenantId+'_'+#metricSpecId")

    public List<AlarmRuleRedisAllVO> getAlarmRuleAllRedis(long tenantId,long metricSpecId) {

    return null;

    }

    1)value = "NoDataCache0" 是我策略里定义的名称redisCacheConfigurationMap.put("NoDataCache0", this.getRedisCacheConfigurationWithTtl(60));

    这个可以定义多个。每个的名称和失效时间配置不一样。

    2)这种方式的配置,失效时间只能写在配置文件中或者写死,如果我想按照我表中定义一个失效时间实时的进行变化就做不到了。所以我又用了第二种配置方案。

    第二种方式的配置文件写法:RedisCacheConfig.java

    @Service

    public class RedisService {

        @Autowired

        private StringRedisTemplate redisTemplate;

        /**

        * 永久

        * @param key

        * @param value

        */

        public void set(String key, Object value) {

            redisTemplate.opsForValue().set(key, JsonUtil.convertObj2String(value));

        }

        /**

        * 将 key,value 存放到redis数据库中,设置过期时间单位是秒

        * @param key

        * @param value

        * @param timeout

        */

        public void setBySeconds(String key, Object value, long timeout) {

            redisTemplate.opsForValue().set(key, JsonUtil.convertObj2String(value), timeout, TimeUnit.SECONDS);

        }

        /**

        * 将 key,value 存放到redis数据库中,设置过期时间单位是分钟

        * @param key

        * @param value

        */

        public void setByMinutes(String key, Object value, long timeout) {

            redisTemplate.opsForValue().set(key, JsonUtil.convertObj2String(value), timeout, TimeUnit.MINUTES);

        }

        /**

        * 将 key,value 存放到redis数据库中,设置过期时间单位是小时

        * @param key

        * @param value

        */

        public void setByHours(String key, Object value, long timeout) {

            redisTemplate.opsForValue().set(key, JsonUtil.convertObj2String(value), timeout, TimeUnit.HOURS);

        }

        /**

        * 将 key,value 存放到redis数据库中,设置过期时间单位是天

        * @param key

        * @param value

        */

        public void setByDays(String key, Object value, long timeout) {

            redisTemplate.opsForValue().set(key, JsonUtil.convertObj2String(value), timeout, TimeUnit.DAYS);

        }

        /**

        * 删除 key 对应的 value

        * @param key

        */

        public void delete(String key) {

            redisTemplate.delete(key);

        }

        /**

        * 获取与 key 对应的对象

        * @param key

        * @param clazz 目标对象类型

        * @param <T>

        * @return

        */

        public <T> T get(String key, Class<T> clazz) {

            String s = get(key);

            if (s == null) {

                return null;

            }

            return JsonUtil.convertString2Obj(s, clazz);

        }

        /**

        * 获取与 key 对应的对象

        * @param key

        * @param clazz 目标对象类型

        * @param <T>

        * @return

        * @throws IOException

        * @throws JsonMappingException

        * @throws JsonParseException

        */

        public <T> List<T> getList(String key, Class<T> clazz) throws JsonParseException, JsonMappingException, IOException {

            String s = get(key);

            if (s == null) {

                return null;

            }

            return JsonUtil.toList(s,clazz);

        }

        /**

        * 获取 key 对应的字符串

        * @param key

        * @return

        */

        public String get(String key) {

            return redisTemplate.opsForValue().get(key);

        }

        /**

        * 查询 key 对应的过期时间

        * @param key

        * @return

        */

        public String getExpire(String key){

            Long timeout = redisTemplate.getExpire(key,TimeUnit.MILLISECONDS);

            System.out.println(timeout);

            if (timeout < 0)

                return "Has expired!";

            Long milliSecond = LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli() + timeout;

            return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.ofInstant(

                    Instant.ofEpochMilli(milliSecond),ZoneId.systemDefault()));

        }

        /**

        * 判断 key 是否在 redis 数据库中

        * @param key

        * @return

        */

        public boolean exists(final String key) {

            return redisTemplate.hasKey(key);

        }

    这种方式可以实时的改变redis的失效时间。

    6:配置redis失效的监听config  RedisListenerConfig.java

    @Configuration

    public class RedisListenerConfig {

        @Bean

        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

            RedisMessageListenerContainer container = new RedisMessageListenerContainer();

            container.setConnectionFactory(connectionFactory);

    //        container.addMessageListener(new RedisExpiredListener(), new PatternTopic("__keyevent@0__:expired"));

            return container;

        }

    }

    我没有配置__keyevent@0__:expired",对某个db进行监听,RedisMessageListenerContainer有个默认的配置是对所有的db进行监听。

    7:redis的监听类 RedisKeyExpirationListener.java

    @Component

    public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

        public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {

            super(listenerContainer);

        }

        /**

        * 针对redis数据失效事件,进行数据处理

        * @param message

        * @param pattern

        */

    @Override

        public void onMessage(Message message, byte[] pattern) {

            //message.toString()可以获取失效的key

            String expiredKey = message.toString();

            logger.debug("失效的redis是:"+expiredKey);

            String part = "NoDataCache0";

            //如果是NoDataCache0:开头的key,进行处理

            if(expiredKey.startsWith(part)){

                    //自己的业务逻辑

            }

    }

    1)注意:失效的redis只能得到key,是得不到value的,所以业务逻辑如果用到value里的值需要把值写到key中。

    相关文章

      网友评论

        本文标题:redis过期监听

        本文链接:https://www.haomeiwen.com/subject/ccqdlqtx.html