美文网首页
Redis源码研究之订阅与发布

Redis源码研究之订阅与发布

作者: wenmingxing | 来源:发表于2018-04-30 18:53 被阅读34次

本文主要说明Redis的两种订阅模式的实现。

建议阅读:
1、Redis订阅与发布理论说明见:Redis之发布与订阅

I、上帝视角

1.1 两种模式

Redis具有两种订阅模式,分别为频道(channel)订阅,与模式(pattern)订阅,这部分内容可以参见建议阅读部分。

1.2 数据结构

1、struct redisServer中维护了所有频道和订阅频道的客户端:

/*src/redis.h/redisServer*/  
struct redisServer {
    ......
    /* Pubsub */
    /*用字典维护频道,key为channel,value为订阅这个频道的客户端链表*/
    dict *pubsub_channels; /* Map channels to list of subscribed clients */

    /*用一个链表结构维护模式订阅,每个链表节点(pubsubPattern)包含两部分内容(客户端+模式)*/
    list *pubsub_patterns; /* A list of pubsub_patterns */
    ......
};
/*订阅模式的数据结构*/  
/*src/redis.h/pubsubPattern*/  
typedef struct pubsubPattern {
    redisClient *client;  //订阅模式客户端
    robj *pattern;    //被订阅的模式
} pubsubPattern;

如下图所示:

2、struct redisClient维护了客户端自己所订阅的频道

/*src/redis.h/redisClient*/  
typedef struct redisClient {
    ......
    // 维护客户端订阅的频道,key为channel,value为null,以dict维护能在O(1)时间内确定是否订阅了某个频道
    dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */

    // 记录所有订阅模式的客户端信息
    list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
    ......
} redisClient;

3、订阅频道为一个dict,每个channel发布信息是,直接遍历channel对应的client list,即可将所有信息发布到对应的clients;

订阅模式为一个list,当有消息发布时,查找list,发布到对应的clients;

II、订阅

1、很容易想到,订阅过程即是对上述两个数据结构的维护,首先来看订阅频道的做法:

/*设置客户端c订阅频道channel*/
/* src/pubsub.c/pubsubSubscribeChannel*/  
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    // 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);    //增加引用计数

        // 关联示意图
        // {
        //  频道名        订阅频道的客户端
        //  'channel-a' : [c1, c2, c3],
        //  'channel-b' : [c5, c2, c1],
        //  'channel-c' : [c10, c2, c1]
        // }
        /* Add the client to the channel -> list of clients hash table */
        // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
        // 如果 channel 不存在于字典,那么添加进去
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }

        // before:
        // 'channel' : [c1, c2]
        // after:
        // 'channel' : [c1, c2, c3]
        // 将客户端添加到链表的末尾
        listAddNodeTail(clients,c);
    }

    /* Notify the client */
    // 回复客户端。
    // 示例:
    // redis 127.0.0.1:6379> SUBSCRIBE xxx
    // Reading messages... (press Ctrl-C to quit)
    // 1) "subscribe"
    // 2) "xxx"
    // 3) (integer) 1
    addReply(c,shared.mbulkhdr[3]);
    // "subscribe\n" 字符串
    addReply(c,shared.subscribebulk);
    // 被订阅的客户端
    addReplyBulk(c,channel);
    // 客户端订阅的频道和模式总数
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

    return retval;
}

2、订阅模式实现:

/*设置客户端c订阅模式pattern*/  
/*src/pubsub.c/pubsubSubscribeChannel*/
int pubsubSubscribePattern(redisClient *c, robj *pattern) {
    int retval = 0;

    // 在链表中查找模式,看客户端是否已经订阅了这个模式
    // 这里为什么不像 channel 那样,用字典来进行检测呢?
    // 虽然 pattern 的数量一般来说并不多
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        
        // 如果没有的话,执行以下代码

        retval = 1;

        pubsubPattern *pat;

        // 将 pattern 添加到 c->pubsub_patterns 链表中
        listAddNodeTail(c->pubsub_patterns,pattern);

        incrRefCount(pattern);

        // 创建并设置新的 pubsubPattern 结构
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;

        // 添加到末尾
        listAddNodeTail(server.pubsub_patterns,pat);
    }

    /* Notify the client */
    // 回复客户端。
    // 示例:
    // redis 127.0.0.1:6379> PSUBSCRIBE xxx*
    // Reading messages... (press Ctrl-C to quit)
    // 1) "psubscribe"
    // 2) "xxx*"
    // 3) (integer) 1
    addReply(c,shared.mbulkhdr[3]);
    // 回复 "psubscribe" 字符串
    addReply(c,shared.psubscribebulk);
    // 回复被订阅的模式
    addReplyBulk(c,pattern);
    // 回复客户端订阅的频道和模式的总数
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

    return retval;
}

