1、说明
Redis
自身提供了发布/订阅(publish/subscribe
)模式。实现方式大致流程如下图:
发布订阅三个角色:发布者,订阅者和Channel。
2、redis发布订阅命令
- 发布者使用命令
publish
+ channel + msg
127.0.0.1:6379[1]> publish channel01 "important things"
(integer) 0
127.0.0.1:6379[1]> publish channel01 "important things"
(integer) 1
127.0.0.1:6379[1]> publish channel01 "hello girl"
(integer) 2
注意:发布返回的是订阅者数量,发布的消息不会持久化,没有订阅者时候,发布消息会丢失,当在发布消息之后对channel进行订阅不会收到之前发布的消息。
- 订阅者使用命令
subscribe
+ channel
127.0.0.1:6379[1]> subscribe channel01
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel01"
3) (integer) 1
1) "message"
2) "channel01"
3) "important things"
1) "message"
2) "channel01"
3) "hello somebody"
使用subscribe
客户端进入订阅状态,该状态下不能使用与“发布/订阅”无关的其他命令。
3、RedisTemplate 实现发布订阅
- 发布者,使用
convertAndSend(channel , message)
方法实现消息发布
@RequestMapping("/pubSub")
@RestController
public class RedisComprehensive {
private static final String CHANNEL = "channel_01";
@Autowired
private RedisTemplate redisTemplate;
@GetMapping(value = "/publishMessage")
public void publishMessage(String message){
// 发布者
redisTemplate.convertAndSend(CHANNEL,message);
}
}
- 订阅者分析
(1)配置redis定义消息容器RedisMessageListenerContainer。
addMessageListener
(MessageListenerAdapter,PatternTopic): 新增订阅频道及订阅者,订阅者必须有相关方法处理收到的消息。
setTopicSerializer
(RedisSerializer) :对频道内容进行序列化解析
(2)配置适配器MessageListenerAdapter
MessageListenerAdapter
(Object delegate, String defaultListenerMethod)
delegate
消息订阅者类,defaultListenerMethod
消息处理方法
【实现代码如下】:
@Configuration
public class RedisConfig {
private static final String CHANNEL = "channel_01";
/**
* @name: container
* @description: Redis订阅消息监听容器
* @param connectionFactory
* @param adapter
* @return: org.springframework.data.redis.listener.RedisMessageListenerContainer
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter adapter){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个MessageListener
container.addMessageListener(adapter,new PatternTopic(CHANNEL));
return container;
}
/**
* @name: listenerAdapter
* @description: 配置消息处理适配器
* @param msgReceiveListener
* @return: org.springframework.data.redis.listener.adapter.MessageListenerAdapter
*
*/
@Bean
public MessageListenerAdapter listenerAdapter(MsgReceiveListener msgReceiveListener){
// messageListenerAdapter 传入一个消息接受的处理器,利用反射的方式调用对应的处理方法
return new MessageListenerAdapter(msgReceiveListener,"onMessage");
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MsgReceiveListener implements MessageListener {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
String body = serializer.deserialize(message.getBody());
String chanel = serializer.deserialize(message.getChannel());
log.info("接收的消息:{},使用的chanel:{}",body,chanel);
}
}
执行结果:
2021-06-24 15:06:18.436 INFO 28520 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 13 ms
2021-06-24 15:06:18.540 INFO 28520 --- [ container-2] c.l.r.listener.MsgReceiveListener : 接收的消息:这是一个发布消息,使用的chanel:channel_01
网友评论