美文网首页
Redis-发布订阅

Redis-发布订阅

作者: zzj0990 | 来源:发表于2021-01-24 22:36 被阅读0次

    简介

    Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。适宜做在线聊天、消息推送等。

    发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息,Redis 客户端可以订阅任意数量的频道。

    屏幕快照 2021-01-24 下午9.21.38.png
    上图展示了频道 channel1 ,以及订阅这个频道的3个客户端 - client2/client5/lient1 之间的关系
    屏幕快照 2021-01-24 下午9.24.13.png

    当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的3个客户端(client2/client5/lient1)

    发布订阅

    1. 发布消息
      Redis采用PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。
    C:\Users\Administrator>redis-cli
    127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"
    (integer) 1
    127.0.0.1:6379> PUBLISH redisChat "Learn redis"
    (integer) 1
    127.0.0.1:6379>
    
    1. 订阅某个频道
      Redis采用SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。
    C:\Users\Administrator>redis-cli
    127.0.0.1:6379> SUBSCRIBE redisChat
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "redisChat"
    3) (integer) 1
    
    屏幕快照 2021-01-24 下午9.33.07.png

    常用命令

    1. Redis Psubscribe 命令
      Redis Psubscribe 命令订阅一个或多个符合给定模式的频道。
      每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等)。 news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。

    redis Psubscribe 命令基本语法如下:

    redis 127.0.0.1:6379> PSUBSCRIBE pattern [pattern ...]
    
    1. Redis Pubsub 命令
      Redis Pubsub 命令用于查看订阅与发布系统状态,它由数个不同格式的子命令组成。

    redis Pubsub 命令基本语法如下:

    redis 127.0.0.1:6379> PUBSUB <subcommand> [argument [argument ...]]
    

    返回值
    由活跃频道组成的列表。
    例:

    127.0.0.1:6379> PUBSUB CHANNELS
    1) "redisChat"
    
    1. Redis Publish 命令
      Redis Publish 命令用于将信息发送到指定的频道。

    redis Publish 命令基本语法如下:

    redis 127.0.0.1:6379> PUBLISH channel message
    

    返回值
    接收到信息的订阅者数量。

    127.0.0.1:6379> PUBLISH mychannel "hello, i m here"
    (integer) 0
    
    1. Redis Punsubscribe 命令
      Redis Punsubscribe 命令用于退订所有给定模式的频道。

    redis Punsubscribe 命令基本语法如下:

    redis 127.0.0.1:6379> PUNSUBSCRIBE [pattern [pattern ...]]
    

    返回值
    这个命令在不同的客户端中有不同的表现。

    127.0.0.1:6379> PUNSUBSCRIBE redisChat
    1) "punsubscribe"
    2) "redisChat"
    3) (integer) 0
    
    1. Redis Subscribe 命令
      Redis Subscribe 命令用于订阅给定的一个或多个频道的信息。。

    redis Subscribe 命令基本语法如下:

    redis 127.0.0.1:6379> SUBSCRIBE channel [channel ...]
    
    1. Redis Unsubscribe 命令
      Redis Unsubscribe 命令用于退订给定的一个或多个频道的信息。
      指示客户端退订给定的频道。
      如果没有频道被指定,也即是,一个无参数的 <tt class="docutils literal" style="background-color: transparent; font-size: 1.1em; font-family: monospace;">UNSUBSCRIBE</tt> 调用被执行,那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。

    redis Unsubscribe 命令基本语法如下:

    redis 127.0.0.1:6379> UNSUBSCRIBE channel [channel ...]
    
    127.0.0.1:6379> UNSUBSCRIBE redisChat
    1) "unsubscribe"
    2) "redisChat"
    3) (integer) 0
    

    返回值
    这个命令在不同的客户端中有不同的表现。

    代码示例

    可以通过spring-redis中的redisTemplate工具辅助实现:

    1. 发布消息
    /**
     * redis发布消息
     *
     * @param channel
     * @param message
     */
    public void sendMessage(String channel, String message) {
        redisTemplate.convertAndSend(channel, message);
    }
    
    1. 监听消息
      监听消息需要两步,消息监听类并在xml中注册这个类
      监听类有两种实现方式一种是实现org.springframework.data.redis.connection.MessageListener接口,实现onMessage方法示例代码如下:
    @Component
    public class RedisMessageListener implements MessageListener {
    
        @Autowired
        private RedisTemplate<String, String> redisTemplate;
    
        private static Logger logger = Logger.getLogger(RedisMessageListener.class);
        @Override
        public void onMessage(Message message, byte[] pattern) {
            byte[] body = message.getBody(); // 请使用valueSerializer
            byte[] channel = message.getChannel();
            /*
               请参考配置文件,本例中key,value的序列化方式均为string。
               其中key必须为stringSerializer。和redisTemplate.convertAndSend对应
             */
            String msgContent = (String) redisTemplate.getValueSerializer().deserialize(body);
            String topic = (String) redisTemplate.getStringSerializer().deserialize(channel);
            logger.info("redis--topic:" + topic + "  body:" + msgContent);
        }
    }
    

    可以使用自己定义的类,方法名称自己定义,示例如下:

    @Component
    public class EventListener {
        private static Logger logger = Logger.getLogger(EventListener.class);
        public void getMessage(String message, String channel) {
            logger.info(message);
        }
    } 
    

    这两中方式实现的不同在于注册监听器时的配置略有不同

    1. redis配置
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:p="http://www.springframework.org/schema/p" xmlns:redis="http://www.springframework.org/schema/redis"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd                           http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">
    
    <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxIdle" value="${redis.maxIdle}" />
        <!-- <property name="maxActive" value="${redis.maxActive}" /> -->
        <property name="maxWaitMillis" value="${redis.maxWaitMillis}" />
        <property name="testOnBorrow" value="${redis.testOnBorrow}" />
    </bean>
    <!--注意使用订阅发布时,此bean必须命名为redisConnectionFactory,否则需要在listener中指明连接工厂-->
    <bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
          p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pass}"
          p:pool-config-ref="poolConfig"/>
    
    <bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
        <property name="connectionFactory" ref="redisConnectionFactory"/>
    </bean>
    
    <bean id="stringRedisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    <!--此处注册监听器,需要指定通道名称(topic)(可以使用正则表达式*_等等),第一种为实现MessageListener接口的监听器的注册,第二种为自己定义的类的注册需要制定处理方法名称(不制定的默认方法为handleMessage,如果你的方法是这个名称可以不指定)与序列化的方式,推荐使用第一种方式-->
    <redis:listener-container>
        <redis:listener ref="redisMessageListener" topic="talk"/>
        <redis:listener ref="eventListener" topic="talk*" method="getMessage"
                        serializer="stringRedisSerializer"></redis:listener>
    </redis:listener-container>
    </beans>
    

    实际案例分析

    场景是这样的:有个发布消息的终端,同时订阅该通道的有多个终端,由于Redis发布订阅模式不能实现查看历史消息(3d内消息)且后来订阅该通道的终端也看不到历史消息。
    顾可以以下草图解决方案:

    屏幕快照 2021-01-24 下午10.07.30.png
    客户端A为订阅端: 历史消息分为两种,一种更老的数据放入数据库,另外一种是3d内的消息,放入sorted_set中(使用该结构的特性:无重复且可以排序,以时间为滑动窗口来实现);实时性的消息从通道中获取即可;
    B为发布消息端: 直接发布消息到通道,同时往sorted_set结构中发一份,和kafka中放一份。但仔细一想该方案有些不妥,如果B往通道发送完之后,自己挂了呢?kafka和sorted_set中都没发送数据,顾历史消息订阅者无法看到。
    请看下图如何解决上述问题的?是不是一目了然了,不用我多说了吧,呵呵!
    屏幕快照 2021-01-24 下午10.33.55.png

    ————————————————————
    坐标帝都,白天上班族,晚上是知识的分享者
    如果读完觉得有收获的话,欢迎点赞加关注

    相关文章

      网友评论

          本文标题:Redis-发布订阅

          本文链接:https://www.haomeiwen.com/subject/vchmzktx.html