最近无聊,写了篇关于redis的发布订阅相关的文章
类似消息队列,只是没有ack
接触到的角色有
1:消息监听的容器
- 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,
- 该消息监听器 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
2:消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
3:消息的接收者
1:监听的容器代码
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param userListenerAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer container(JedisConnectionFactory connectionFactory,
MessageListenerAdapter userListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(userListenerAdapter, new PatternTopic("user"));
return container;
}
/**
* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
* @param receiver
* @return
*/
@Bean
public MessageListenerAdapter userListenerAdapter(UserReceiver receiver) {
//这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,
// 利用反射的方法调用“receiveMessage”
//也有好几个重载方法,
// 这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
return new MessageListenerAdapter(receiver, "receiveMessage");
}
/**
* 具体的接受者
* @return
*/
@Bean
public UserReceiver userReceiver() {
return new UserReceiver();
}
2: 消息发布者
@Component
public class Publisher {
@Autowired
RedisTemplate redisTemplate;
public void pushMessage(String topic, Message message) {
redisTemplate.convertAndSend(topic,message);
}
}
3:消息的接收者
@Slf4j
public class UserReceiver extends AbstractReceiver{
private Logger logger= LoggerFactory.getLogger(this.getClass());
@Override
public void receiveMessage(Object message) {
System.out.println("=====================================");
System.out.println("接收到商品消息:"+JSON.toJSONString(message));
// logger.info("接收到商品消息:{}", JSON.toJSONString(message));
}
}
4: 测试发送
public class RedisTest {
@Autowired
Publisher publisher;
@Test
public void testSend(){
Message msg=new Message();
msg.setMsg("test001");
msg.setName("redis");
publisher.pushMessage("user", msg);
}
}
启动程序,启动测试类。OK
网友评论