美文网首页redis
Redis发布订阅模式

Redis发布订阅模式

作者: Geroge1226 | 来源:发表于2021-06-24 15:18 被阅读0次

    1、说明

    Redis自身提供了发布/订阅(publish/subscribe)模式。实现方式大致流程如下图:

    image.png
    发布订阅三个角色:发布者,订阅者和Channel。

    2、redis发布订阅命令

    • 发布者使用命令publish + channel + msg
    127.0.0.1:6379[1]> publish channel01 "important things"
    (integer) 0
    127.0.0.1:6379[1]> publish channel01 "important things"
    (integer) 1
    127.0.0.1:6379[1]> publish channel01 "hello girl"
    (integer) 2
    

    注意:发布返回的是订阅者数量,发布的消息不会持久化,没有订阅者时候,发布消息会丢失,当在发布消息之后对channel进行订阅不会收到之前发布的消息。

    • 订阅者使用命令subscribe + channel
    127.0.0.1:6379[1]> subscribe channel01
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "channel01"
    3) (integer) 1
    1) "message"
    2) "channel01"
    3) "important things"
    1) "message"
    2) "channel01"
    3) "hello somebody"
    
    

    使用subscribe客户端进入订阅状态,该状态下不能使用与“发布/订阅”无关的其他命令。

    3、RedisTemplate 实现发布订阅

    • 发布者,使用convertAndSend(channel , message)方法实现消息发布
    @RequestMapping("/pubSub")
    @RestController
    public class RedisComprehensive {
    
        private static final String CHANNEL = "channel_01";
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @GetMapping(value = "/publishMessage")
        public void publishMessage(String message){
            // 发布者
            redisTemplate.convertAndSend(CHANNEL,message);
        }
    }
    
    • 订阅者分析

    (1)配置redis定义消息容器RedisMessageListenerContainer。
    addMessageListener(MessageListenerAdapter,PatternTopic): 新增订阅频道及订阅者,订阅者必须有相关方法处理收到的消息。
    setTopicSerializer(RedisSerializer) :对频道内容进行序列化解析

    (2)配置适配器MessageListenerAdapter
    MessageListenerAdapter(Object delegate, String defaultListenerMethod)
    delegate消息订阅者类,defaultListenerMethod消息处理方法

    【实现代码如下】:

    @Configuration
    public class RedisConfig {
    
        private static final String CHANNEL = "channel_01";
         /**
         * @name: container
         * @description: Redis订阅消息监听容器
         * @param connectionFactory
         * @param adapter
         * @return: org.springframework.data.redis.listener.RedisMessageListenerContainer
        */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter adapter){
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            // 可以添加多个MessageListener
            container.addMessageListener(adapter,new PatternTopic(CHANNEL));
            return container;
        }
    
        /**
         * @name: listenerAdapter
         * @description: 配置消息处理适配器
         * @param msgReceiveListener
         * @return: org.springframework.data.redis.listener.adapter.MessageListenerAdapter
         *
        */
        @Bean
        public MessageListenerAdapter listenerAdapter(MsgReceiveListener msgReceiveListener){
             // messageListenerAdapter 传入一个消息接受的处理器,利用反射的方式调用对应的处理方法
            return new MessageListenerAdapter(msgReceiveListener,"onMessage");
        }
    }
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.stereotype.Component;
    @Component
    @Slf4j
    public class MsgReceiveListener implements MessageListener {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
            String body = serializer.deserialize(message.getBody());
            String chanel = serializer.deserialize(message.getChannel());
            log.info("接收的消息:{},使用的chanel:{}",body,chanel);
        }
    }
    

    执行结果:

    2021-06-24 15:06:18.436  INFO 28520 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 13 ms
    2021-06-24 15:06:18.540  INFO 28520 --- [    container-2] c.l.r.listener.MsgReceiveListener        : 接收的消息:这是一个发布消息,使用的chanel:channel_01
    

    相关文章

      网友评论

        本文标题:Redis发布订阅模式

        本文链接:https://www.haomeiwen.com/subject/wdgwyltx.html