Redis 的 t_list.c 是对 list 数据结构的实现。
在 Redis 3.2 之前,list 数据结构基于 ziplist 或 linkedlist 实现。
自 Redis 3.2 起,list 数据结构基于 quicklist 实现,而 quicklist 又基于 ziplist 实现。
在 server.h 中定义了如下编码常量:
/* Object encoding constants related to list storage, as defined in server.h. */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
在 redis.conf 中存在如下配置:
# Lists are also encoded in a special way to save a lot of space.
# The number of entries allowed per internal list node can be specified
# as a fixed maximum size or a maximum number of elements.
# For a fixed maximum size, use -5 through -1, meaning:
# -5: max size: 64 Kb <-- not recommended for normal workloads
# -4: max size: 32 Kb <-- not recommended
# -3: max size: 16 Kb <-- probably not recommended
# -2: max size: 8 Kb <-- good
# -1: max size: 4 Kb <-- good
# Positive numbers mean store up to _exactly_ that number of elements
# per list node.
# The highest performing option is usually -2 (8 Kb size) or -1 (4 Kb size),
# but if your use case is unique, adjust the settings as necessary.
list-max-ziplist-size -2
# Lists may also be compressed.
# Compress depth is the number of quicklist ziplist nodes from *each* side of
# the list to *exclude* from compression. The head and tail of the list
# are always uncompressed for fast push/pop operations. Settings are:
# 0: disable all list compression
# 1: depth 1 means "don't start compressing until after 1 node into the list,
# going from either the head or tail"
# So: [head]->node->node->...->node->[tail]
# [head], [tail] will always be uncompressed; inner nodes will compress.
# 2: [head]->[next]->node->node->...->node->[prev]->[tail]
# 2 here means: don't compress head or head->next or tail->prev or tail,
# but compress all nodes between them.
# 3: [head]->[next]->[next]->node->node->...->node->[prev]->[prev]->[tail]
# etc.
list-compress-depth 0
这两项配置分别指定 quicklist 中每个 ziplist 节点允许的 entry 个数(或大小上限)以及压缩深度。
1. 新增元素
/* Generic implementation shared by LPUSH and RPUSH.
 *
 * c     - the client issuing the command; argv[1] is the key and
 *         argv[2..argc-1] are the values to push.
 * where - LIST_HEAD or LIST_TAIL, i.e. which end of the list receives
 *         the values.
 *
 * If the key does not exist, a quicklist-encoded list is created, configured
 * from server.list_max_ziplist_size / server.list_compress_depth, and added
 * to the database. The reply is the list length after the pushes. */
void pushGenericCommand(client *c, int where) {
    robj *key = c->argv[1];
    robj *list = lookupKeyWrite(c->db, key);
    int pushed = 0;

    /* A key that exists but does not hold a list is a type error. */
    if (list != NULL && list->type != OBJ_LIST) {
        addReply(c, shared.wrongtypeerr);
        return;
    }

    for (int j = 2; j < c->argc; j++) {
        if (list == NULL) {
            /* First value for a missing key: create the list lazily and
             * register it in the database before pushing. */
            list = createQuicklistObject();
            quicklistSetOptions(list->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db, key, list);
        }
        listTypePush(list, c->argv[j], where);
        pushed++;
    }

    /* Reply with the current length (0 when no list exists). */
    addReplyLongLong(c, (list != NULL) ? listTypeLength(list) : 0);

    if (pushed > 0) {
        char *event;
        if (where == LIST_HEAD) {
            event = "lpush";
        } else {
            event = "rpush";
        }
        signalModifiedKey(c->db, key);
        notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id);
    }
    server.dirty += pushed;
}
/* LPUSH command: push the given values at the head of the list. */
void lpushCommand(client *c) {
    pushGenericCommand(c, LIST_HEAD);
}
/* RPUSH command: push the given values at the tail of the list. */
void rpushCommand(client *c) {
    pushGenericCommand(c, LIST_TAIL);
}
/* Generic implementation shared by LPUSHX and RPUSHX.
 *
 * c     - the client issuing the command; argv[1] is the key and
 *         argv[2..argc-1] are the values to push.
 * where - LIST_HEAD or LIST_TAIL.
 *
 * Unlike pushGenericCommand() this never creates the list: the key is looked
 * up with lookupKeyWriteOrReply(), which is handed shared.czero as the reply
 * to use when the lookup yields NULL. The reply on success is the list length
 * after the pushes. */
