美文网首页
Redis实现不可靠发布/订阅功能

Redis实现不可靠发布/订阅功能

作者: r09er | 来源:发表于2020-03-18 16:51 被阅读0次

    Redis的发布/订阅模型

    Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式, 本文讨论订阅/发布到频道的实现

    该种模型类似于RocketMQ中广播模式,消费者订阅topic

    如图展示了发布消息到channel1后,各个client都会接收到message

    image.png

    虽然Redis能够实现发布/订阅的功能,但是有如下缺点,所以选用前需谨慎考虑

    • 1.消息无法持久化,存在丢失风险

    和常规的MQ不同,redis实现的发布/订阅模型消息无法持久化,一经发布,即使没有任何订阅方处理,该条消息就会丢失

    • 2.没有类似ACK的机制

    即发布方不会确保订阅方成功接收

    • 3.广播机制,下游消费能力取决于消费方本身

    广播机制无法通过添加多个消费方增强消费能力,因为这和发布/订阅模型本身的目的是不符的.广播机制的目的是一个一个发布者被多个订阅进行不同的处理

    Redis发布/订阅应用场景

    由于Redis发布/订阅模型存在的缺陷,所以使用前需要考虑如下几点

    • 1.对于消息处理可靠性要求不强
    • 2.消费能力无需通过增加消费方进行增强
      考虑如上两点后,可以想到的场景有如下
    • 1.用户注册后,发送相关优惠信息
    • 2.用户修改名称,由于有业务表对用户名称进行了字段冗余,通过订阅修改名称的channel,触发各个业务表的字段修改

    具体使用还是需要考虑业务场景需求

    SpringBoot使用Redis的发布/订阅功能

    在目前SpringBoot使用Redis的操作中,官方推荐使用SpringData模块中的spring-data-redis,所以下文会以spring-data-redis进行

    下文需要对Springboot工程有一定的基础认识

    1.Maven依赖redis组件

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
    </dependency>
    

    2.redis序列化相关配置

    序列化使用的是GenericJackson2JsonRedisSerializer,使用这个类可以正确序列化Null的对象.如果使用Jackson2JsonRedisSerializer,会将对象序列号成空数组.

    @Configuration
    public class RedisConfig {
    
        @Bean
        public RedisTemplate<Object, Object> redisTemplate(RedisSerializer<Object> redisSerializer, RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<Object, Object> template = new RedisTemplate<>();
            template.setConnectionFactory(redisConnectionFactory);
            template.setKeySerializer(RedisSerializer.string());
            template.setDefaultSerializer(redisSerializer);
            return template;
        }
    
        @Bean
        public RedisSerializer<Object> redisSerializer(){
            GenericJackson2JsonRedisSerializer redisSerializer = new GenericJackson2JsonRedisSerializer();
            return redisSerializer;
        }
    }
    
    

    3.配置发布方法

    简单起见,在这里使用SpringSchedule,周期性发布消息

    @EnableScheduling
    @Component
    public class RedisPublisher {
    
        private static final Logger log = LoggerFactory.getLogger(RedisPublisher.class);
    
        @Autowired
        private RedisTemplate<Object, Object> redisTemplate;
    
        private AtomicInteger incrInteger = new AtomicInteger();
    
    
        @Scheduled(initialDelay = 500, fixedDelay = 10000)
        public void publish() {
            int incrementAndGet = incrInteger.incrementAndGet();
            String topic = "redis/test";
            String message = "current num : " + incrementAndGet;
            log.info("发布消息..topic:{},内容:{}", topic, message);
            redisTemplate.convertAndSend(topic, message);
        }
    }
    

    4.订阅配置

    在订阅程序中,有两个比较重要的类,分别是MessageListenerAdapterRedisMessageListenerContainer
    其中MessageListenerAdapter实现MessageListener作用是将自定义的消费类进行适配.这个类必须有一个public的消费方法,并且方法需要有两个参数,arg1为channel,arg2是Message.原因可以在MessageListenerAdapter源码中发现

    4.1MessageListenerAdapter和一个自定义消费类

    MessageListenerAdapter.onMessage方法中,通过反射对消费类进行了方法调用,并且方法的参数和顺序进行了硬编码,所以必须在消费类中提供一个public方法

    image.png

    4.2RedisMessageListenerContainer

    从官方文档中,可以得知RedisMessageListenerContainer的作用是用于接收消息后进行分发,并且通过内部的线程池进行异步分发,(也可以使用自定义的线程池和相关失败策略)

    4.3完整订阅配置

    @Configuration
    public class ConsumerConfig {
    
        @Bean
        public MessageListenerAdapter processorOne(RedisSerializer<Object> serializer, RedisConsumer redisConsumer) {
    
            MessageListenerAdapter adapter = new MessageListenerAdapter(redisConsumer, "onMessage");
            adapter.setSerializer(serializer);
            return adapter;
        }
    
        /**
         * 
         * 支持动态添加监听
         *
         * @param adapter
         * @return
         */
        @Bean
        public RedisMessageListenerContainer messageListenerContainer(RedisConnectionFactory redisConnectionFactory,
                                                                      MessageListenerAdapter adapter) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(redisConnectionFactory);
            //制定topic的序列化方式,String
            container.setTopicSerializer(RedisSerializer.string());
            //添加监听
            container.addMessageListener(adapter, new PatternTopic("redis/**"));
            return container;
        }
    
    }
    

    启动发布者和订阅者,查看日志

    启动一个发布者,两个订阅者

    由于先启动的发布者,所以部分已经发布的消息,会直接被丢弃,这也是Redis发布订阅模型的一个缺点

    发布者日志

    image.png

    订阅者1日志

    image.png

    订阅者2日志

    image.png

    相关文章

      网友评论

          本文标题:Redis实现不可靠发布/订阅功能

          本文链接:https://www.haomeiwen.com/subject/whdcyhtx.html