3、频道与模式的退订与订阅相反。

III、消息发布

1、发布消息的过程即为遍历两个数据结构,然后将消息发布到匹配的客户端:

/*发布消息,包括发布到订阅channel,与发布到订阅pattern*/  
/*src/pubsub/pubsubPublishMessage*/  

int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    // 取出包含所有订阅频道 channel 的客户端的链表
    // 并将消息发送给它们
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        // 遍历客户端链表,将 message 发送给它们
        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;

            // 回复客户端。
            // 示例:
            // 1) "message"
            // 2) "xxx"
            // 3) "hello"
            addReply(c,shared.mbulkhdr[3]);
            // "message" 字符串
            addReply(c,shared.messagebulk);
            // 消息的来源频道
            addReplyBulk(c,channel);
            // 消息内容
            addReplyBulk(c,message);

            // 接收客户端计数
            receivers++;
        }
    }

    /* Send to clients listening to matching channels */
    // 将消息也发送给那些和频道匹配的模式
    if (listLength(server.pubsub_patterns)) {

        // 遍历模式链表
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {

            // 取出 pubsubPattern
            pubsubPattern *pat = ln->value;

            // 如果 channel 和 pattern 匹配
            // 就给所有订阅该 pattern 的客户端发送消息
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {

                // 回复客户端
                // 示例:
                // 1) "pmessage"
                // 2) "*"
                // 3) "xxx"
                // 4) "hello"
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);

                // 对接收消息的客户端进行计数
                receivers++;
            }
        }

        decrRefCount(channel);
    }

    // 返回计数
    return receivers;
}

IV、processCommand

在之前讲过的processCommand函数中,也有对订阅的处理,即**如果客户端处于订阅与发布中,则,只能执行订阅与发布的相关命令:

/*src/redis.c/processCommand*/
int processCommand(redisClient *c) {
   ......
   // 在订阅发布模式下,只允许处理SUBSCRIBE 或者UNSUBSCRIBE 命令
   // 从下面的检测条件可以看出:只要存在redisClient.pubsub_channels 或者
   // redisClient.pubsub_patterns,就代表处于订阅发布模式下
   /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
   if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
       &&
       c->cmd->proc != subscribeCommand &&
       c->cmd->proc != unsubscribeCommand &&
       c->cmd->proc != psubscribeCommand &&
       c->cmd->proc != punsubscribeCommand) {
       addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed
               "in this context");
       return REDIS_OK;
       }
   ......
}  

【参考】
[1] 《Redis设计与实现》
[2] 《Redis源码日志》

相关文章

  • Redis源码研究之订阅与发布

    本文主要说明Redis的两种订阅模式的实现。 建议阅读: 1、Redis订阅与发布理论说明见:Redis之发布与...

  • redis-订阅与发布

    redis-订阅与发布 Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这...

  • 101-Redis 订阅与发布

    订阅与发布 ¶Redis 发布订阅 http://www.redis.net.cn/order/3633.html...

  • Redis实现不可靠发布/订阅功能

    Redis的发布/订阅模型 Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式,...

  • Redis发布与订阅

    一、发布与订阅 实际中,redis很少使用发布与订阅来代替MQ角色。 二、使用redis客户端实现 Redis 发...

  • Redis 实战 —— 05. Redis 其他命令简介

    发布与订阅 P52 Redis 实现了发布与订阅(publish/subscribe)模式,又称 pub/sub ...

  • Redis发布订阅模式

    Redis支持发布订阅模式,先了解一下与发布订阅相关的命令。 发布订阅模式命令 SUBSCRIBE命令用于订阅ch...

  • Redis高级

    Redis高级 发布订阅 Redis提供了发布订阅功能,可以用于消息的传输 Redis的发布订阅机制包括三个部分,...

  • redis 事务

    redis的发布与订阅订阅一个频道:subscribe channel(频道名称) ...发布消息:publish...

  • redis学习之发布与订阅

    发布与订阅(又称pub/sub),是一种消息通信模式,特点是订阅者(listener)负责订阅频道(channel...

网友评论

      本文标题:Redis源码研究之订阅与发布

      本文链接:https://www.haomeiwen.com/subject/pfftrftx.html