void pushxGenericCommand(client *c, int where) {
    robj *list = lookupKeyWriteOrReply(c, c->argv[1], shared.czero);
    if (list == NULL) return;
    if (checkType(c, list, OBJ_LIST)) return;

    int pushed = 0;
    for (int j = 2; j < c->argc; j++) {
        listTypePush(list, c->argv[j], where);
        pushed++;
    }

    addReplyLongLong(c, listTypeLength(list));

    if (pushed > 0) {
        /* The keyspace event name is the same one used by LPUSH / RPUSH. */
        char *event;
        if (where == LIST_HEAD) {
            event = "lpush";
        } else {
            event = "rpush";
        }
        signalModifiedKey(c->db, c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST, event, c->argv[1], c->db->id);
    }
    server.dirty += pushed;
}
/* LPUSHX command: prepend values to a list, only if the list exists. */
void lpushxCommand(client *c) {
    pushxGenericCommand(c, LIST_HEAD);
}
/* RPUSHX command: same as LPUSHX but operating on the tail of the list. */
void rpushxCommand(client *c) {
    pushxGenericCommand(c, LIST_TAIL);
}
/* LINSERT key BEFORE|AFTER pivot value
 *
 * Inserts 'value' (argv[4]) before or after the first element equal to
 * 'pivot' (argv[3]), scanning from index 0 towards the tail.
 *
 * Replies:
 *   - shared.syntaxerr when argv[2] is neither "before" nor "after"
 *     (compared case-insensitively);
 *   - shared.cnegone when no element equal to the pivot was found;
 *   - the list length after the insertion otherwise. */
void linsertCommand(client *c) {
    const char *position = c->argv[2]->ptr;
    int where;

    /* AFTER maps to LIST_TAIL, BEFORE maps to LIST_HEAD. */
    if (strcasecmp(position, "after") == 0) {
        where = LIST_TAIL;
    } else if (strcasecmp(position, "before") == 0) {
        where = LIST_HEAD;
    } else {
        addReply(c, shared.syntaxerr);
        return;
    }

    /* Look up the target list and verify its type. */
    robj *subject = lookupKeyWriteOrReply(c, c->argv[1], shared.czero);
    if (subject == NULL) return;
    if (checkType(c, subject, OBJ_LIST)) return;

    /* Walk the list until the pivot is found; insert next to it and stop. */
    listTypeEntry entry;
    int inserted = 0;
    listTypeIterator *iter = listTypeInitIterator(subject, 0, LIST_TAIL);
    while (!inserted && listTypeNext(iter, &entry)) {
        if (listTypeEqual(&entry, c->argv[3])) {
            listTypeInsert(&entry, c->argv[4], where);
            inserted = 1;
        }
    }
    listTypeReleaseIterator(iter);

    if (!inserted) {
        /* Notify client of a failed insert */
        addReply(c, shared.cnegone);
        return;
    }

    signalModifiedKey(c->db, c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_LIST, "linsert", c->argv[1], c->db->id);
    server.dirty++;
    addReplyLongLong(c, listTypeLength(subject));
}
2. 删除元素
/* Generic implementation shared by LPOP and RPOP.
 *
 * c     - the client issuing the command; argv[1] is the key.
 * where - LIST_HEAD or LIST_TAIL, i.e. which end to pop from.
 *
 * Replies with the popped value, or shared.nullbulk when listTypePop()
 * returns NULL. When the pop leaves the list empty, the key is deleted and a
 * generic "del" keyspace event is emitted. */
