美文网首页
Redis的发布订阅机制

Redis的发布订阅机制

作者: 右耳菌 | 来源:发表于2022-08-30 21:42 被阅读0次

    1. 什么是发布订阅机制 (PUB/SUB)

    除了普通的数据类型之外,Redis其实还支持一种类似消息推送的机制,也就是有一个客户端A向redis服务器发布了一条消息,而另外的客户端如果订阅了这个消息,那么就能接收到客户端A发送的消息。

    用通俗的话说就是,针对一个主题 topic(一个类比),如果某一个群体对这样的一个主题感兴趣,那么他们就会订阅这样的一个主题里边的内容,如果某个个体有一个与该主题相关的消息希望分享出去,那么他就可以向订阅了这个主题的人进行推送,而redis服务器的作用其实是一个中间人的作用,但需要注意的一点就是,这里的消息并不会被持久化,也就是如果是以前的消息,那么新订阅的个体是无法接收到数据的。


    2. 常用指令

    1. subscribe: 订阅指定channel的消息
      根据下面的内容,我们可以得到,订阅一个channel(其实可以同时订阅多个channel)成功后,一般会返回三个数据,第一个是类型,表示这是一个订阅的相关的消息,第二个是channel名称,第三个其实是指我们当前订阅的channel数量。
      而对于后续接收到的消息,同样也有三个数据,一个是表示这是一个消息也就是message,第二个则是channel的名称,第三个是具体的内容。
    127.0.0.1:6379> subscribe sms_send
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "sms_send"
    3) (integer) 1
    1) "message"
    2) "sms_send"
    3) "hello"
    
    1. publish: 发布消息到指定的channel
      返回:收到消息的客户端数量。请注意,在 Redis 集群中,只有与发布客户端连接到同一节点的客户端才会包含在计数中。
    127.0.0.1:6379> publish sms_send hello
    (integer) 1
    
    1. unsubscribe: 取消订阅,但是在命令行上的时候,其实是没办法很好的测试的,因为subscribe之后会一直处于读取消息的状态,所以大概是供代码调用的吧
    127.0.0.1:6379> unsubscribe sms_send
    1) "unsubscribe"
    2) "sms_send"
    3) (integer) 0
    
    1. pubsub channels: 查看当前的channel
    127.0.0.1:6379> pubsub channels
    1) "first"
    2) "second"
    
    1. pubsub numsub: 返回指定频道的订阅者数量(不包括订阅模式的客户端)。
    127.0.0.1:6379> pubsub numsub sms_send
    1) "sms_send"
    2) (integer) 1
    
    1. monitor: monitor是一个调试命令,它流回 Redis 服务器处理的每个命令。它可以帮助理解数据库发生了什么。该命令既可以通过 也可以通过redis-cli使用telnet。

    3. 其他的内容

    1. psubscribe: 订阅一个或多个符合给定模式的频道。其中的p表示pattern
    2. punsubscribe: 取消订阅一个或多个符合给定模式的频道。其中的P表示pattern
    3. ssubscribe: 为客户端订阅指定的分片通道。其中的s表示Shared
    4. sunsubscribe: 停止监听发布到给定分片通道的消息。其中的s表示Shared
    5. spublish: 发布消息到给定分片通道。其中的s表示Shared

    4. 如何在java中使用订阅发布机制 - 例子

      1. 修改 pom.xml
            <dependency>
                <groupId>io.lettuce</groupId>
                <artifactId>lettuce-core</artifactId>
                <version>${lettuce.version}</version>
            </dependency>
    
      1. 创建 AppConfig.java
    package cn.lazyfennec.cache.redis;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    @Configuration
    class AppConfig {
        /**
         * 用于测试的通道名称
         */
        public final static String TEST_CHANNEL_NAME = "sms_send";
    
        @Bean
        public RedisConnectionFactory redisConnectionFactory() {
            System.out.println("使用单机版本");
            return new LettuceConnectionFactory(new RedisStandaloneConfiguration("192.168.1.7", 6379));
        }
    
        @Bean
        public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate redisTemplate = new RedisTemplate();
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            // 可以配置对象的转换规则,比如使用json格式对object进行存储。
            // Object --> 序列化 --> 二进制流 --> redis-server存储
            redisTemplate.setKeySerializer(new StringRedisSerializer());
            redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
            return redisTemplate;
        }
    }
    
      1. 订阅
    • 3.1 redisTemplate.execute 的方式
    package cn.lazyfennec.cache.redis;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Profile;
    import org.springframework.dao.DataAccessException;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    /**
     * 接收短信通知,直接用客户端的方式
     */
    @Component
    public class SmsChannelListener {
        @Autowired
        RedisTemplate redisTemplate;
    
        @PostConstruct
        public void setup() {
            redisTemplate.execute(new RedisCallback() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    connection.subscribe((message, pattern) -> {
                        System.out.println("收到消息,使用redisTemplate收到的:" + message);
                    }, PubsubRedisAppConfig.TEST_CHANNEL_NAME.getBytes());
                    return null;
                }
            });
        }
    }
    
    • 3.2 消息容器的方式
    package com.study.cache.redis.a3_pubsub;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.stereotype.Component;
    
    import java.util.*;
    
    /**
     * 接收短信通知
     */
    @Component
    @Configuration
    public class SmsChannelListenerBySpring {
        // 定义监听器
        @Bean
        public RedisMessageListenerContainer smsMessageListener(RedisConnectionFactory redisConnectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(redisConnectionFactory);
            SmsSendListener smsSendListener = new SmsSendListener();
            container.addMessageListener(smsSendListener, Arrays.asList(new ChannelTopic(PubsubRedisAppConfig.TEST_CHANNEL_NAME)));
            return container;
        }
    
        // 定义触发的方法
        class SmsSendListener implements MessageListener {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                System.out.println("借助spring容器收到消息:" + message);
            }
        }
    }
    
      1. 发布 - 例子
        @Test
        public void test1() throws InterruptedException {
            System.out.println("开始测试发布订阅机制,5秒后发布一条消息");
            Thread.sleep(5000L);
            redisTemplate.execute(new RedisCallback<Long>() {
                @Override
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    // 发送通知
                    Long received = connection.publish(PubsubRedisAppConfig.TEST_CHANNEL_NAME.getBytes(), "{手机号码10086~短信内容~~}".getBytes());
                    return received;
                }
            });
        }
    

    5. Redis keyspace notifications

    实时监控 Redis 键和值的变化

    Keyspace 通知允许客户端订阅 Pub/Sub 频道,以便接收以某种方式影响 Redis 数据集的事件。

    可能接收的事件示例如下:

    • 所有影响给定键的命令。
    • 所有接收LPUSH操作的键。
    • 所有在数据库0中到期的键。

    事件使用Redis的普通发布/订阅层传递,因此实现了发布/订阅的客户端无需修改即可使用此功能。

    由于Redis的发布/订阅是fire and forget,因此如果你的应用要求可靠的事件通知,目前还不能使用这个功能,也就是说,如果你的发布/订阅客户端断开连接,并在稍后重连,那么所有在客户端断开期间发送的事件将会丢失。

    事件类型

    键空间通知的实现是为每一个影响Redis数据空间的操作发送两个不同类型的事件。例如,在数据库0中名为mykey的键上执行DEL操作,将触发两条消息的传递,完全等同于下面两个PUBLISH命令:

    PUBLISH __keyspace@0__:mykey del
    PUBLISH __keyevent@0__:del mykey
    

    以上很容易看到,一个频道允许监听所有以键mykey为目标的所有事件,以及另一个频道允许获取有关所有DEL操作目标键的信息。
    第一种事件,在频道中使用keyspace前缀的被叫做键空间通知,第二种,使用keyevent前缀的,被叫做键事件通知。
    在以上例子中,为键mykey生成了一个del事件。 会发生什么:

    • 键空间频道接收到的消息是事件的名称。
    • 键事件频道接收到的消息是键的名称。
      可以只启用其中一种通知,以便只传递我们感兴趣的事件子集。

    配置
    默认情况下,键空间事件通知是不启用的,因为虽然不太明智,但该功能会消耗一些CPU。可以使用redis.conf中的notify-keyspace-events或者使用CONFIG SET命令来开启通知。
    将参数设置为空字符串会禁用通知。 为了开启通知功能,使用了一个非空字符串,由多个字符组成,每一个字符都有其特殊的含义,具体参见下表:

    • 英文版
    参数 说明
    K Keyspace events, published with keyspace@<db> prefix.
    E Keyevent events, published with keyevent@<db> prefix.
    g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
    $ String commands
    l List commands
    s Set commands
    h Hash commands
    z Sorted set commands
    t Stream commands
    d Module key type events
    x Expired events (events generated every time a key expires)
    e Evicted events (events generated when a key is evicted for maxmemory)
    m Key miss events (events generated when a key that doesn't exist is accessed)
    n New key events (Note: not included in the 'A' class)
    A Alias for "g$lshztxed", so that the "AKE" string means all the events except "m".

    过期事件

    设置了生存时间的键由Redis以两种方式过期:

    • 当命令访问键时,发现键已过期。
    • 通过后台系统在后台逐步查找过期的键,以便能够收集那些从未被访问的键。

    当通过以上系统之一访问键且发现键已经过期时,将生成expired事件。因此无法保证Redis服务器在键过期的那一刻同时生成expired事件。

    如果没有命令不断地访问键,并且有很多键都有关联的TTL,那么在键的生存时间降至零到生成expired事件之间,将会有明显的延迟。

    基本上,expired事件是在Redis服务器删除键的时候生成的,而不是在理论上生存时间达到零值时生成的。


    如果觉得有收获,欢迎点赞和评论,更多知识,请点击关注查看我的主页信息哦~

    相关文章

      网友评论

          本文标题:Redis的发布订阅机制

          本文链接:https://www.haomeiwen.com/subject/xkidnrtx.html