Redis实现事件监听
redis.conf配置文件中修改配置
notify-keyspace-events "Ex"
配置详解
字符 发送通知
K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀,针对Key
E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀,针对event
g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
x 过期事件:每当有过期键被删除时发送
A 参数 g$lshzxe 的别名,相当于是All
单机版Redis监听
代码示例
配置类
/**
 * Spring configuration that wires a message listener container and a
 * key-expiration listener for a standalone Redis instance.
 */
@Configuration
public class RedisConfiguration {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * Creates the listener container bound to the injected connection factory.
     *
     * @return the container that message listeners are attached to
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        return container;
    }

    /**
     * Creates the key-expiration listener on top of the container bean method above.
     *
     * @return the listener handed the listener container
     */
    @Bean
    public KeyExpiredListener keyExpiredListener() {
        RedisMessageListenerContainer container = this.redisMessageListenerContainer();
        return new KeyExpiredListener(container);
    }
}
监听类
/**
 * Logs every Redis key-expiration message delivered to it.
 *
 * <p>Note from the original author: implementing {@code MessageListener} directly worked
 * locally but not on the server (cause unknown), which is why this class extends
 * {@code KeyExpirationEventMessageListener} instead.
 */
public class KeyExpiredListener extends KeyExpirationEventMessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(KeyExpiredListener.class);

    @Autowired
    protected RedisTemplate redisTemplate;

    @Value("${spring.redis.appStatusTTL}")
    private Long appStatusTTL;

    /**
     * @param listenerContainer container passed on to the superclass constructor
     */
    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    /**
     * Decodes the channel and body (the expired key) as UTF-8 and logs them.
     *
     * @param message the received message; its body is treated as the expired key
     * @param pattern the subscription pattern that matched; logged as {@code null} when absent
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        // The message body is the name of the key that expired.
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        // Decode with an explicit charset (not the platform default) and tolerate a missing pattern.
        String patternText = pattern == null ? null : new String(pattern, StandardCharsets.UTF_8);
        LOGGER.info("redis key 过期:pattern={},channel={},key={}", patternText, channel, key);
        // Business handling goes here.
    }
}
redis-cli客户端示例
$ redis-cli config set notify-keyspace-events KEA
$ redis-cli --csv psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","__key*__:*",1
Redis模糊查询key
*:通配任意多个字符
?:通配单个字符
[]:通配括号内的某一个字符
主从复制版集群Redis监听
网上目前未发现能够直接监听cluster集群的方法,通用方法是为每一个节点配置单独的监听器;监听全部主节点即可
代码实现
配置类
/**
 * Spring configuration that listens for key-expiration events on three Redis nodes.
 *
 * <p>Per the original author's note, Redis Cluster cannot be listened to through a single
 * connection, so one connection and one listener container is built per node.
 *
 * <p>Each container is paired with exactly one {@code KeyExpiredListener} bean. The containers
 * no longer add an extra {@code new KeyExpiredListener(container)} by hand: such an instance is
 * not managed by Spring (its {@code @Autowired} fields stay null) and it duplicated the
 * handling already done by the listener beans, which register themselves with their container
 * on initialization (behaviour of {@code KeyExpirationEventMessageListener} in Spring Data
 * Redis; verify against the version in use).
 */
@Configuration
public class RedisConfiguration {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private RedisListenerErrorHandle redisListenerErrorHandle;

    @Value("${spring.redis.host1}")
    private String host1;

    @Value("${spring.redis.host2}")
    private String host2;

    @Value("${spring.redis.host3}")
    private String host3;

    @Value("${spring.redis.port1}")
    private int port1;

    @Value("${spring.redis.port2}")
    private int port2;

    @Value("${spring.redis.port3}")
    private int port3;

    @Value("${spring.redis.password}")
    private String password;