void popGenericCommand(client *c, int where) {
    robj *list = lookupKeyWriteOrReply(c, c->argv[1], shared.nullbulk);
    if (list == NULL) return;
    if (checkType(c, list, OBJ_LIST)) return;

    robj *value = listTypePop(list, where);
    if (value == NULL) {
        addReply(c, shared.nullbulk);
        return;
    }

    char *event;
    if (where == LIST_HEAD) {
        event = "lpop";
    } else {
        event = "rpop";
    }

    addReplyBulk(c, value);
    decrRefCount(value);
    notifyKeyspaceEvent(NOTIFY_LIST, event, c->argv[1], c->db->id);

    /* An empty list is never kept around: drop the key. */
    if (listTypeLength(list) == 0) {
        notifyKeyspaceEvent(NOTIFY_GENERIC, "del",
                            c->argv[1], c->db->id);
        dbDelete(c->db, c->argv[1]);
    }
    signalModifiedKey(c->db, c->argv[1]);
    server.dirty++;
}
/* LPOP command: pop one element from the head of the list. */
void lpopCommand(client *c) {
    popGenericCommand(c, LIST_HEAD);
}
/* RPOP command: pop one element from the tail of the list. */
void rpopCommand(client *c) {
    popGenericCommand(c, LIST_TAIL);
}
/* LTRIM key start stop
 *
 * Keeps only the elements in the inclusive index range [start, stop] and
 * removes everything else. Negative indexes are offsets from the end of the
 * list. An empty range removes every element, after which the key is deleted.
 * Replies with shared.ok. */
void ltrimCommand(client *c) {
    long start, end;

    /* Parse the start and stop indexes. */
    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    /* Look up the key and verify its type. */
    robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.ok);
    if (o == NULL) return;
    if (checkType(c, o, OBJ_LIST)) return;

    long llen = listTypeLength(o);

    /* Convert negative indexes into offsets from the head. */
    if (start < 0) start = llen + start;
    if (end < 0) end = llen + end;
    if (start < 0) start = 0;

    /* Default to the empty-range case: trim the whole list from the left.
     * Invariant: start >= 0, so "start <= end" is false whenever end < 0.
     * The range is non-empty only when start <= end and start < length. */
    long ltrim = llen;
    long rtrim = 0;
    if (start <= end && start < llen) {
        if (end >= llen) end = llen - 1;
        ltrim = start;
        rtrim = llen - end - 1;
    }

    /* Remove the elements outside the requested range. */
    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    }
    quicklistDelRange(o->ptr, 0, ltrim);
    quicklistDelRange(o->ptr, -rtrim, rtrim);

    notifyKeyspaceEvent(NOTIFY_LIST, "ltrim", c->argv[1], c->db->id);
    if (listTypeLength(o) == 0) {
        dbDelete(c->db, c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
    }
    signalModifiedKey(c->db, c->argv[1]);
    server.dirty++;
    addReply(c, shared.ok);
}
/* LREM key count value
 *
 * Removes elements equal to 'value' (argv[3]) from the list stored at 'key'.
 *   count > 0: iterate from index 0 with direction LIST_TAIL and stop after
 *              'count' removals.
 *   count < 0: iterate from index -1 with direction LIST_HEAD and stop after
 *              |count| removals.
 *   count == 0: remove every matching element.
 *
 * Replies with the number of removed elements. If the list ends up empty the
 * key is deleted and a generic "del" keyspace event is emitted. */
