Redis

作者: 匪石1941 | 来源:发表于2020-12-10 22:02 被阅读0次

Redis实现事件监听

redis.conf配置文件中修改配置

notify-keyspace-events "Ex"

配置详解

字符 发送通知
K 键空间通知,所有通知以 keyspace@ 为前缀,针对Key
E 键事件通知,所有通知以 keyevent@ 为前缀,针对event
g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
字符串命令的通知 l 列表命令的通知 s 集合命令的通知 h 哈希命令的通知 z 有序集合命令的通知 x 过期事件:每当有过期键被删除时发送 e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送 A 参数 glshzxe 的别名,相当于是All

单机版Redis

代码示例

配置类

@Configuration
public class RedisConfiguration {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public KeyExpiredListener keyExpiredListener() {
        return new KeyExpiredListener(this.redisMessageListenerContainer());
    }

}

监听类

//继承MessageListener本地跑的通,在服务器上跑不通,原因不明
public class KeyExpiredListener extends KeyExpirationEventMessageListener {
    @Autowired
    protected RedisTemplate redisTemplate;
    @Value("${spring.redis.appStatusTTL}")
    private Long appStatusTTL;

    private static final Logger LOGGER = LoggerFactory.getLogger(KeyExpiredListener.class);

    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        //过期的key
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        LOGGER.info("redis key 过期:pattern={},channel={},key={}", new String(pattern), channel, key);
        //业务处理
    }
}

redis-cli客户端示例

$ redis-cli config set notify-keyspace-events KEA
$ redis-cli --csv psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","__key*__:*",1

Redis模糊查询key

*:通配任意多个字符

?:通配单个字符

[]:通配括号内的某一个字符

主从复制版集群Redis

网上目前未发现能够直接监听cluster集群的方法,通用方法为为每一个节点配置单独的监听器;监听全部主节点即可

代码实现

配置类

@Configuration
public class RedisConfiguration {
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;
    @Autowired
    private RedisListenerErrorHandle redisListenerErrorHandle;

    @Value("${spring.redis.host1}")
    private String host1;

    @Value("${spring.redis.host2}")
    private String host2;

    @Value("${spring.redis.host3}")
    private String host3;

    @Value("${spring.redis.port1}")
    private int port1;

    @Value("${spring.redis.port2}")
    private int port2;

    @Value("${spring.redis.port3}")
    private int port3;

    @Value("${spring.redis.password}")
    private String password;


    @Bean
    JedisPoolConfig jedisPoolConfig(){
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(100);
        jedisPoolConfig.setMaxWaitMillis(1000);
        return jedisPoolConfig;
    }

    // redis-cluster不支持key过期监听,建立多个连接,对每个redis节点进行监听
    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer1() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setHostName(host1);
        jedisConnectionFactory.setPort(port1);
        jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
        jedisConnectionFactory.setPassword(password);
        jedisConnectionFactory.afterPropertiesSet();
        container.setConnectionFactory(jedisConnectionFactory);

        container.addMessageListener(new KeyExpiredListener(container),new PatternTopic("__keyevent@0__:expired"));
        container.setErrorHandler(redisListenerErrorHandle);
        return container;
    }

    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer2() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setHostName(host2);
        jedisConnectionFactory.setPort(port2);
        jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
        jedisConnectionFactory.setPassword(password);
        jedisConnectionFactory.afterPropertiesSet();
        container.setConnectionFactory(jedisConnectionFactory);

        container.addMessageListener(new KeyExpiredListener(container),new PatternTopic("__keyevent@0__:expired"));
        container.setErrorHandler(redisListenerErrorHandle);

        return container;
    }

    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer3() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setHostName(host3);
        jedisConnectionFactory.setPort(port3);
        jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
        jedisConnectionFactory.setPassword(password);
        jedisConnectionFactory.afterPropertiesSet();
        container.setConnectionFactory(jedisConnectionFactory);

        container.addMessageListener(new KeyExpiredListener(container),new PatternTopic("__keyevent@0__:expired"));
        container.setErrorHandler(redisListenerErrorHandle);

        return container;
    }

    @Bean
    KeyExpiredListener redisKeyExpirationListener1() {
        return new KeyExpiredListener(redisMessageListenerContainer1());
    }

    @Bean
    KeyExpiredListener redisKeyExpirationListener2() {
        return new KeyExpiredListener(redisMessageListenerContainer2());
    }

    @Bean
    KeyExpiredListener redisKeyExpirationListener3() {
        return new KeyExpiredListener(redisMessageListenerContainer3());
    }
}

监听器

//此代码运行时报错redisTemplate为null,但是代码完全正常执行业务。未找到原因
public class KeyExpiredListener extends KeyExpirationEventMessageListener {
    @Autowired
    protected RedisTemplate redisTemplate;

    private static final Logger logger = LoggerFactory.getLogger(KeyExpiredListener.class);

    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        //过期的key
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.warn("redis key 过期:pattern={},channel={},key={}", new String(pattern), channel, key);
       //TODO
    }
}

错误处理类

@Component
public class RedisListenerErrorHandle implements ErrorHandler {
    private static final Logger logger = LoggerFactory.getLogger(RedisListenerErrorHandle.class);

    @Override
    public void handleError(Throwable throwable) {
        logger.error("正常监听");
    }
}

自定义一个ErrorHandle并注入到RedisMessageListenerContainer中,redia源码中默认没有此处理类,不注入会导致每次监听时打印error级别日志:Execution of message listener failed, and no ErrorHandler has been set.

源码解析

RedisMessageListenerContainer类中有这样一段代码

    /**
     * Invoke the registered ErrorHandler, if any. Log at error level otherwise.
     *
     * @param ex the uncaught error that arose during message processing.
     * @see #setErrorHandler
     */
    protected void invokeErrorHandler(Throwable ex) {
        if (this.errorHandler != null) {
            this.errorHandler.handleError(ex);
        } else if (logger.isWarnEnabled()) {
            logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
        }
    }
    /**
     * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default
     * there will be <b>no</b> ErrorHandler so that error-level logging is the only result.
     */
        //自己定义的RedisMessageListenerContainer需要手动设置一个ErrorHandle
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

相关文章

网友评论

      本文标题:Redis

      本文链接:https://www.haomeiwen.com/subject/bnxdgktx.html