    /**
     * Pool settings shared by the per-node connection factories.
     *
     * @return pool configuration with max idle 100 and max wait 1000 ms
     */
    @Bean
    JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(100);
        jedisPoolConfig.setMaxWaitMillis(1000);
        return jedisPoolConfig;
    }

    /** Listener container for the node at {@code host1:port1}. */
    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer1() {
        return buildContainer(host1, port1);
    }

    /** Listener container for the node at {@code host2:port2}. */
    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer2() {
        return buildContainer(host2, port2);
    }

    /** Listener container for the node at {@code host3:port3}. */
    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer3() {
        return buildContainer(host3, port3);
    }

    /** Key-expiration listener attached to the first node's container. */
    @Bean
    KeyExpiredListener redisKeyExpirationListener1() {
        return new KeyExpiredListener(redisMessageListenerContainer1());
    }

    /** Key-expiration listener attached to the second node's container. */
    @Bean
    KeyExpiredListener redisKeyExpirationListener2() {
        return new KeyExpiredListener(redisMessageListenerContainer2());
    }

    /** Key-expiration listener attached to the third node's container. */
    @Bean
    KeyExpiredListener redisKeyExpirationListener3() {
        return new KeyExpiredListener(redisMessageListenerContainer3());
    }

    /**
     * Builds a listener container with its own Jedis connection factory for one node.
     *
     * @param host host name of the Redis node
     * @param port port of the Redis node
     * @return a container using the shared pool config, password and error handler
     */
    private RedisMessageListenerContainer buildContainer(String host, int port) {
        JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
        connectionFactory.setHostName(host);
        connectionFactory.setPort(port);
        connectionFactory.setPoolConfig(jedisPoolConfig());
        connectionFactory.setPassword(password);
        // The factory is created by hand, so its initialization callback must be invoked by hand too.
        connectionFactory.afterPropertiesSet();

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setErrorHandler(redisListenerErrorHandle);
        return container;
    }
}
监听器
/**
 * Logs every Redis key-expiration message delivered to it (per-node listening variant).
 *
 * <p>Note from the original author: at runtime an error reported {@code redisTemplate} as
 * null while the business logic still ran; the cause was not found.
 * NOTE(review): a likely cause is an instance created directly with {@code new} outside the
 * Spring container, which never receives {@code @Autowired} injection - confirm against the
 * configuration that constructs this listener.
 */
public class KeyExpiredListener extends KeyExpirationEventMessageListener {

    private static final Logger logger = LoggerFactory.getLogger(KeyExpiredListener.class);

    @Autowired
    protected RedisTemplate redisTemplate;

    /**
     * @param listenerContainer container passed on to the superclass constructor
     */
    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    /**
     * Decodes the channel and body (the expired key) as UTF-8 and logs them at WARN level.
     *
     * @param message the received message; its body is treated as the expired key
     * @param pattern the subscription pattern that matched; logged as {@code null} when absent
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        // The message body is the name of the key that expired.
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        // Decode with an explicit charset (not the platform default) and tolerate a missing pattern.
        String patternText = pattern == null ? null : new String(pattern, StandardCharsets.UTF_8);
        logger.warn("redis key 过期:pattern={},channel={},key={}", patternText, channel, key);
        // TODO: business handling
    }
}
错误处理类
/**
 * Error handler meant to be set on a {@code RedisMessageListenerContainer} so that listener
 * failures are reported through this class's logger.
 */
@Component
public class RedisListenerErrorHandle implements ErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(RedisListenerErrorHandle.class);