void lremCommand(client *c) {
    robj *subject;
    /* The value to remove. */
    robj *obj = c->argv[3];
    long toremove;
    long removed = 0;

    /* Parse the removal count. */
    if (getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK)
        return;

    /* Look up the target list and verify its type. */
    subject = lookupKeyWriteOrReply(c, c->argv[1], shared.czero);
    if (subject == NULL || checkType(c, subject, OBJ_LIST)) return;

    /* 'limit' is the absolute value of the count. It is computed in unsigned
     * arithmetic because negating LONG_MIN as a signed long overflows, which
     * is undefined behavior in C. A limit of 0 means "no limit". */
    listTypeIterator *li;
    unsigned long limit;
    if (toremove < 0) {
        limit = 0UL - (unsigned long)toremove;
        li = listTypeInitIterator(subject, -1, LIST_HEAD);
    } else {
        limit = (unsigned long)toremove;
        li = listTypeInitIterator(subject, 0, LIST_TAIL);
    }

    /* Walk the list, deleting every match until the limit is reached. */
    listTypeEntry entry;
    while (listTypeNext(li, &entry)) {
        if (listTypeEqual(&entry, obj)) {
            listTypeDelete(li, &entry);
            server.dirty++;
            removed++;
            if (limit && (unsigned long)removed == limit) break;
        }
    }
    listTypeReleaseIterator(li);

    if (removed) {
        signalModifiedKey(c->db, c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST, "lrem", c->argv[1], c->db->id);
    }

    /* An empty list is never kept around: drop the key. */
    if (listTypeLength(subject) == 0) {
        dbDelete(c->db, c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
    }

    addReplyLongLong(c, removed);
}
3. 查找元素
/* LINDEX key index
 *
 * Replies with the element found by quicklistIndex() at 'index' (argv[2]),
 * or with shared.nullbulk when quicklistIndex() reports no such element.
 * An entry with a non-NULL 'value' pointer is returned as a string of
 * 'sz' bytes; otherwise its 'longval' integer is returned as a string. */
void lindexCommand(client *c) {
    /* Look up the key and verify its type. */
    robj *list = lookupKeyReadOrReply(c, c->argv[1], shared.nullbulk);
    if (list == NULL) return;
    if (checkType(c, list, OBJ_LIST)) return;

    /* Parse the index. */
    long index;
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
        return;

    /* Only the quicklist encoding is supported. */
    if (list->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    }

    quicklistEntry entry;
    if (!quicklistIndex(list->ptr, index, &entry)) {
        addReply(c, shared.nullbulk);
        return;
    }

    /* Wrap the entry in a temporary string object for the reply. */
    robj *value;
    if (entry.value) {
        value = createStringObject((char*)entry.value, entry.sz);
    } else {
        value = createStringObjectFromLongLong(entry.longval);
    }
    addReplyBulk(c, value);
    decrRefCount(value);
}
/* LRANGE key start stop
 *
 * Replies with the elements in the inclusive index range [start, stop] as a
 * multi-bulk reply. Negative indexes are offsets from the end of the list.
 * An empty range yields shared.emptymultibulk. */
void lrangeCommand(client *c) {
    long start, end;

    /* Parse the start and stop indexes. */
    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK) return;

    /* Look up the key and verify its type. */
    robj *o = lookupKeyReadOrReply(c, c->argv[1], shared.emptymultibulk);
    if (o == NULL) return;
    if (checkType(c, o, OBJ_LIST)) return;

    long llen = listTypeLength(o);

    /* Convert negative indexes into offsets from the head. */
    if (start < 0) start = llen + start;
    if (end < 0) end = llen + end;
    if (start < 0) start = 0;

    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
    if (start > end || start >= llen) {
        addReply(c, shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen - 1;
    long rangelen = (end - start) + 1;

    /* Return the result in form of a multi-bulk reply */
    addReplyMultiBulkLen(c, rangelen);

    if (o->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("List encoding is not QUICKLIST!");
    }

    /* Emit exactly 'rangelen' elements, starting at 'start' and moving
     * towards the tail. */
    listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
    for (long remaining = rangelen; remaining > 0; remaining--) {
        listTypeEntry entry;
        listTypeNext(iter, &entry);
        quicklistEntry *qe = &entry.entry;
        if (qe->value) {
            addReplyBulkCBuffer(c, qe->value, qe->sz);
        } else {
            addReplyBulkLongLong(c, qe->longval);
        }
    }
    listTypeReleaseIterator(iter);
}
4. 修改元素
/* LSET key index value
 *
 * Replaces the element at 'index' (argv[2]) with 'value' (argv[3]).
 * Replies with shared.ok on success, or shared.outofrangeerr when
 * quicklistReplaceAtIndex() reports that nothing was replaced. The lookup is
 * handed shared.nokeyerr as the reply for a missing key. */
void lsetCommand(client *c) {
    /* Look up the key and verify its type. */
    robj *list = lookupKeyWriteOrReply(c, c->argv[1], shared.nokeyerr);
    if (list == NULL) return;
    if (checkType(c, list, OBJ_LIST)) return;

    /* Parse the index. */
    long index;
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)
        return;

    /* Only the quicklist encoding is supported. */
    if (list->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    }

    robj *value = c->argv[3];
    quicklist *ql = list->ptr;

    /* Try to locate the element and overwrite it in place. */
    if (!quicklistReplaceAtIndex(ql, index, value->ptr, sdslen(value->ptr))) {
        addReply(c, shared.outofrangeerr);
        return;
    }

    addReply(c, shared.ok);
    signalModifiedKey(c->db, c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_LIST, "lset", c->argv[1], c->db->id);
    server.dirty++;
}
5. 移动元素及阻塞队列
/* Push half of RPOPLPUSH: pushes 'value' at the head of the destination list
 * and sends it to the client as the command reply.
 *
 * c      - the client issuing the command.
 * dstkey - key of the destination list.
 * dstobj - the destination list object, or NULL if the key does not exist,
 *          in which case a new quicklist-encoded list is created and added
 *          to the database under 'dstkey'.
 * value  - the element to push. */
void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
    if (dstobj == NULL) {
        /* Create the list if the key does not exist */
        dstobj = createQuicklistObject();
        quicklistSetOptions(dstobj->ptr, server.list_max_ziplist_size,
                            server.list_compress_depth);
        dbAdd(c->db, dstkey, dstobj);
    }

    signalModifiedKey(c->db, dstkey);
    listTypePush(dstobj, value, LIST_HEAD);
    notifyKeyspaceEvent(NOTIFY_LIST, "lpush", dstkey, c->db->id);

    /* Always send the pushed value to the client. */
    addReplyBulk(c, value);
}
/* RPOPLPUSH source destination
 *
 * Pops one element from the tail of 'source' (argv[1]) and pushes it at the
 * head of 'destination' (argv[2]) through rpoplpushHandlePush(), which also
 * sends the element to the client. The source key is deleted if it becomes
 * empty. When invoked on behalf of BRPOPLPUSH, the client command vector is
 * rewritten to a plain RPOPLPUSH. */
