Redis实现事件监听
redis.conf配置文件中修改配置
notify-keyspace-events "Ex"
配置详解
字符 发送通知
K 键空间通知,所有通知以 `__keyspace@<db>__:` 为前缀,针对 key
E 键事件通知,所有通知以 `__keyevent@<db>__:` 为前缀,针对 event
g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
x 过期事件通知:每当有过期键被删除时发送(即上面配置 "Ex" 中的 x)
A 参数 g$lshzxe 的别名,相当于 All
单机版Redis
代码示例
配置类
/**
 * Spring configuration for the single-instance Redis setup: exposes a message
 * listener container and a {@link KeyExpiredListener} attached to it.
 */
@Configuration
public class RedisConfiguration {

    /** Connection factory injected by Spring; the listener container subscribes through it. */
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * Creates the listener container backed by the injected connection factory.
     *
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        return container;
    }

    /**
     * Creates the key-expiration listener bound to the container bean above.
     *
     * @return the listener
     */
    @Bean
    public KeyExpiredListener keyExpiredListener() {
        RedisMessageListenerContainer container = redisMessageListenerContainer();
        return new KeyExpiredListener(container);
    }
}
监听类
// NOTE (original author): implementing MessageListener directly worked locally but not on
// the server, cause unknown; this class extends KeyExpirationEventMessageListener instead.
/**
 * Listener for Redis key-expiration events. Each notification is decoded as UTF-8
 * and logged; business handling is meant to be added in {@link #onMessage}.
 */
public class KeyExpiredListener extends KeyExpirationEventMessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(KeyExpiredListener.class);

    @Autowired
    protected RedisTemplate redisTemplate;

    @Value("${spring.redis.appStatusTTL}")
    private Long appStatusTTL;

    /**
     * @param listenerContainer container this listener is attached to
     */
    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    /**
     * Logs the expired key together with the channel and subscription pattern.
     *
     * @param message notification whose body is the expired key and whose channel is the event channel
     * @param pattern pattern that matched the channel; may be {@code null}
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        // The message body carries the name of the expired key.
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        // Decode the pattern with the same charset as channel/body, and tolerate a missing
        // pattern instead of failing with a NullPointerException.
        String patternText = pattern == null ? null : new String(pattern, StandardCharsets.UTF_8);
        LOGGER.info("redis key 过期:pattern={},channel={},key={}", patternText, channel, key);
        // Business handling goes here.
    }
}
redis-cli客户端示例
$ redis-cli config set notify-keyspace-events KEA
$ redis-cli --csv psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","__key*__:*",1
Redis模糊查询key
*:通配任意多个字符
?:通配单个字符
[]:通配括号内的某一个字符
主从复制版集群Redis
网上目前未发现能够直接监听 cluster 集群的方法,通用做法是为每一个节点配置单独的监听器;监听全部主节点即可
代码实现
配置类
/**
 * Spring configuration that listens for key-expiration events on three Redis nodes
 * individually: one listener container and one {@link KeyExpiredListener} bean per node.
 */
@Configuration
public class RedisConfiguration {

    // Injected but not referenced by any method in this class.
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    /** Error handler installed on every per-node container. */
    @Autowired
    private RedisListenerErrorHandle redisListenerErrorHandle;

    @Value("${spring.redis.host1}")
    private String host1;

    @Value("${spring.redis.host2}")
    private String host2;

    @Value("${spring.redis.host3}")
    private String host3;

    @Value("${spring.redis.port1}")
    private int port1;

    @Value("${spring.redis.port2}")
    private int port2;

    @Value("${spring.redis.port3}")
    private int port3;

    @Value("${spring.redis.password}")
    private String password;