    /**
     * Logs the failure together with its stack trace.
     *
     * @param throwable the error handed to this handler
     */
    @Override
    public void handleError(Throwable throwable) {
        // Pass the throwable along so the underlying failure is not silently discarded.
        logger.error("正常监听", throwable);
    }
}
自定义一个ErrorHandler并注入到RedisMessageListenerContainer中;Spring Data Redis源码中默认没有设置此处理类,不注入的话,监听器每次执行抛出异常时都会打印日志:Execution of message listener failed, and no ErrorHandler has been set.
源码解析
RedisMessageListenerContainer类中有这样一段代码
/**
* Invoke the registered ErrorHandler, if any. Otherwise log the failure together with the
* exception. Note: although this sentence originally said "error level", the code below logs
* at WARN level, and only when WARN logging is enabled.
*
* @param ex the uncaught error that arose during message processing.
* @see #setErrorHandler
*/
protected void invokeErrorHandler(Throwable ex) {
if (this.errorHandler != null) {
// A handler was configured: delegate the failure to it.
this.errorHandler.handleError(ex);
} else if (logger.isWarnEnabled()) {
// No handler configured: fall back to logging the failure.
logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
}
}
/**
* Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default
* there will be <b>no</b> ErrorHandler so that logging is the only result.
*
* @param errorHandler the handler stored on this container
*/
// A RedisMessageListenerContainer that you construct yourself needs an ErrorHandler set manually.
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
Cluster集群监听
RedisAutoConfiguration类
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import redis.clients.jedis.Jedis;
/**
* @author wangrh
* @title:
* @description: 描述
* @date: 2020/12/30
*/
@Configuration
@ConditionalOnClass({ JedisConnection.class, RedisOperations.class, Jedis.class, MessageListener.class })
@AutoConfigureAfter({ JacksonAutoConfiguration.class,RedisAutoConfiguration.class })
public class RedisAutoConfiguration {
@Configuration
@ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()")
public static class RedisStandAloneAutoConfiguration {
@Bean
public RedisMessageListenerContainer customizeRedisListenerContainer(
RedisConnectionFactory redisConnectionFactory, @Qualifier("keyExpiredEventMessageListener") MessageListener messageListener) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(messageListener,new PatternTopic("__keyevent@0__:expired"));
return redisMessageListenerContainer;
}
}
@Configuration
@ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()")
public static class RedisClusterAutoConfiguration {
@Bean
public RedisMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory,
RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerFactory beans = new RedisMessageListenerFactory();
beans.setBeanFactory(beanFactory);
beans.setRedisConnectionFactory(redisConnectionFactory);
return beans;
}
}
}
RedisMessageListenerFactory类
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import redis.clients.jedis.JedisShardInfo;
/**
 * On every context-refreshed event, looks up the Redis cluster's master nodes and registers
 * one started {@link RedisMessageListenerContainer} bean per master, each subscribed to
 * expired-key events of database 0 with the shared {@code keyExpiredEventMessageListener}.
 *
 * @author wangrh
 * @date: 2020/12/30
 */
@Slf4j
public class RedisMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> {

    @Value("${spring.redis.password}")
    private String password;

    private DefaultListableBeanFactory beanFactory;

    private RedisConnectionFactory redisConnectionFactory;

    @Qualifier("keyExpiredEventMessageListener")
    @Autowired
    private MessageListener messageListener;

    /**
     * @param beanFactory must be a {@link DefaultListableBeanFactory}; it is cast to that type
     * @throws BeansException declared by the {@link BeanFactoryAware} contract
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
    }

    /**
     * @param redisConnectionFactory factory used to obtain the cluster connection
     */
    public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    /**
     * Registers a listener container for every master node that does not have one yet.
     *
     * @param contextRefreshedEvent the refresh event (not inspected)
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
        if (redisClusterConnection == null) {
            return;
        }
        try {
            for (RedisClusterNode node : redisClusterConnection.clusterGetNodes()) {
                if (node.isMaster()) {
                    registerContainerFor(node);
                }
            }
        } finally {
            // The connection is only needed for node discovery; release it afterwards.
            redisClusterConnection.close();
        }
    }

    /**
     * Registers and starts a listener container bean for one master node, unless a bean with
     * the node's container name already exists (in which case only this node is skipped, so
     * the remaining masters are still processed).
     */
    private void registerContainerFor(RedisClusterNode node) {
        log.info("获取到redis的master节点为[{}]", node.toString());
        String containerBeanName = "messageContainer" + node.hashCode();
        if (beanFactory.containsBean(containerBeanName)) {
            return;
        }

        JedisShardInfo jedisShardInfo = new JedisShardInfo(node.getHost(), node.getPort());
        jedisShardInfo.setPassword(password);
        JedisConnectionFactory factory = new JedisConnectionFactory(jedisShardInfo);

        BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
                .genericBeanDefinition(RedisMessageListenerContainer.class);
        containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
        containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
        containerBeanDefinitionBuilder.setLazyInit(false);
        beanFactory.registerBeanDefinition(containerBeanName,
                containerBeanDefinitionBuilder.getRawBeanDefinition());

        RedisMessageListenerContainer container = beanFactory.getBean(containerBeanName,
                RedisMessageListenerContainer.class);
        container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:expired"));
        // The bean is registered while handling a refresh event, so it is started explicitly here.
        container.start();
    }
}
KeyExpiredEventMessageListener类
import com.tvpartner.wechat.users.vo.AppStatusEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * Business listener for Redis key-expiration messages (cluster listening).
 *
 * <p>For expired keys of the form {@code <isp>:<iptvUserId>[:...]} whose ISP is CM, CTC or CU,
 * the key is written back as OFFLINE with the configured TTL and the aggregate
 * {@code appstatus:<isp>:<iptvUserId>} key is recomputed. All other keys are ignored.
 *
 * @author wangrh
 * @date: 2020/12/30
 */
