美文网首页
Redis的Pub/Sub为何不建议进行消息订阅

Redis的Pub/Sub为何不建议进行消息订阅

作者: 盼旺 | 来源:发表于2023-06-15 17:43 被阅读0次

    应用场景

    pubsub可以应用于高性能高吞吐的发布订阅场景,订阅者通过sub订阅某一个channel,发布者往channel发布消息,当channel收到消息的时候,会将消息推送给channel下的所有订阅者。由于是纯内存操作,因此可以达到极高的吞吐和极短的延时。可以用于构建实时系统,任务通知等。纯内存操作带来了高性能低延时的收益,但是同时也带来了消息丢失的风险,此外还有其他的不足。

    实现原理

    订阅者新建一个连接并订阅一个channel,当该channel写入消息时,会向该连接推送消息。一个channel可以有多个订阅者,当channel写入消息时,所有的订阅者都会收到消息推送。

    同一个channel也可以有多个发布者写入消息,消息写入以后并不会进行保存。消息写入后,会将消息发送给所有的订阅者,一旦订阅者断开连接,那么将不会收到该消息。如果出现网络异常导致订阅者连接重连,那么从连接断开到重连的这段期间,该channel的消息订阅者都不会收到。

    为什么不推荐

    1.消息丢失:Redis的Pub/Sub模式不会对消息进行持久化,如果订阅者在消息发布之前未连接到Redis服务器,它们将无法接收到之前发布的消息。这意味着如果订阅者在消息发布之前断开连接或重新启动,它们将错过这些消息。

    2.内存占用:由于Redis将所有订阅者的订阅信息存储在内存中,当订阅者数量非常大时,可能会导致Redis服务器的内存占用过高。这会对Redis的性能和可伸缩性产生负面影响。

    3.阻塞问题:当订阅者在执行阻塞操作(例如阻塞式读取)时,它们将无法处理其他的Redis命令。这可能会导致性能问题,特别是在高并发环境中。

    4.无法保证消息传递顺序:在Pub/Sub模式中,消息的传递是异步的,并且无法保证消息的传递顺序。如果应用程序需要处理有序的消息,Pub/Sub模式可能不适合。

    源码subscribeCommand

    // 用法: SUBSCRIBE channel [channel ...]
    // pubsub.c
    void subscribeCommand(client *c) {
        int j;
        // n 个channel 的订阅,循环调用即可
        for (j = 1; j < c->argc; j++)
            pubsubSubscribeChannel(c,c->argv[j]);
        // 添加pubsub订阅标识,方便其他地方判断
        c->flags |= CLIENT_PUBSUB;
    }
    // 具体的单个 channel 订阅实现
    /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
     * 0 if the client was already subscribed to that channel. */
    int pubsubSubscribeChannel(client *c, robj *channel) {
        dictEntry *de;
        list *clients = NULL;
        int retval = 0;
    
        /* Add the channel to the client -> channels hash table */
        // step1. 将要订阅的 channel 添加到各自客户端的 pubsub_channels 容器中
        if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
            retval = 1;
            incrRefCount(channel);
            /* Add the client to the channel -> list of clients hash table */
            // step2. 将要订阅的channel 添加到 server.pubsub_channels 中, 方便在publish时判定是否触发通知
            de = dictFind(server.pubsub_channels,channel);
            if (de == NULL) {
                clients = listCreate();
                dictAdd(server.pubsub_channels,channel,clients);
                incrRefCount(channel);
            } else {
                clients = dictGetVal(de);
            }
            // step3. 将客户端自身添加到相应的 server.pubsub_channels 对应的队列中去, 在通知时只需遍历该队列即可
            listAddNodeTail(clients,c);
        }
        /* Notify the client */
        // 响应客户端: 
        // *3 \r\n
        // $9\r\nsubscribe\r\n
        // channel
        // 111(该客户端总共订阅的channel数)
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.subscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,clientSubscriptionsCount(c));
        return retval;
    }
    // 客户端订阅的总channel数, 两种订阅方式相加
    /* Return the number of channels + patterns a client is subscribed to. */
    int clientSubscriptionsCount(client *c) {
        return dictSize(c->pubsub_channels)+
               listLength(c->pubsub_patterns);
    }
    

    源码psubscribeCommand

    // 用法: PSUBSCRIBE pattern [pattern ...]
    // pubsub.c
    void psubscribeCommand(client *c) {
        int j;
        // 同样是n个channel依次注册
        for (j = 1; j < c->argc; j++)
            pubsubSubscribePattern(c,c->argv[j]);
        c->flags |= CLIENT_PUBSUB;
    }
    // 注册单个模式匹配的 channel 订阅
    /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
    int pubsubSubscribePattern(client *c, robj *pattern) {
        int retval = 0;
        // 直接查找对应的 pattern, 没有则添加
        if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
            retval = 1;
            pubsubPattern *pat;
            listAddNodeTail(c->pubsub_patterns,pattern);
            incrRefCount(pattern);
            pat = zmalloc(sizeof(*pat));
            pat->pattern = getDecodedObject(pattern);
            pat->client = c;
            listAddNodeTail(server.pubsub_patterns,pat);
        }
        /* Notify the client */
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.psubscribebulk);
        addReplyBulk(c,pattern);
        addReplyLongLong(c,clientSubscriptionsCount(c));
        return retval;
    }
    

    相关文章

      网友评论

          本文标题:Redis的Pub/Sub为何不建议进行消息订阅

          本文链接:https://www.haomeiwen.com/subject/zwylydtx.html