void rpoplpushCommand(client *c) {
    /* Look up the source list and verify its type. */
    robj *sobj = lookupKeyWriteOrReply(c, c->argv[1], shared.nullbulk);
    if (sobj == NULL) return;
    if (checkType(c, sobj, OBJ_LIST)) return;

    if (listTypeLength(sobj) == 0) {
        /* This may only happen after loading very old RDB files. Recent
         * versions of Redis delete keys of empty lists. */
        addReply(c, shared.nullbulk);
        return;
    }

    /* Look up the destination; it may be missing, but if present it must
     * be a list. */
    robj *dobj = lookupKeyWrite(c->db, c->argv[2]);
    robj *touchedkey = c->argv[1];
    if (dobj != NULL && checkType(c, dobj, OBJ_LIST)) return;

    /* Pop one element from the tail of the source list. */
    robj *value = listTypePop(sobj, LIST_TAIL);

    /* We saved touched key, and protect it, since rpoplpushHandlePush
     * may change the client command argument vector (it does not
     * currently). */
    incrRefCount(touchedkey);

    /* Push the element onto the destination list. */
    rpoplpushHandlePush(c, c->argv[2], dobj, value);

    /* listTypePop returns an object with its refcount incremented */
    decrRefCount(value);

    /* Delete the source list when it is empty */
    notifyKeyspaceEvent(NOTIFY_LIST, "rpop", touchedkey, c->db->id);
    if (listTypeLength(sobj) == 0) {
        dbDelete(c->db, touchedkey);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "del",
                            touchedkey, c->db->id);
    }
    signalModifiedKey(c->db, touchedkey);
    decrRefCount(touchedkey);
    server.dirty++;

    /* Rewrite the command vector as RPOPLPUSH when called via BRPOPLPUSH. */
    if (c->cmd->proc == brpoplpushCommand) {
        rewriteClientCommandVector(c, 3, shared.rpoplpush,
                                   c->argv[1], c->argv[2]);
    }
}
/* Blocking RPOP/LPOP: generic implementation shared by BLPOP and BRPOP.
 *
 * c     - the client issuing the command; argv[1..argc-2] are the keys and
 *         argv[argc-1] is the timeout in seconds.
 * where - LIST_HEAD or LIST_TAIL, i.e. which end to pop from.
 *
 * Keys are checked in order. The first one holding a non-empty list is
 * served immediately with a two-element reply (key, value) and the command
 * vector is rewritten as a plain [LR]POP. If no key can be served, the
 * client is blocked on all the keys, unless it is inside MULTI/EXEC, in
 * which case shared.nullmultibulk is returned right away. */