@Slf4j
@Component
public class KeyExpiredEventMessageListener implements MessageListener {

    @Autowired
    protected StringRedisTemplate stringRedisTemplate;

    @Value("${spring.redis.appStatusTTL}")
    private Long appStatusTTL;

    /**
     * Handles one expiration message.
     *
     * @param message the received message; its body is treated as the expired key
     * @param pattern the subscription pattern that matched; logged as {@code null} when absent
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.debug("======接收监听===={}", message);
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        // The message body is the name of the key that expired.
        String key = new String(message.getBody(), StandardCharsets.UTF_8);

        String[] parts = key.split(":");
        if (parts.length < 2) {
            // Not an "<isp>:<iptvUserId>" key; nothing to do (previously this threw on parts[1]).
            return;
        }
        String isp = parts[0];
        String iptvUserId = parts[1];
        if (!("CM".equals(isp) || "CTC".equals(isp) || "CU".equals(isp))) {
            return;
        }

        if (log.isInfoEnabled()) {
            String patternText = pattern == null ? null : new String(pattern, StandardCharsets.UTF_8);
            log.info("redis key 过期:pattern={},channel={},key={}", patternText, channel, key);
        }

        // Re-create the expired key as OFFLINE, again with a TTL.
        stringRedisTemplate.opsForValue().set(key, AppStatusEnum.OFFLINE.code.toString(), appStatusTTL,
                TimeUnit.SECONDS);

        String userKeyPattern = isp + ":" + iptvUserId + "*";
        String aggregateKey = "appstatus:" + isp + ":" + iptvUserId;

        // Recompute the aggregate only when it is missing or currently IPTV_ONLINE (per the
        // original note: "has online users"); when it is already IPTV_OFFLINE, leave it alone.
        String aggregateStatus = stringRedisTemplate.opsForValue().get(aggregateKey);
        if (aggregateStatus != null && !String.valueOf(AppStatusEnum.IPTV_ONLINE.code).equals(aggregateStatus)) {
            return;
        }

        // NOTE(review): keys() issues a pattern scan over the keyspace; presumably acceptable
        // for this data volume - confirm. The trailing "*" also matches longer user ids that
        // share this prefix - verify against the key schema.
        Set<String> userKeys = stringRedisTemplate.keys(userKeyPattern);
        if (userKeys == null || userKeys.isEmpty()) {
            return;
        }

        String newAggregate = hasOnlineUser(userKeys)
                ? String.valueOf(AppStatusEnum.IPTV_ONLINE.code)
                : String.valueOf(AppStatusEnum.IPTV_OFFLINE.code);
        stringRedisTemplate.opsForValue().set(aggregateKey, newAggregate);
    }

    /** Returns true as soon as one of the given keys holds the ONLINE status. */
    private boolean hasOnlineUser(Set<String> userKeys) {
        String online = String.valueOf(AppStatusEnum.ONLINE.code);
        for (String userKey : userKeys) {
            if (online.equals(stringRedisTemplate.opsForValue().get(userKey))) {
                return true;
            }
        }
        return false;
    }
}
监听集群会出现类似广播的效果,导致消息被重复消费
getset命令加锁
一、redis的getset命令介绍:
1、getset命令自动将key对应到value并且返回原来key对应的value。如果key存在但是对应的value不是字符串,返回错误。
2、getset命令返回之前的旧值,如果之前Key不存在将返回null。
二、我们采用的是RedisTemplate操作redis,以下是封装的redisUtil中的getset方法内容:
/**
 * Sets {@code key} to {@code value} through the template's value operations and returns
 * whatever {@code getAndSet} reported as the previous value.
 *
 * @param key   the Redis key
 * @param value the new value to store
 * @param <T>   type the previous value is cast to (unchecked)
 * @return the previous value, or {@code null} if the call returned null or threw an exception
 */
public <T> T getAndSet(final String key, T value) {
    try {
        ValueOperations<String, Object> valueOps = redisTemplate.opsForValue();
        Object previous = valueOps.getAndSet(key, value);
        return (T) previous;
    } catch (Exception e) {
        // Failures are printed and reported to the caller as "no previous value".
        e.printStackTrace();
        return null;
    }
}
网友评论