应用场景
pubsub可以应用于高性能高吞吐的发布订阅场景,订阅者通过sub订阅某一个channel,发布者往channel发布消息,当channel收到消息的时候,会将消息推送给channel下的所有订阅者。由于是纯内存操作,因此可以达到极高的吞吐和极短的延时。可以用于构建实时系统,任务通知等。纯内存操作带来了高性能低延时的收益,但是同时也带来了消息丢失的风险,此外还有其他的不足。
实现原理
订阅者新建一个连接并订阅一个channel,当该channel写入消息时,会向该连接推送消息。一个channel可以有多个订阅者,当channel写入消息时,所有的订阅者都会收到消息推送。
同一个channel也可以有多个发布者写入消息,消息写入以后并不会进行保存。消息写入后,会将消息发送给所有的订阅者,一旦订阅者断开连接,那么将不会收到该消息。如果出现网络异常导致订阅者连接重连,那么从连接断开到重连的这段期间,该channel的消息订阅者都不会收到。
为什么不推荐
1.消息丢失:Redis的Pub/Sub模式不会对消息进行持久化,如果订阅者在消息发布之前未连接到Redis服务器,它们将无法接收到之前发布的消息。这意味着如果订阅者在消息发布之前断开连接或重新启动,它们将错过这些消息。
2.内存占用:由于Redis将所有订阅者的订阅信息存储在内存中,当订阅者数量非常大时,可能会导致Redis服务器的内存占用过高。这会对Redis的性能和可伸缩性产生负面影响。
3.阻塞问题:当订阅者在执行阻塞操作(例如阻塞式读取)时,它们将无法处理其他的Redis命令。这可能会导致性能问题,特别是在高并发环境中。
4.无法保证消息传递顺序:在Pub/Sub模式中,消息的传递是异步的,并且无法保证消息的传递顺序。如果应用程序需要处理有序的消息,Pub/Sub模式可能不适合。
源码subscribeCommand
// 用法: SUBSCRIBE channel [channel ...]
// pubsub.c
void subscribeCommand(client *c) {
int j;
// n 个channel 的订阅,循环调用即可
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
// 添加pubsub订阅标识,方便其他地方判断
c->flags |= CLIENT_PUBSUB;
}
// 具体的单个 channel 订阅实现
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
// step1. 将要订阅的 channel 添加到各自客户端的 pubsub_channels 容器中
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
// step2. 将要订阅的channel 添加到 server.pubsub_channels 中, 方便在publish时判定是否触发通知
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
// step3. 将客户端自身添加到相应的 server.pubsub_channels 对应的队列中去, 在通知时只需遍历该队列即可
listAddNodeTail(clients,c);
}
/* Notify the client */
// 响应客户端:
// *3 \r\n
// $9\r\nsubscribe\r\n
// channel
// 111(该客户端总共订阅的channel数)
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
// 客户端订阅的总channel数, 两种订阅方式相加
/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
return dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns);
}
源码psubscribeCommand
// 用法: PSUBSCRIBE pattern [pattern ...]
// pubsub.c
void psubscribeCommand(client *c) {
int j;
// 同样是n个channel依次注册
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
c->flags |= CLIENT_PUBSUB;
}
// 注册单个模式匹配的 channel 订阅
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) {
int retval = 0;
// 直接查找对应的 pattern, 没有则添加
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1;
pubsubPattern *pat;
listAddNodeTail(c->pubsub_patterns,pattern);
incrRefCount(pattern);
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
}
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
网友评论