void blockingPopGenericCommand(client *c, int where) {
    mstime_t timeout;

    /* Parse the timeout (last argument). */
    if (getTimeoutFromObjectOrReply(c, c->argv[c->argc-1], &timeout, UNIT_SECONDS)
        != C_OK) return;

    /* Scan the keys in the order given by the client. */
    for (int j = 1; j < c->argc-1; j++) {
        robj *o = lookupKeyWrite(c->db, c->argv[j]);

        if (o == NULL) continue;
        if (o->type != OBJ_LIST) {
            addReply(c, shared.wrongtypeerr);
            return;
        }
        if (listTypeLength(o) == 0) continue;

        /* Non-empty list: serve it like a regular, non-blocking [LR]POP. */
        char *event;
        if (where == LIST_HEAD) {
            event = "lpop";
        } else {
            event = "rpop";
        }
        robj *value = listTypePop(o, where);
        serverAssert(value != NULL);

        addReplyMultiBulkLen(c, 2);
        addReplyBulk(c, c->argv[j]);
        addReplyBulk(c, value);
        decrRefCount(value);
        notifyKeyspaceEvent(NOTIFY_LIST, event,
                            c->argv[j], c->db->id);
        if (listTypeLength(o) == 0) {
            dbDelete(c->db, c->argv[j]);
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del",
                                c->argv[j], c->db->id);
        }
        signalModifiedKey(c->db, c->argv[j]);
        server.dirty++;

        /* Replicate it as an [LR]POP instead of B[LR]POP. */
        rewriteClientCommandVector(c, 2,
            (where == LIST_HEAD) ? shared.lpop : shared.rpop,
            c->argv[j]);
        return;
    }

    /* If we are inside a MULTI/EXEC and the list is empty the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
    if (c->flags & CLIENT_MULTI) {
        addReply(c, shared.nullmultibulk);
        return;
    }

    /* If the list is empty or the key does not exists we must block */
    blockForKeys(c, BLOCKED_LIST, c->argv + 1, c->argc - 2, timeout, NULL, NULL);
}
/* BLPOP command: blocking pop from the head of the first non-empty list. */
void blpopCommand(client *c) {
    blockingPopGenericCommand(c, LIST_HEAD);
}
/* BRPOP command: blocking pop from the tail of the first non-empty list. */
void brpopCommand(client *c) {
    blockingPopGenericCommand(c, LIST_TAIL);
}
/* BRPOPLPUSH source destination timeout
 *
 * Blocking variant of RPOPLPUSH. When the source key (argv[1]) exists and
 * holds a list, the regular rpoplpushCommand() is executed. When the source
 * key does NOT exist, the client blocks on it with argv[2] as the target,
 * unless it is inside MULTI/EXEC, in which case shared.nullbulk is returned
 * immediately. */
void brpoplpushCommand(client *c) {
    mstime_t timeout;

    /* Parse the timeout (argv[3], in seconds). */
    if (getTimeoutFromObjectOrReply(c, c->argv[3], &timeout, UNIT_SECONDS)
        != C_OK) return;

    /* Look up the source key. */
    robj *key = lookupKeyWrite(c->db, c->argv[1]);

    if (key == NULL) {
        if (c->flags & CLIENT_MULTI) {
            /* Blocking against an empty list in a multi state
             * returns immediately. */
            addReply(c, shared.nullbulk);
        } else {
            /* The key does not exist, so the client blocks on it. */
            blockForKeys(c, BLOCKED_LIST, c->argv + 1, 1, timeout, c->argv[2], NULL);
        }
        return;
    }

    if (key->type != OBJ_LIST) {
        addReply(c, shared.wrongtypeerr);
        return;
    }

    /* The list exists and has elements, so
     * the regular rpoplpushCommand is executed. */
    serverAssertWithInfo(c,key,listTypeLength(key) > 0);
    rpoplpushCommand(c);
}
网友评论