    /**
     * Pool settings shared by the per-node connection factories.
     *
     * @return pool configuration with max idle 100 and max wait 1000 ms
     */
    @Bean
    JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(100);
        jedisPoolConfig.setMaxWaitMillis(1000);
        return jedisPoolConfig;
    }

    // Per the original author, key-expiration listening is not supported against a Redis
    // cluster as a whole, so a separate connection is opened to each node and each node
    // is listened to on its own.

    /** @return listener container connected to node 1 */
    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer1() {
        return buildListenerContainer(host1, port1);
    }

    /** @return listener container connected to node 2 */
    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer2() {
        return buildListenerContainer(host2, port2);
    }

    /** @return listener container connected to node 3 */
    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer3() {
        return buildListenerContainer(host3, port3);
    }

    /** @return Spring-managed expiration listener attached to the node 1 container */
    @Bean
    KeyExpiredListener redisKeyExpirationListener1() {
        return new KeyExpiredListener(redisMessageListenerContainer1());
    }

    /** @return Spring-managed expiration listener attached to the node 2 container */
    @Bean
    KeyExpiredListener redisKeyExpirationListener2() {
        return new KeyExpiredListener(redisMessageListenerContainer2());
    }

    /** @return Spring-managed expiration listener attached to the node 3 container */
    @Bean
    KeyExpiredListener redisKeyExpirationListener3() {
        return new KeyExpiredListener(redisMessageListenerContainer3());
    }

    /**
     * Builds a listener container with its own Jedis connection to a single node.
     *
     * <p>No listener is registered here on purpose. The KeyExpiredListener beans register
     * themselves with their container when Spring initializes them. Registering an extra
     * {@code new KeyExpiredListener(container)} by hand would add a second listener that
     * Spring never injects (its {@code redisTemplate} stays null) and would make every
     * expiration event be handled twice.
     *
     * @param host node host name
     * @param port node port
     * @return container connected to the given node, with the shared error handler installed
     */
    private RedisMessageListenerContainer buildListenerContainer(String host, int port) {
        JedisConnectionFactory nodeConnectionFactory = new JedisConnectionFactory();
        nodeConnectionFactory.setHostName(host);
        nodeConnectionFactory.setPort(port);
        nodeConnectionFactory.setPoolConfig(jedisPoolConfig());
        nodeConnectionFactory.setPassword(password);
        // The factory is created by hand rather than as a bean, so it is initialized explicitly.
        nodeConnectionFactory.afterPropertiesSet();

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(nodeConnectionFactory);
        container.setErrorHandler(redisListenerErrorHandle);
        return container;
    }
}
监听器
// NOTE (original author): at runtime this code reported redisTemplate as null even though
// the business logic still ran; the cause was not found at the time of writing.
// The field is only injected on instances that Spring manages; an instance created with
// `new` and handed straight to a container keeps redisTemplate == null.
/**
 * Listener for Redis key-expiration events. Each notification is decoded as UTF-8
 * and logged; business handling is meant to be added in {@link #onMessage}.
 */
public class KeyExpiredListener extends KeyExpirationEventMessageListener {

    private static final Logger logger = LoggerFactory.getLogger(KeyExpiredListener.class);

    @Autowired
    protected RedisTemplate redisTemplate;

    /**
     * @param listenerContainer container this listener is attached to
     */
    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    /**
     * Logs the expired key together with the channel and subscription pattern.
     *
     * @param message notification whose body is the expired key and whose channel is the event channel
     * @param pattern pattern that matched the channel; may be {@code null}
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        // The message body carries the name of the expired key.
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        // Decode the pattern with the same charset as channel/body, and tolerate a missing
        // pattern instead of failing with a NullPointerException.
        String patternText = pattern == null ? null : new String(pattern, StandardCharsets.UTF_8);
        logger.warn("redis key 过期:pattern={},channel={},key={}", patternText, channel, key);
        // TODO: business handling
    }
}
错误处理类
/**
 * Error handler installed on the Redis message listener containers. It logs the
 * failure together with its cause so that listener errors are not silently lost.
 */
@Component
public class RedisListenerErrorHandle implements ErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(RedisListenerErrorHandle.class);

    /**
     * Logs the error raised while a listener was processing a message.
     *
     * @param throwable the error passed in by the container
     */
    @Override
    public void handleError(Throwable throwable) {
        // Pass the throwable to the logger so the stack trace is kept; the previous
        // message ("正常监听") described the failure as normal and dropped the cause.
        logger.error("Redis 消息监听器执行失败", throwable);
    }
}
自定义一个 ErrorHandler 并注入到 RedisMessageListenerContainer 中。Spring Data Redis 源码中默认没有设置此处理类,不注入时,监听器每次抛出异常都会打印日志(下方源码中为 warn 级别):Execution of message listener failed, and no ErrorHandler has been set.
源码解析
RedisMessageListenerContainer类中有这样一段代码
/**
 * Invoke the registered ErrorHandler, if any. Otherwise log the failure
 * (the code below logs at warn level, and only when warn logging is enabled).
 *
 * @param ex the uncaught error that arose during message processing.
 * @see #setErrorHandler
 */
protected void invokeErrorHandler(Throwable ex) {
// When a handler is configured it receives the error and nothing is logged here.
if (this.errorHandler != null) {
this.errorHandler.handleError(ex);
} else if (logger.isWarnEnabled()) {
logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
}
}
/**
 * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default
 * there will be <b>no</b> ErrorHandler so that logging is the only result.
 *
 * @param errorHandler the handler to store on this container
 */
// A RedisMessageListenerContainer that you construct yourself needs an ErrorHandler set manually.
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
网友评论