美文网首页
Springboot2.x 实现redis的 发布/订阅 模型

Springboot2.x 实现redis的 发布/订阅 模型

作者: 骑蚂蚁上高速_jun | 来源:发表于2020-08-27 22:20 被阅读0次

    1 . redis的基本配置 参照上一篇文章

    1. configuration 配置redis
    package cn.waimaolang.demo.configura;
    
    import cn.waimaolang.demo.command.MyMessageListenCommand;
    import cn.waimaolang.demo.service.MessageConsumerService;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    import java.io.Serializable;
    
    
    @Configuration
    public class RedisConfigura {
        /**
         * 使用基本的redis
         * @param factory
         * @return
         */
        @Bean
        public RedisTemplate<String, Serializable> redisTemplate(
                LettuceConnectionFactory factory
        ) {
            RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
            redisTemplate.setKeySerializer(new StringRedisSerializer()); // redis 序列化数据的方式
            /* 能与其他语言 相互编码缓存 value编码方式,如果与其他语言混合开发项目,需要获取相同的缓存,
            则使用此种方式编码 */
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
            redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);        
    
            redisTemplate.setConnectionFactory(factory);
            return redisTemplate;
        }
    
        /**
         * 发布/订阅
         * RedisMessageListenerContainer提供订阅消息的多路分发,这样多个订阅可以共享同一个Redis连接.
         * 需要 自行实现监听 类 MyMessageListenCommand
         * 使用 名为  tenmao.blog.channel  作为消息管道
         */
        @Bean
        public RedisMessageListenerContainer redisContainer(LettuceConnectionFactory connectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.addMessageListener(new MyMessageListenCommand(), new ChannelTopic("tenmao.blog.channel"));
            return container;
        }
    
        /**
         * 发布/订阅 实现2 ,需要调用listenerAdapter 方法
         * @param connectionFactory
         * @param listenerAdapter
         * @return
         * 需要 自行实现 MessageConsumerService 类receiveMessage() 方法
         */
        @Bean
        public RedisMessageListenerContainer container(
            RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter
        ) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.addMessageListener(listenerAdapter, new PatternTopic("queue"));
            return container;
        }
    
        /**
         * @param receiver
         * @return
         */
        @Bean
        MessageListenerAdapter listenerAdapter(MessageConsumerService receiver) {
            return new MessageListenerAdapter(new MessageConsumerService(), "receiveMessage");
        }
    
    }
    
    
    1. 实现监听类 MyMessageListenCommand 作为 消费端逻辑
    package cn.waimaolang.demo.command;
    
    import lombok.NonNull;
    import lombok.extern.slf4j.Slf4j;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    
    
    @Slf4j
    public class MyMessageListenCommand  implements MessageListener {
    
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Override
        public void onMessage(@NonNull Message message, byte[] pattern) {
    
            logger.error("message received: {}", message);
        }
    }
    

    4 . 实现生产者

    @Autowired
    RedisTemplate redisTemplate;
    redisTemplate.convertAndSend("tenmao.blog.channel", "hello world");
    

    相关文章

      网友评论

          本文标题:Springboot2.x 实现redis的 发布/订阅 模型

          本文链接:https://www.haomeiwen.com/subject/yhnhsktx.html