Redis的Pub/Sub(发布/订阅)

作者: 激情的狼王 | 来源:发表于2018-02-23 17:01 被阅读6887次

    Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。redis内置了发布/订阅功能,可以作为消息机制使用。在使用Jedis的Publish/Subscribe功能之前,我们先来看原生版的发布订阅命令。

    原生版Redis 发布订阅命令

    1.SUBSCRIBE channel [channel ...]

        订阅给定的一个或多个频道的信息。
      时间复杂度:
        O(N),其中 N 是订阅的频道的数量。
      返回值:
        接收到的信息(请参见下面的代码说明)。

    # 订阅 msg 和 chat_room 两个频道
    
    # 1 - 6 行是执行 subscribe 之后的反馈信息
    # 第 7 - 9 行才是接收到的第一条信息
    # 第 10 - 12 行是第二条
    
    redis> subscribe msg chat_room
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"       # 返回值的类型:显示订阅成功
    2) "msg"             # 订阅的频道名字
    3) (integer) 1       # 目前已订阅的频道数量
    
    4) "subscribe"
    5) "chat_room"
    6) (integer) 2
    
    7) "message"         # 返回值的类型:信息
    8) "msg"             # 来源(从那个频道发送过来)
    9) "hello moto"      # 信息内容
    
    10) "message"
    11) "chat_room"
    12) "testing...haha"
    
    2.PSUBSCRIBE pattern [pattern ...]

        订阅一个或多个符合给定模式的频道。每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等), news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。
    时间复杂度:
        O(N), N 是订阅的模式的数量。
    返回值:
        接收到的信息(请参见下面的代码说明)。

    # 订阅 news.* 和 tweet.* 两个模式
    
    # 第 1 - 6 行是执行 psubscribe 之后的反馈信息
    # 第 7 - 10 才是接收到的第一条信息
    # 第 11 - 14 是第二条
    # 以此类推。。。
    
    redis> psubscribe news.* tweet.*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"                  # 返回值的类型:显示订阅成功
    2) "news.*"                      # 订阅的模式
    3) (integer) 1                   # 目前已订阅的模式的数量
    
    4) "psubscribe"
    5) "tweet.*"
    6) (integer) 2
    
    7) "pmessage"                    # 返回值的类型:信息
    8) "news.*"                      # 信息匹配的模式
    9) "news.it"                     # 信息本身的目标频道
    10) "Google buy Motorola"         # 信息的内容
    
    11) "pmessage"
    12) "tweet.*"
    13) "tweet.huangz"
    14) "hello"
    
    15) "pmessage"
    16) "tweet.*"
    17) "tweet.joe"
    18) "@huangz morning"
    
    19) "pmessage"
    20) "news.*"
    21) "news.life"
    22) "An apple a day, keep doctors away"
    
    3.UNSUBSCRIBE [channel [channel ...]]

        指示客户端退订给定的频道。如果没有频道被指定,即一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用SUBSCRIBE命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
    时间复杂度:
        O(N) , N 是客户端已订阅的频道的数量。
    返回值:
        这个命令在不同的客户端中有不同的表现

    4.PUNSUBSCRIBE [pattern [pattern ...]]

        指示客户端退订所有给定模式。如果没有模式被指定,也即是,一个无参数的 PUNSUBSCRIBE调用被执行,那么客户端使用 PSUBSCRIBE命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
    时间复杂度:
        O(N+M) ,其中 N是客户端已订阅的模式的数量, M则是系统中所有客户端订阅的模式的数量。
    返回值:
        这个命令在不同的客户端中有不同的表现。

    5.PUBSUB CHANNELS [pattern]

    列出当前的活跃频道。活跃频道指的是那些至少有一个订阅者的频道, 订阅模式的客户端不计算在内。
    pattern 参数是可选的:
    如果不给出 pattern 参数,那么列出订阅与发布系统中的所有活跃频道。
    如果给出 pattern 参数,那么只列出和给定模式 pattern 相匹配的那些活跃频道。
    复杂度: O(N) ,N 为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常数)。
    返回值: 一个由活跃频道组成的列表。

    # client-1 订阅 news.it 和 news.sport 两个频道
    
    client-1> SUBSCRIBE news.it news.sport
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "news.it"
    3) (integer) 1
    1) "subscribe"
    2) "news.sport"
    3) (integer) 2
    
    # client-2 订阅 news.it 和 news.internet 两个频道
    
    client-2> SUBSCRIBE news.it news.internet
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "news.it"
    3) (integer) 1
    1) "subscribe"
    2) "news.internet"
    3) (integer) 2
    
    # 首先, client-3 打印所有活跃频道
    # 注意,即使一个频道有多个订阅者,它也只输出一次,比如 news.it
    
    client-3> PUBSUB CHANNELS
    1) "news.sport"
    2) "news.internet"
    3) "news.it"
    
    # 接下来, client-3 打印那些与模式 news.i* 相匹配的活跃频道
    # 因为 news.sport 不匹配 news.i* ,所以它没有被打印
    
    redis> PUBSUB CHANNELS news.i*
    1) "news.internet"
    2) "news.it"
    
    6.PUBSUB NUMSUB [channel-1 ... channel-N]

    返回给定频道的订阅者数量, 订阅模式的客户端不计算在内。
    复杂度: O(N) , N 为给定频道的数量。
    返回值:一个多条批量回复(Multi-bulk reply),回复中包含给定的频道,以及频道的订阅者数量。 格式为:频道 channel-1 , channel-1 的订阅者数量,频道 channel-2 , channel-2 的订阅者数量,诸如此类。 回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。 不给定任何频道而直接调用这个命令也是可以的, 在这种情况下, 命令只返回一个空列表。

    # client-1 订阅 news.it 和 news.sport 两个频道
    
    client-1> SUBSCRIBE news.it news.sport
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "news.it"
    3) (integer) 1
    1) "subscribe"
    2) "news.sport"
    3) (integer) 2
    
    # client-2 订阅 news.it 和 news.internet 两个频道
    
    client-2> SUBSCRIBE news.it news.internet
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "news.it"
    3) (integer) 1
    1) "subscribe"
    2) "news.internet"
    3) (integer) 2
    
    # client-3 打印各个频道的订阅者数量
    
    client-3> PUBSUB NUMSUB news.it news.internet news.sport news.music
    1) "news.it"    # 频道
    2) "2"          # 订阅该频道的客户端数量
    3) "news.internet"
    4) "1"
    5) "news.sport"
    6) "1"
    7) "news.music" # 没有任何订阅者
    8) "0"
    
    7.PUBSUB NUMPAT

    返回订阅模式的数量。
    注意,这个命令返回的不是订阅模式的客户端的数量, 而是客户端订阅的所有模式的数量总和。
    复杂度: O(1) 。
    返回值: 一个整数回复(Integer reply)。

    # client-1 订阅 news.* 和 discount.* 两个模式
    
    client-1> PSUBSCRIBE news.* discount.*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "news.*"
    3) (integer) 1
    1) "psubscribe"
    2) "discount.*"
    3) (integer) 2
    
    # client-2 订阅 tweet.* 一个模式
    
    client-2> PSUBSCRIBE tweet.*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "tweet.*"
    3) (integer) 1
    
    # client-3 返回当前订阅模式的数量为 3
    
    client-3> PUBSUB NUMPAT
    (integer) 3
    
    # 注意,当有多个客户端订阅相同的模式时,相同的订阅也被计算在 PUBSUB NUMPAT 之内
    # 比如说,再新建一个客户端 client-4 ,让它也订阅 news.* 频道
    
    client-4> PSUBSCRIBE news.*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "news.*"
    3) (integer) 1
    
    # 这时再计算被订阅模式的数量,就会得到数量为 4
    
    client-3> PUBSUB NUMPAT
    (integer) 4
    

    Jedis发布订阅命令

    要使用JedisPublish/Subscribe功能,必须编写对JedisPubSub的自己的实现,其中的函数的功能如下:

    public class PubSubListener extends JedisPubSub{
    
        // 取得订阅的消息后的处理
        public void onMessage(String channel, String message) {
            //TODO:接收订阅频道消息后,业务处理逻辑
            System.out.println(channel + "=" + message);
        }
    
        // 初始化订阅时候的处理
        public void onSubscribe(String channel, int subscribedChannels) {
             System.out.println(channel + "=" + subscribedChannels);
        }
    
        // 取消订阅时候的处理
        public void onUnsubscribe(String channel, int subscribedChannels) {
             System.out.println(channel + "=" + subscribedChannels);
        }
    
        // 初始化按表达式的方式订阅时候的处理
        public void onPSubscribe(String pattern, int subscribedChannels) {
             System.out.println(pattern + "=" + subscribedChannels);
        }
    
        // 取消按表达式的方式订阅时候的处理
        public void onPUnsubscribe(String pattern, int subscribedChannels) {
             System.out.println(pattern + "=" + subscribedChannels);
        }
    
        // 取得按表达式的方式订阅的消息后的处理
        public void onPMessage(String pattern, String channel, String message) {
            System.out.println(pattern + "=" + channel + "=" + message);
        }
    
    }
    

    Jedis有两种订阅模式:subsribe(一般模式设置频道)和psubsribe(使用模式匹配来设置频道)。不管是那种模式都可以设置个数不定的频道。订阅得到信息在将会lister的onMessage(…)方法或者onPMessage(…)中进行进行处理,这里我们只是做了简单的输出。 具体代码见GitHup:https://github.com/granett/Redis/tree/master/src/main/java/com/redis/pubsub

    public class Subscribe {
    
        private Jedis jedis = new Jedis("192.168.1.207",6379);
    
        /**
         * SUBSCRIBE channel [channel ...]
         * 订阅给定的一个或多个频道的信息
         */
        @Test
        public void subscribe(){
            final PubSubListener listener = new PubSubListener();
            jedis.subscribe(listener, "channel");
        }
    
        /**
         * UNSUBSCRIBE [channel [channel ...]]
         * 指示客户端退订给定的频道
         * 如果没有频道被指定,即一个无参数的 UNSUBSCRIBE 调用被执行,
         * 那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。
         * 在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
         */
        @Test
        public void unsubscribe(){
            final PubSubListener listener = new PubSubListener();
            listener.unsubscribe("channel");
        }
    
        /**
         * PSUBSCRIBE pattern [pattern ...]
         * 订阅一个或多个符合给定模式的频道
         * 每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等),
         * news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。
         */
        @Test
        public void psubscribe(){
            final PubSubListener listener = new PubSubListener();
            jedis.psubscribe(listener, "ch*");
        }
    
        /**
         * PUNSUBSCRIBE [pattern [pattern ...]]
         * 指示客户端退订所有给定模式
         * 如果没有模式被指定,即一个无参数的 PUNSUBSCRIBE 调用被执行,
         * 那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。
         * 在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
         */
        @Test
        public void punsubscribe(){
            final PubSubListener listener = new PubSubListener();
            listener.punsubscribe("ch*");
        }
    
        /**
         * PUBLISH channel message
         * 将信息 message 发送到指定的频道 channel
         * 返回值:接收到信息 message 的订阅者数量
         */
        @Test
        public void publish(){
            jedis.publish("channel", "bar123");
            System.out.println("发布消息");
        }
    
        /**
         * PUBSUB CHANNELS [pattern]
         * 列出当前的活跃频道。 活跃频道指的是那些至少有一个订阅者的频道, 订阅模式的客户端不计算在内。
         * pattern 参数是可选的:
         *    如果不给出 pattern 参数,那么列出订阅与发布系统中的所有活跃频道。
         *    如果给出 pattern 参数,那么只列出和给定模式 pattern 相匹配的那些活跃频道。
         */
        @Test
        public void PUBSUB(){
            List<String> list = jedis.pubsubChannels("*");
            System.out.println(list);
        }
    
        /**
         * PUBSUB NUMSUB [channel-1 ... channel-N]
         * 返回给定频道的订阅者数量, 订阅模式的客户端不计算在内。
         */
        @Test
        public void pubsubNumSub(){
            Map<String,String> map = jedis.pubsubNumSub();
            System.out.println(map);
        }
    
        /**
         * PUBSUB NUMPAT
         * 返回订阅模式的数量。
         * 注意, 这个命令返回的不是订阅模式的客户端的数量, 而是客户端订阅的所有模式的数量总和。
         */
        @Test
        public void pubsubNumPat(){
            Long count = jedis.pubsubNumPat();
            System.out.println(count);
        }
    
    }
    

    相关文章

      网友评论

        本文标题:Redis的Pub/Sub(发布/订阅)

        本文链接:https://www.haomeiwen.com/subject/dlxxxftx.html