美文网首页程序员Java 杂谈Java架构技术栈
设计模式之发布订阅模式(2) Redis 发布/订阅模式

设计模式之发布订阅模式(2) Redis 发布/订阅模式

作者: 若丨寒 | 来源:发表于2019-05-02 16:38 被阅读19次

    Redis 发布/订阅命令

    Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了发布订阅模式。该功能提供两种信息机制, 分别是“发布订阅到频道”和“发布订阅到模式”。

    PUBLISH 命令和 SUBSCRIBE 命令

    PUBLISH channel message
    

    Redis 的 PUBLISH 命令可以让客户端把指定的消息发送到指定的频道中。

    SUBSCRIBE channel [channel …]
    

    Redis 的 SUBSCRIBE 命令可以让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时,信息就会被发送给所有订阅指定频道的客户端。

    下面我们就演示一下 PUBLISH命令和SUBSCRIBE命令的用法:

    首先是订阅单个频道:

    然后是订阅多个频道:

    PSUBSCRIBE 模式订阅命令

    Redis 的发布与订阅实现支持模式匹配(pattern matching)。

    客户端可以订阅一个带 * 号的模式,如果某个/某些频道的名字和这个模式匹配,那么当有信息发送给这个/这些频道的时候,客户端也会收到这个/这些频道的信息。

    客户端订阅的模式里面可以包含多个 glob 风格的通配符, 比如 * 、 ? 和 [...] 等。

    比如执行命令:

    PSUBSCRIBE t.*
    

    客户端将收到来自 t.java、 t.db 等频道的信息。

    Redis 发布/订阅的存储结构

    每个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer 结构, 结构的 pubsub_channels 属性是一个字典, 这个字典就用于保存订阅频道的信息:

    struct redisServer {
        // ...
        dict *pubsub_channels;
        // ...
    }
    

    其中,字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了所有订阅这个频道的客户端。

    当调用 PUBLISH channel message 命令的时候,程序首先根据 channel 定位到字典的键,然后将信息发送给字典值链表中的所有客户端。

    Redis发布/订阅存储结构如下图所示:

    Spring Data Redis 实现发布/订阅模式

    下面带你一步步通过 Spring Data Redis 来实现发布与订阅。

    由于篇幅原因下面就不再演示项目搭建和集成Redis的过程了

    MessagePublisher

    首先定义一个发布者接口,接口只有一个void publish(String message)方法,用于发布消息。

    public interface MessagePublisher {
        /**
         * publish message
         * @param message
         */
        void publish(String message);
    }
    

    然后提供一个基于Redis的MessagePublisher实现。

    其中最核心的是这个方法:redisTemplate.convertAndSend(topic.getTopic(), message),用于把消息发送到指定topic的channel之中。

    import lombok.Setter;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    
    /**
     * Redis message publisher
     *
     * @author ijiangtao
     * @create 2019-05-01 19:36
     **/
    @Setter
    public class RedisMessagePublisher implements MessagePublisher {
    
        private RedisTemplate<String, String> redisTemplate;
    
        private ChannelTopic topic;
    
        private RedisMessagePublisher() { }
    
        public RedisMessagePublisher(RedisTemplate<String, String> redisTemplate, ChannelTopic topic) {
            this.redisTemplate = redisTemplate;
            this.topic = topic;
        }
    
        public void publish(String message) {
            redisTemplate.convertAndSend(topic.getTopic(), message);
        }
    }
    

    MessageListener

    RedisMessageSubscriber是一个订阅者,它实现了MessageListener接口,并通过一个messageList来存/取监听到的消息。

    
    import lombok.AccessLevel;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.stereotype.Component;
    import java.util.List;
    
    /**
     * Redis Message Subscriber
     * <p>
     * RedisMessageSubscriber implements the Spring Data Redis-provided MessageListener interface
     *
     * @author ijiangtao
     * @create 2019-05-01 19:39
     **/
    @AllArgsConstructor
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    @Data
    @Component
    public class RedisMessageSubscriber implements MessageListener {
    
        private List<String> messageList;
    
        public void onMessage(Message message, byte[] pattern) {
            messageList.add("[pattern:" + new String(pattern) + ",message:" + message.toString() + "]");
        }
    }
    

    RedisPubSubConfig

    下面定义了两个“topic”,并且通过两个“publisher`将“message”发布到“channel”指定的“topic”上。

    然后我们定义了两个“subscriber”,“subscriber1”订阅了“topic1”和“topic2”,“subscriber2”只订阅了“topic2”。

    最后我们将这些发布者和订阅者都注册到了 Spring Data Redis 提供的容器(RedisMessageListenerContainer)中。

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
    
    import java.util.ArrayList;
    
    /**
     * config
     *
     * @author ijiangtao
     * @create 2019-05-01 19:57
     **/
    @Configuration
    @ComponentScan("net.ijiangtao.tech.framework.spring.ispringboot.redis")
    @EnableRedisRepositories(basePackages = "net.ijiangtao.tech.framework.spring.ispringboot")
    @PropertySource("classpath:application.properties")
    public class RedisPubSubConfig {
    
        @Autowired
        private RedisTemplate<String, String> redisTemplate;
    
        @Bean
        RedisMessageListenerContainer redisContainer() {
    
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    
            container.setConnectionFactory(redisTemplate.getConnectionFactory());
    
            container.addMessageListener(messageListenerAdapter1() , topic1());
            container.addMessageListener(messageListenerAdapter1() , topic2());
    
            container.addMessageListener(messageListenerAdapter2(), topic2());
    
            return container;
        }
    
        @Bean
        MessageListenerAdapter messageListenerAdapter1() {
            return new MessageListenerAdapter(messageListener1());
        }
    
        @Bean
        public RedisMessageSubscriber messageListener1() {
            return new RedisMessageSubscriber(new ArrayList<>());
        }
    
        @Bean
        MessageListenerAdapter messageListenerAdapter2() {
            return new MessageListenerAdapter(messageListener2());
        }
    
        @Bean
        public RedisMessageSubscriber messageListener2() {
            return new RedisMessageSubscriber(new ArrayList<>());
        }
    
        @Bean
        MessagePublisher redisPublisherForTopic1() {
            return new RedisMessagePublisher(redisTemplate, topic1());
        }
    
        @Bean
        MessagePublisher redisPublisherForTopic2() {
            return new RedisMessagePublisher(redisTemplate, topic2());
        }
    
        @Bean
        ChannelTopic topic1() {
            return new ChannelTopic("topic1");
        }
    
        @Bean
        ChannelTopic topic2() {
            return new ChannelTopic("topic2");
        }
    
    }
    

    Unit Test

    下面我们通过单元测试,往“topic1”和“topic2”分别发布了十条消息,然后遍历“subscriber1”和“subscriber2”监听到的消息内容。

    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.util.List;
    import java.util.UUID;
    
    /**
     * Redis Pub/Sub tests
     *
     * @author ijiangtao
     * @create 2019-05-01 19:12
     **/
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @Slf4j
    public class RedisPubSub {
    
        @Autowired
        @Qualifier("redisPublisherForTopic1")
        private MessagePublisher redisPublisher1;
    
        @Autowired
        @Qualifier("redisPublisherForTopic2")
        private MessagePublisher redisPublisher2;
    
        @Autowired
        @Qualifier("messageListener1")
        private RedisMessageSubscriber subscriber1;
    
        @Autowired
        @Qualifier("messageListener2")
        private RedisMessageSubscriber subscriber2;
    
        @Test
        public void test1() {
    
            // 循环发布10次消息, 主要方法 redisTemplate.convertAndSend
            for (int i = 0; i < 10; i++) {
                String message = "Topic1 Message : " + UUID.randomUUID();
                redisPublisher1.publish(message);
            }
    
            // 循环发布10次消息, 主要方法 redisTemplate.convertAndSend
            for (int i = 0; i < 10; i++) {
                String message = "Topic2 Message : " + UUID.randomUUID();
                redisPublisher2.publish(message);
            }
    
            // 获取存储的订阅消息
            List<String> messageList1 = subscriber1.getMessageList();
            for (int i = 0; i < messageList1.size(); i++) {
                log.info(messageList1.get(i));
            }
    
            // 获取存储的订阅消息
            List<String> messageList2 = subscriber2.getMessageList();
            for (int i = 0; i < messageList2.size(); i++) {
                log.info(messageList2.get(i));
            }
    
        }
    
    }
    

    “subscriber1”监听到了“redisPublisher1”和“redisPublisher2”发布的共20条消息:

    [pattern:topic1,message:Topic1 Message : 2239af04-8e91-4adf-8e1e-98261a44ff77]
    [pattern:topic1,message:Topic1 Message : 85107f06-2cae-4d6c-8123-9e8dc6e7a608]
    [pattern:topic1,message:Topic1 Message : 0b80b9b8-8eee-476e-8462-bb6cbbbcf863]
    [pattern:topic1,message:Topic1 Message : 0983f28d-d220-4538-b15e-dc66c0d3e491]
    [pattern:topic1,message:Topic1 Message : 0f2d863c-00b9-4406-8e49-020c78a3632d]
    [pattern:topic1,message:Topic1 Message : b8a0bb35-6cc2-4393-9136-2390de80f709]
    [pattern:topic1,message:Topic1 Message : 027f1ca5-39cc-42c6-a4d8-87dc138260b1]
    [pattern:topic1,message:Topic1 Message : ff85595e-2864-4dec-96c1-9dd29c69f670]
    [pattern:topic1,message:Topic1 Message : 77471855-f04b-437d-bd1b-afb801a33cf9]
    [pattern:topic1,message:Topic1 Message : feba4b0f-70c1-4c14-8ecb-bf4c6956f374]
    [pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc]
    [pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77]
    [pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12]
    [pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61]
    [pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad]
    [pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596]
    [pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd]
    [pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223]
    [pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e]
    [pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700]
    

    “subscriber2”监听到了“redisPublisher2”发布的共10条消息:

    [pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc]
    [pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77]
    [pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12]
    [pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61]
    [pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad]
    [pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596]
    [pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd]
    [pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223]
    [pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e]
    [pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700]
    

    总结

    本文从 Redis 发布和订阅相关的命令开始,逐步讲解了 Redis 发布订阅的存储结构,以及如何通过 Spring Data Redis 实现发布订阅模式。

    作者:西召
    链接:https://juejin.im/post/5cc9d2d2e51d453ae54a2040

    相关文章

      网友评论

        本文标题:设计模式之发布订阅模式(2) Redis 发布/订阅模式

        本文链接:https://www.haomeiwen.com/subject/mninnqtx.html