Redis 不仅提供一个NoSQL数据库,同时提供了一套消息系统,在开发过程中,应用场景非常多,根据不同的业务需求,可以实现相应的功能。
我的业务需求是,在分布式系统中,当其中一个节点产生一条消息时,需要同时通知其他节点做相应的处理。
相关类的解释:
RedisMessageListenerContainer
Redis订阅发布的监听容器,通过Redis的消息发布、订阅配置都在这里面实现
- addMessageListener(MessageListenerAdapter,PatternTopic) 新增订阅频道及订阅者,订阅者必须有相关方法处理收到的消息
- setTopicSerializer(RedisSerializer) 对频道内容进行序列化解析
MessageListenerAdapter
监听适配器
- MessageListenerAdapter(Object , defaultListenerMethod) 创建监听适配器,绑定订阅接收器和接收消息的方法
RedisTemplate
Redis模版类
- convertAndSend(String channel, Object message) 发布者向Redis发布消息
一、新增依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
二、Redis连接信息配置
spring:
redis:
host: 127.0.0.1
port: 6381
### Redis数据库索引(默认为0)
database: 0
### 连接超时时间(毫秒)
timeout: 60000ms
password:
lettuce:
pool:
### 最大连接数(使用负值表示没有限制) 默认8
max-active: 8
### 最小空闲连接 默认8
min-idle: 0
### 连接池中的最大空闲连接 默认8
max-idle: 8
### 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
三、RedisConfig核心类,实现了Redis连接,订阅以及发布配置
@Configuration
public class RedisConfig {
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// valuevalue采用jackson序列化方式
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value采用jackson序列化方式
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
//开启事务支持
redisTemplate.setEnableTransactionSupport(true);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* Redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,
* 该消息监听器通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(connectionFactory);
/**
* 加入消息监听器(可以加入多个主题监听器,监听器也可以监听多个主题)
**/
// 加入WebSocket监听器
final String TOPIC_NAME = "TEST_TOPIC"; // 订阅主题
MessageListenerAdapter webSocketListenerAdapter = webSocketListenerAdapter();
redisMessageListenerContainer.addMessageListener(webSocketListenerAdapter, new PatternTopic(TOPIC_NAME));
/**
* 设置序列化对象
* 特别注意:1. 发布的时候需要设置序列化;订阅方也需要设置序列化
* 2. 设置序列化对象必须放在[加入消息监听器]这一步后面,否则会导致接收器接收不到消息
*/
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
redisMessageListenerContainer.setTopicSerializer(seria);
return redisMessageListenerContainer;
}
/**
* 绑定WebSocket消息推送接收器和接收方法
*/
@Bean
public MessageListenerAdapter webSocketListenerAdapter() {
WebSocketReceiver webSocketReceiver = new WebSocketReceiver(); // 消息接收器
final String RECEIVE_MESSAGE_METHOD = "receiveMessage"; // 消息接收器的方法名称
return new MessageListenerAdapter(webSocketReceiver, RECEIVE_MESSAGE_METHOD);
}
}
四、封装消息对象
@Data
public class MessageDTO implements Serializable {
private String type;
private String title;
private String content;
}
五、消息接收器
public class WebSocketReceiver {
/**
* 接收WebSocket推送的消息并处理
* @param message
*/
public void receiveMessage(String message) {
//序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(WebSocketMessageDTO.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
MessageDTO message = (MessageDTO ) seria.deserialize(message.getBytes());
// 接收到消息对象,自己实现相关的业务处理...
}
}
六、测试发布消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestRedis {
@Resource
private RedisTemplate redisTemplate;
@Test
public void test() {
final String TOPIC_NAME = "TEST_TOPIC"; // 订阅主题
MessageDTO message = new MessageDTO();
message.setTitle("订阅发布测试...");
// 发布消息
redisTemplate.convertAndSend(TOPIC_NAME, message);
}
}
网友评论