Publish/Subscribe Messaging with Redis in Spring Boot

Author: 破地瓜 | Published: 2021-03-17 01:33

    Message queues are used in two modes: publisher/subscriber and producer/consumer. Redis can implement both.

    With spring-boot-starter-data-redis, publish/subscribe is straightforward to set up.
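
To make the difference between the two modes concrete, here is a minimal in-memory sketch in plain Java. It does not talk to Redis, and the names `MessagingModes`, `Topic` and `WorkQueue` are made up for illustration. In Redis itself the first mode corresponds to PUBLISH/SUBSCRIBE, while the second is commonly built on list commands such as LPUSH and BRPOP.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** In-memory illustration only; no Redis involved. All names are hypothetical. */
final class MessagingModes {

    /** Publish/subscribe: every subscriber receives every message. */
    static final class Topic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        /** Delivers the message to all subscribers and returns how many received it. */
        int publish(String message) {
            subscribers.forEach(s -> s.accept(message));
            return subscribers.size();
        }
    }

    /** Producer/consumer: each message is taken by exactly one consumer. */
    static final class WorkQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        void produce(String message) {
            messages.add(message);
        }

        /** Returns the next message, or null if the queue is empty. */
        String consume() {
            return messages.poll();
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        topic.subscribe(first::add);
        topic.subscribe(second::add);
        topic.publish("hello");
        System.out.println(first + " " + second); // both subscribers saw the message

        WorkQueue queue = new WorkQueue();
        queue.produce("job-1");
        System.out.println(queue.consume()); // one consumer takes the job
        System.out.println(queue.consume()); // nothing is left for the next one
    }
}
```

The rest of this article deals only with the first mode.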

    • First, add the dependency
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    
    • Configure the topic subscription to listen on two topics, redis-topic-0 and redis-topic-1
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,
                                                       MessageListenerAdapter listenerAdapter) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(redisConnectionFactory);
            List<PatternTopic> topics = Arrays.asList(
                    PatternTopic.of("redis-topic-0"),
                    PatternTopic.of("redis-topic-1")
            );
            container.addMessageListener(listenerAdapter, topics);
            return container;
        }
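
Because the container above registers `PatternTopic` instances, the names are treated as glob-style patterns (Redis pattern subscriptions, PSUBSCRIBE) rather than exact channel names. A pattern without wildcards only matches itself, so this configuration behaves like two exact subscriptions. The following is a rough, dependency-free sketch of that matching, assuming only the `*` and `?` wildcards. `PatternSketch` is a hypothetical helper, not part of Spring or Redis, and real Redis patterns also support character classes such as `[ab]`.

```java
/** Simplified glob matcher for illustration; supports only '*' and '?'. */
final class PatternSketch {

    /** Returns true if the whole channel name matches the pattern. */
    static boolean matches(String pattern, String channel) {
        return matches(pattern, 0, channel, 0);
    }

    private static boolean matches(String p, int pi, String c, int ci) {
        if (pi == p.length()) {
            // Pattern exhausted: match only if the channel is exhausted too
            return ci == c.length();
        }
        char ch = p.charAt(pi);
        if (ch == '*') {
            // '*' matches zero or more characters: try every possible split
            for (int k = ci; k <= c.length(); k++) {
                if (matches(p, pi + 1, c, k)) {
                    return true;
                }
            }
            return false;
        }
        // '?' matches exactly one character; anything else must match literally
        if (ci < c.length() && (ch == '?' || ch == c.charAt(ci))) {
            return matches(p, pi + 1, c, ci + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("redis-topic-0", "redis-topic-0")); // true
        System.out.println(matches("redis-topic-0", "redis-topic-1")); // false
        System.out.println(matches("redis-topic-*", "redis-topic-1")); // true
    }
}
```

With this in mind, a single pattern such as `redis-topic-*` would presumably cover both channels, while `ChannelTopic` is the type intended for exact channel names.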
    
    • Write the message listener class
    @Service
    public class RedisReceiver {
        public void handleMessage(String message) {
            System.out.println("message:" + message);
        }
    
        public void handleMessage(String message, String topic) {
            System.out.println("message:" + message + " ,topic: " + topic);
        }
    }
    
    • Configure the MessageListenerAdapter
        @Bean
        public MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
            // Delegates incoming messages to RedisReceiver.handleMessage by default
            return new MessageListenerAdapter(redisReceiver);
        }
    

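    By default the adapter invokes a method named handleMessage on the delegate. The method can take two forms: one parameter (the message body), or two parameters (the message body and the topic). Note: if both are declared, only the one declared first is called; the one declared later is never invoked.
    MessageListenerAdapter can also target a different method name: use new MessageListenerAdapter(redisReceiver, "methodName");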
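
The dispatch rule described above can be modelled with plain reflection. The sketch below is a simplified illustration, not Spring's actual implementation; `DispatchSketch`, `Receiver` and `dispatch` are hypothetical names. It looks for a method with the configured name, invokes the first one whose parameters fit, and stops. Note that Java does not guarantee the order in which reflection returns methods, so it is safer not to rely on which overload is picked when both exist.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Simplified model of name-based listener dispatch; not Spring's actual code. */
final class DispatchSketch {

    /** Hypothetical receiver declaring both supported handleMessage shapes. */
    static final class Receiver {
        final List<String> calls = new ArrayList<>();

        public void handleMessage(String message) {
            calls.add("1-arg:" + message);
        }

        public void handleMessage(String message, String topic) {
            calls.add("2-arg:" + message + "@" + topic);
        }
    }

    /**
     * Invokes the first public method called methodName that accepts (message)
     * or (message, topic), then stops. Returns true if a method was invoked.
     */
    static boolean dispatch(Object delegate, String methodName, String message, String topic) {
        try {
            for (Method m : delegate.getClass().getMethods()) {
                if (!m.getName().equals(methodName)) {
                    continue;
                }
                Class<?>[] types = m.getParameterTypes();
                if (types.length == 1 && types[0] == String.class) {
                    m.invoke(delegate, message);
                    return true;
                }
                if (types.length == 2 && types[0] == String.class && types[1] == String.class) {
                    m.invoke(delegate, message, topic);
                    return true;
                }
            }
            return false;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Listener method failed", e);
        }
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        dispatch(receiver, "handleMessage", "hello redis-0", "redis-topic-0");
        System.out.println(receiver.calls); // exactly one of the two overloads was called
    }
}
```

Declaring only the overload you actually need, or passing an explicit method name to the adapter, avoids the ambiguity altogether.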

    • Test
    @RestController
    public class TestRestController {
        private final RedisTemplate redisTemplate;
    
        public TestRestController(RedisTemplate redisTemplate) {
            this.redisTemplate = redisTemplate;
        }
        @GetMapping("/test")
        public void test(){
            redisTemplate.convertAndSend("redis-topic-0","hello redis-0");
            redisTemplate.convertAndSend("redis-topic-1","hello redis-1");
        }
    }
    

    Output

    message:"hello redis-0"
    message:"hello redis-1"
    

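    This setup only handles plain strings; it cannot pass objects. To send objects, serialization and deserialization must be configured.
    Add or adjust the following configuration: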

        /**
         * Customizes the key and value serializers of the RedisTemplate
         * @param redisConnectionFactory
         * @return
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> template = new RedisTemplate<>();
            RedisSerializer<String> stringSerializer = new StringRedisSerializer(StandardCharsets.UTF_8);
            template.setKeySerializer(stringSerializer);
            template.setConnectionFactory(redisConnectionFactory);
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = valueRedisSerializer();
            template.setValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    
        /**
         * Defines the serializer used for values
         * @return
         */
        @Bean
        public Jackson2JsonRedisSerializer<Object> valueRedisSerializer() {
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            return jackson2JsonRedisSerializer;
        }
    
        /**
         * Sets the deserializer on the message listener adapter
         * @param redisReceiver
         * @param valueRedisSerializer
         * @return
         */
        @Bean
        public MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver, Jackson2JsonRedisSerializer<Object> valueRedisSerializer) {
            MessageListenerAdapter adapter = new MessageListenerAdapter(redisReceiver);
            adapter.setSerializer(valueRedisSerializer);
            return adapter;
        }
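
Why the sender and the listener must agree on a serializer: a Redis channel only carries bytes, so the publisher turns the object into bytes and the subscriber has to reverse exactly that step. The sketch below shows the round trip with JDK serialization instead of Jackson, purely to stay dependency-free. `SerializationSketch` and its nested `User` are stand-ins for illustration, not the classes used in this article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Round trip object -> bytes -> object, using JDK serialization as a stand-in. */
final class SerializationSketch {

    /** Hypothetical payload type for this sketch. */
    static final class User implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String id;
        private final String name;

        User(String id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public String toString() {
            return "User{id='" + id + "', name='" + name + "'}";
        }
    }

    /** What the publishing side does before the message goes on the channel. */
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** What the listening side must do, with the matching format, on receipt. */
    static Object deserialize(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = serialize(new User("1", "Tom"));
        System.out.println(deserialize(payload)); // prints User{id='1', name='Tom'}
    }
}
```

In the configuration above, the same role is played by sharing one `Jackson2JsonRedisSerializer` bean between the `RedisTemplate` and the `MessageListenerAdapter`.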
    

    Switch message sending and receiving to objects

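    public class User {
        private String id;
        private String name;
        public User() {} // no-arg constructor, used by Jackson when deserializing
        public User(String id, String name) { this.id = id; this.name = name; }
        // getters, setters and toString() omitted
    }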
    
    @RestController
    public class TestRestController {
        private final RedisTemplate redisTemplate;
    
        public TestRestController(RedisTemplate redisTemplate) {
            this.redisTemplate = redisTemplate;
        }
        @GetMapping("/test")
        public void test(){
            redisTemplate.convertAndSend("redis-topic-0",new User("1","Tom"));
            redisTemplate.convertAndSend("redis-topic-1",new User("2","Lucy"));
        }
    }
    
    @Service
    public class RedisReceiver {
        public void handleMessage(User user, String topic) {
            System.out.println("user:" + user + " ,topic: " + topic);
        }
    }
    

    Log output

    user:User{id='2', name='Lucy'} ,topic: redis-topic-1
    user:User{id='1', name='Tom'} ,topic: redis-topic-0
    

    Project GitHub repository
