发布订阅
Redis 发布订阅 (pub/sub) 是一种消息通信模式:订阅者 (sub) 订阅频道,发送者 (pub) 向频道发送消息,订阅者通过该频道接收消息。

不过发布订阅模式有2点需要注意:
- 新开启的订阅客户端,无法接收到该频道之前的消息,因为redis不会对发布的消息进行持久化
- 客户端在执行订阅命令之后进入了订阅状态,只能接收订阅和退订的命令。
- 订阅频道
subscribe {channel} {channel} ...
- 发布消息
publish {channel} {message}
- 取消订阅
unsubscribe {channel} {channel} ...
- 按照模式订阅
psubscribe {pattern}
- 按照模式取消订阅
punsubscribe {pattern}
- 查询订阅
- 查看活跃频道
- 查看频道订阅数
- 查看模式订阅数
实例测试:
客户端1输入 SUBSCRIBE runoobChat 订阅频道 runoobChat

客户端2 在 频道 runoobChat 发布消息,订阅者就能接收到消息。

客户端1将会接收:

频道的订阅与退订
Redis 将所有频道的订阅关系保存在服务器状态的字典里面,这个字典的健是某个被订阅的频道,而健的值则是一个链表,链表里面记录了所有订阅这个频道的客户端。
struct redisServer
{
dict *pubsub_channels; // 保存所有频道的订阅关系
...
};
订阅频道
每当客户端执行``命令订阅某些频道的时候,服务器斗湖将客户端与被订阅频道在 字典中进行关联。
按照频道是否有订阅者,关联操作可以分为2种情况执行:
- 如果频道已经有其他订阅者,那么它在
pubsub_channels
字典中必然有相应的订阅者列表, - 如果频道还未有任何订阅者,那么它必然不存在于
pubsub_channels
字典,程序首先在pubsub_channels
字典中,为频道创建一个健,并将这个健的值设置为空链表,然后再将客户端添加到链表,成为一个链表的第一个元素。
调用栈
(gdb) bt
#0 pubsubSubscribeChannel (c=0x5555558dd358, channel=0x5555558dc2c8) at pubsub.c:65
#1 0x00005555555b3670 in subscribeCommand (c=0x5555558dd358) at pubsub.c:480
#2 0x0000555555577528 in call (c=0x5555558dd358, flags=7) at redis.c:2441
#3 0x0000555555578044 in processCommand (c=0x5555558dd358) at redis.c:2766
#4 0x00005555555865c4 in processInputBuffer (c=0x5555558dd358) at networking.c:1539
#5 0x00005555555868b2 in readQueryFromClient (el=0x5555558666f8, fd=7, privdata=0x5555558dd358, mask=1)
at networking.c:1631
#6 0x00005555555701bf in aeProcessEvents (eventLoop=0x5555558666f8, flags=3) at ae.c:576
#7 0x0000555555570380 in aeMain (eventLoop=0x5555558666f8) at ae.c:635
#8 0x000055555557b30a in main (argc=2, argv=0x7fffffffe4c8) at redis.c:4079
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
// 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
// 关联示意图
// {
// 频道名 订阅频道的客户端
// 'channel-a' : [c1, c2, c3],
// }
/* Add the client to the channel -> list of clients hash table */
// 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
// 如果 channel 不存在于字典,那么添加进去
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
// before:
// 'channel' : [c1, c2]
// after:
// 'channel' : [c1, c2, c3]
// 将客户端添加到链表的末尾
listAddNodeTail(clients,c);
}
/* Notify the client */
// 回复客户端。
// 示例:
// redis 127.0.0.1:6379> SUBSCRIBE xxx
// Reading messages... (press Ctrl-C to quit)
// 1) "subscribe"
// 2) "xxx"
// 3) (integer) 1
addReply(c,shared.mbulkhdr[3]);
// "subscribe\n" 字符串
addReply(c,shared.subscribebulk);
// 被订阅的客户端
addReplyBulk(c,channel);
// 客户端订阅的频道和模式总数
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
return retval;
频道退订
/*
* 客户端 c 退订频道 channel ,如果取消成功返回 1 ,如果因为客户端未订阅频道,而造成取消失败,返回 0 。
*/
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
// 将频道 channel 从 client->channels 字典中移除
incrRefCount(channel);
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
// channel 移除成功,表示客户端订阅了这个频道,执行以下代码
retval = 1;
// 从 channel->clients 的 clients 链表中,移除 client
de = dictFind(server.pubsub_channels,channel);
redisAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
redisAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
// 如果移除 client 之后链表为空,那么删除这个 channel 键
if (listLength(clients) == 0) {
dictDelete(server.pubsub_channels,channel);
}
}
/* Notify the client */
// 回复客户端
if (notify) {
addReply(c,shared.mbulkhdr[3]);
// "ubsubscribe" 字符串
addReply(c,shared.unsubscribebulk);
// 被退订的频道
addReplyBulk(c,channel);
// 退订频道之后客户端仍在订阅的频道和模式的总数
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
模式的订阅
服务器将所有频道的订阅关系保存再服务器状态的,与此类似,服务器也将所有模式的订阅关系都保存在服务器状态的属性里面
struct redisServer
{
list *pubsub_patterns;
...
};
pubsub_patterns
属性是一个链表,链表中的每一个节点都包含着一个pubsubPattern
结构,这个结构的属性记录了被订阅的模式,而client
性则记录了订阅模式的客户端。
struct pubsubPattern {
redisClient* client;
robj *pattern; // 被订阅的模式
}

客户端执行命令订阅某些模式的时候,服务器会对每个被订阅的模式执行下列2个操作:
- 新建一个``结构,将结构的属性设置为被订阅的模式,client属性设置为订阅模式的客户端
- 将
pubsubPattern
结构添加到pubsub_patterns
链表的表尾
模式的退订
发送消息
- 将消息发给频道订阅者
- 将消息发给模型订阅者
网友评论