Notes
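Redis pub/sub works with Redis Cluster, but key expiry notifications do not work the same way there: keyspace events are node-specific and are not broadcast across the cluster, so a subscriber only sees keys that expire on the node it is connected to, and receiving all of them means subscribing on every node. I have not found any material on a ready-made cluster-wide setup; if you have implemented key expiry notifications on a Redis cluster, please let me know in the comments.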
Key expiry notifications are off by default. Before using them, connect with the redis-cli client and enable the feature manually.
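The setting involved is notify-keyspace-events, and the value used later in this post is Ex. As a rough sketch of what those letters mean (E selects the `__keyevent@<db>__` channels, x selects expired events, and A is an alias that includes x), the hypothetical helper below checks whether a given flag string would deliver expired events on the keyevent channels. The class and method names are made up for illustration and are not part of Redis or Spring.

```java
/** Sketch: checks whether a notify-keyspace-events value delivers expired keyevent notifications. */
final class KeyspaceFlags {

    /** Returns true if the flags enable __keyevent@<db>__:expired notifications. */
    static boolean deliversExpiredKeyevents(String flags) {
        // E: publish events on the __keyevent@<db>__ channels
        boolean keyevent = flags.indexOf('E') >= 0;
        // x: expired events; A: alias for a group of event classes that includes x
        boolean expired = flags.indexOf('x') >= 0 || flags.indexOf('A') >= 0;
        return keyevent && expired;
    }

    public static void main(String[] args) {
        for (String flags : new String[] {"Ex", "KEA", "Kx", ""}) {
            System.out.println("\"" + flags + "\" -> " + deliversExpiredKeyevents(flags));
        }
    }
}
```

With Kx the event would only be published on the `__keyspace@<db>__` channels, so a subscriber to the keyevent pattern used in this post would see nothing.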
This example is based on spring-data-redis. The Spring-data-redis and jedis dependencies from the pom file are shown below.
<!-- redis dependency -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
<version>1.13.6.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-redis -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.8.14.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
Using Redis key expiry notifications from the command line
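- Log in with the redis-cli client (open two client windows so that the expiry notification is easy to watch)
Key expiry notifications have to be enabled manually before they can be used: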
config set notify-keyspace-events Ex
- Start listening for key expiry events
> PSUBSCRIBE __keyevent@*__:expired
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1
- Open another client, add a key named test, and set it to expire after 5 seconds
> setex test 5 test
OK
- Return to the listening window and check the key expiry message
> PSUBSCRIBE __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "test"
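The pmessage reply above carries the subscribed pattern, the actual channel, and the name of the expired key. The channel name itself encodes the database index and the event type. A minimal sketch of pulling those two values out of a channel name such as `__keyevent@0__:expired` might look like this; `KeyeventChannel` is a hypothetical helper, not a Redis or Spring class.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: splits a keyevent channel name into its database index and event name. */
final class KeyeventChannel {

    // Channel layout: __keyevent@<db>__:<event>
    private static final Pattern CHANNEL = Pattern.compile("__keyevent@(\\d+)__:(.+)");

    final int database;
    final String event;

    private KeyeventChannel(int database, String event) {
        this.database = database;
        this.event = event;
    }

    /** Parses the channel name or throws if it is not a keyevent channel. */
    static KeyeventChannel parse(String channel) {
        Matcher m = CHANNEL.matcher(channel);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a keyevent channel: " + channel);
        }
        return new KeyeventChannel(Integer.parseInt(m.group(1)), m.group(2));
    }

    public static void main(String[] args) {
        KeyeventChannel c = parse("__keyevent@0__:expired");
        System.out.println("db=" + c.database + ", event=" + c.event); // db=0, event=expired
    }
}
```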
Implementing key expiry notifications and message pub/sub with Spring-data-redis
- Create the interface that handles received messages
package com.erxiao.service;

import java.io.Serializable;
import java.util.Map;

/**
 * Created by erxiao on 2018/10/8.
 * Handler interface for pub/sub messages.
 */
public interface MessageDelegate {
    void handleMessage(String message);
    void handleMessage(Map message);
    void handleMessage(byte[] message);
    void handleMessage(Serializable message);
    void handleMessage(Serializable message, String channel);
}
- Implement the message-handling interface
package com.erxiao.service.impl;

import com.erxiao.service.MessageDelegate;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Created by erxiao on 2018/10/8.
 * Handler implementation for pub/sub messages; each overload prints what it receives.
 */
public class MessageDelegateImpl implements MessageDelegate {

    @Override
    public void handleMessage(String message) {
        System.out.println("handleMessage 1 : " + message);
    }

    @Override
    public void handleMessage(Map message) {
        System.out.println("handleMessage 2 : " + message);
    }

    @Override
    public void handleMessage(byte[] message) {
        // Decode the bytes (UTF-8 assumed); toString() on an array prints only its type and hash code.
        System.out.println("handleMessage 3 : " + new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public void handleMessage(Serializable message) {
        System.out.println("handleMessage 4 : " + message);
    }

    @Override
    public void handleMessage(Serializable message, String channel) {
        System.out.println("handleMessage 5 : " + message + " channel : " + channel);
    }
}
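One detail worth noting about a byte[] handler: calling toString() on an array yields only its type and hash code, so a raw payload has to be decoded with an explicit charset before it is printed. A minimal sketch, assuming the payload is UTF-8 text (the class name `PayloadDecoding` is made up for illustration):

```java
import java.nio.charset.StandardCharsets;

/** Sketch: decoding a raw message payload instead of printing the array reference. */
final class PayloadDecoding {

    /** Decodes a payload as UTF-8 text (the encoding is an assumption). */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "test".getBytes(StandardCharsets.UTF_8);
        // Prints the array's type and identity hash, not its content.
        System.out.println("toString(): " + payload.toString());
        // Prints the decoded text.
        System.out.println("decoded:    " + decode(payload));
    }
}
```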
- Configuration file for integrating Redis through spring-data-redis
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<!-- Connection pool settings -->
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="300"/>
<property name="maxTotal" value="600"/>
<property name="maxWaitMillis" value="1000"/>
<property name="testOnBorrow" value="true"/>
<property name="testOnReturn" value="true"/>
</bean>
<!-- Connection properties -->
<bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="172.16.160.**"/>
<property name="port" value="6379"/>
<property name="password" value="***redis connection password***"/>
<property name="timeout" value="100000"/>
<property name="poolConfig" ref="poolConfig"/>
</bean>
<!-- Serialization strategy -->
<bean id="defaultRedisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"></bean>
<!-- redisTemplate -->
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultSerializer" ref="defaultRedisSerializer"/>
</bean>
<!-- Listener for pub/sub messages and key expiry events -->
<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="com.erxiao.service.impl.MessageDelegateImpl"/>
</constructor-arg>
</bean>
<!-- Register the listener and the specific topics it listens to -->
<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageListeners">
<map>
<entry key-ref="messageListener">
<list>
<!-- Channel subscription -->
<bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="ChatRoom" />
</bean>
<!-- Pattern subscription -->
<bean class="org.springframework.data.redis.listener.PatternTopic">
<constructor-arg value="__keyevent@*__:expired" />
</bean>
</list>
</entry>
</map>
</property>
</bean>
</beans>
<!-- end -->
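The configuration registers two kinds of topic: a ChannelTopic, which subscribes to one exact channel name (ChatRoom), and a PatternTopic, which subscribes to a glob-style pattern in the same way PSUBSCRIBE does on the command line. To illustrate the difference, here is a simplified sketch of glob matching in plain Java. It is not the matcher Redis or Spring uses; it handles only `*` and `?`, and the `GlobTopic` name is hypothetical.

```java
import java.util.regex.Pattern;

/** Sketch: simplified glob matching to show how a pattern topic differs from an exact channel. */
final class GlobTopic {

    /** Translates a glob containing * and ? into an anchored regular expression. */
    static Pattern toRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");          // any run of characters
            } else if (c == '?') {
                regex.append('.');           // exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c))); // literal character
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Returns true if the whole channel name matches the glob. */
    static boolean matches(String glob, String channel) {
        return toRegex(glob).matcher(channel).matches();
    }

    public static void main(String[] args) {
        String pattern = "__keyevent@*__:expired";
        System.out.println(matches(pattern, "__keyevent@0__:expired")); // true
        System.out.println(matches(pattern, "__keyevent@0__:del"));     // false
        System.out.println(matches("ChatRoom", "ChatRoom"));            // true
    }
}
```

Because the pattern uses `*` for the database index, the listener receives expiry events from every database rather than just database 0.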
- Test class
import com.erxiao.service.RedisMassage;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.annotation.Resource;
import java.util.Date;

/**
 * Created by erxiao on 2018/11/9.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class RedisMessageTest {

    // Publishing service from the author's project; its source is not shown in this post.
    @Resource
    private RedisMassage redisMassage;

    /**
     * Sends a message.
     */
    @Test
    public void sendMessageTest() {
        String msg = "Hello, Redis!";
        redisMassage.sendMessage("ChatRoom", msg);
    }

    /**
     * Receives messages.
     */
    @Test
    public void subMessageTest() {
        // Endless loop whose only purpose is to keep the process alive so it can receive published messages.
        while (true) {
            try {
                System.out.println("current time: " + new Date());
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
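The endless loop in subMessageTest keeps the JVM alive, but it also means the test never finishes on its own. One possible alternative, sketched here with plain JDK classes and a simulated publisher thread standing in for Redis, is to block on a CountDownLatch until the handler sees a message or a timeout elapses. `LatchedReceiver` is a hypothetical class, not part of the project above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch: a receiver that lets a test wait for one message instead of looping forever. */
final class LatchedReceiver {

    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<String> received = new AtomicReference<>();

    /** Called by whatever delivers the message (a listener callback in a real test). */
    void handleMessage(String message) {
        received.set(message);
        latch.countDown();
    }

    /** Waits up to timeoutMillis for a message; returns null on timeout or interruption. */
    String awaitMessage(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) ? received.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        LatchedReceiver receiver = new LatchedReceiver();
        // Simulated publisher: stands in for Redis delivering a message on another thread.
        new Thread(() -> receiver.handleMessage("Hello, Redis!")).start();
        System.out.println("received: " + receiver.awaitMessage(5000));
    }
}
```

In a real test the delegate registered with the listener adapter would play the role of `handleMessage`, and the test would assert on the value returned by `awaitMessage`.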
- Key expiry test
Run the subMessageTest test method

Connect to Redis from the command line and set the key test to expire automatically after 5 seconds
172.16.160.**:6379> setex test 5 value
OK
172.16.160.**:6379> get test
"value"
172.16.160.**:6379> get test
(nil)
172.16.160.**:6379>
After the key expires, the console prints the expired key

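- Pub/sub message test
Run the subMessageTest test method to listen for messages on the ChatRoom channel

Run the sendMessageTest test method to send a message to the ChatRoom channel

Once the message has been sent, the console prints it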
