简介
A generic doubly linked quicklist implementation
A doubly linked list of ziplists
根据源代码里面给我出的解释就是这个是一个ziplist组成的双向链表。
每个节点使用ziplist来保存数据。
根据咱们的编程经验来说。一个链表使用另外一种链表来作为节点,那么本质上来说,quicklist里面保存着一个一个小的ziplist。
但是为啥不直接用ziplist喃。本身ziplist就是一个双向链表。并且额外的信息还特别的少。
其实原因很简单。根据我们上一节讲的ziplist来看,ziplist在我们程序里面来看将会是一块连续的内存块。它使用内存偏移来保存next从而节约了next指针。这样造成了我们每一次的删除插入操作都会进行remalloc,从而分配一块新的内存块。当我们的ziplist特别大的时候。没有这么大空闲的内存块给我们的时候。操作系统其实会抽象出一块连续的内存块给我。在底层来说他其实是一个链表链接成为的内存。不过在我们程序使用来说。他还是一块连续的内存。这样的话会造成内存碎片,并且在操作的时候因为内存不连续等原因造成效率问题。或者因为转移到大内存块等进行数据迁移。从而损失性能。
所以quicklist是对ziplist进行一次封装,使用小块的ziplist来既保证了少使用内存,也保证了性能。
数据结构
/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
* We use bit fields keep the quicklistNode at 32 bytes.
* count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
* encoding: 2 bits, RAW=1, LZF=2.
* container: 2 bits, NONE=1, ZIPLIST=2.
* recompress: 1 bit, bool, true if node is temporarry decompressed for usage.
* attempted_compress: 1 bit, boolean, used for verifying during testing.
* extra: 12 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
struct quicklistNode *prev; //上一个node节点
struct quicklistNode *next; //下一个node
unsigned char *zl; //保存的数据 压缩前ziplist 压缩后压缩的数据
unsigned int sz; /* ziplist size in bytes */
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
/* quicklistLZF is a 4+N byte struct holding 'sz' followed by 'compressed'.
* 'sz' is byte length of 'compressed' field.
* 'compressed' is LZF data with total (compressed) length 'sz'
* NOTE: uncompressed length is stored in quicklistNode->sz.
* When quicklistNode->zl is compressed, node->zl points to a quicklistLZF */
typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes*/
char compressed[];
} quicklistLZF;
/* quicklist is a 32 byte struct (on 64-bit systems) describing a quicklist.
* 'count' is the number of total entries.
* 'len' is the number of quicklist nodes.
* 'compress' is: -1 if compression disabled, otherwise it's the number
* of quicklistNodes to leave uncompressed at ends of quicklist.
* 'fill' is the user-requested (or default) fill factor. */
typedef struct quicklist {
quicklistNode *head; //头结点
quicklistNode *tail; //尾节点
unsigned long count; /* total count of all entries in all ziplists */
unsigned int len; /* number of quicklistNodes */
int fill : 16; /* fill factor for individual nodes *///负数代表级别,正数代表个数
unsigned int compress : 16; /* depth of end nodes not to compress;0=off *///压缩级别
} quicklist;
说起来这quickList就是一个标准的双向链表的配置,有head 有tail node里面有prev 有next 其他的参数都长度或者一些特别标识。总体来说,我们的quicklist会很简单。
看到这里大家基本都可以猜到插入查找删除等操作了
插入
由于咱们的quicklist是一个标准的双向链表,所以他的插入操作的原理会和普通的双向链表插入很像,不过喃由于他使用了其他链表来作为节点。所以喃,我们操作就是找到我们插入的节点。然后进行ziplist的插入就ok了。
/* Optimization levels for size-based filling */
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
/* Maximum size in bytes of any multi-element ziplist.
* Larger values will live in their own isolated ziplists. */
#define SIZE_SAFETY_LIMIT 8192
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
if (unlikely(!node))
return 0;
int ziplist_overhead;
/* size of previous offset */
if (sz < 254) //根据我们ziplist的经验 对prvlen进行编码 小于254一位 大于等于254 五位
ziplist_overhead = 1;
else
ziplist_overhead = 5;
/* size of forward offset */
if (sz < 64) //编码字节的size 根据我们对len的编码
ziplist_overhead += 1;
else if (likely(sz < 16384))
ziplist_overhead += 2;
else
ziplist_overhead += 5;
//不过这个大小只是一个大概的猜想。因为我们知道我们ziplist里面还有很多特殊的操作。这个算出来的是一个最大的大小。总体来说只会比这个小,不会比这个大的
/* new_sz overestimates if 'sz' encodes to an integer type */
unsigned int new_sz = node->sz + sz + ziplist_overhead; //计算出一个大概新插入后的大小
if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
return 1;
else if (!sizeMeetsSafetyLimit(new_sz))//判断是不是个安全的大小
return 0;
else if ((int)node->count < fill)//这才来判断个数
return 1;
else
return 0;
}
REDIS_STATIC int
_quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz,
const int fill) {
if (fill >= 0)//大于等于0 是个数
return 0;
size_t offset = (-fill) - 1;//小于等于0 级别
if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) {
if (sz <= optimization_level[offset]) {
return 1;
} else {
return 0;
}
} else {
return 0;
}
}
REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist,
quicklistNode *old_node,
quicklistNode *new_node, int after) {
if (after) { //插入方式
new_node->prev = old_node;//设置节点关系
if (old_node) { //如果原来的节点有咯
new_node->next = old_node->next;//设置next
if (old_node->next)
old_node->next->prev = new_node;
old_node->next = new_node;
}
if (quicklist->tail == old_node)
quicklist->tail = new_node;
} else {
new_node->next = old_node;
if (old_node) {
new_node->prev = old_node->prev;
if (old_node->prev)
old_node->prev->next = new_node;
old_node->prev = new_node;
}
if (quicklist->head == old_node)
quicklist->head = new_node;
}
/* If this insert creates the only element so far, initialize head/tail. */
if (quicklist->len == 0) {//第一次加入 设置成头和尾
quicklist->head = quicklist->tail = new_node;
}
if (old_node)//加满了之后进行压缩操作
quicklistCompress(quicklist, old_node);
quicklist->len++;
}
/* Add new entry to head node of quicklist.
*
* Returns 0 if used existing head.
* Returns 1 if new head created. */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;//当前的head
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {//看下head让不让你插入
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);//让你插入就插
quicklistNodeUpdateSz(quicklist->head);//更新长度
} else {
quicklistNode *node = quicklistCreateNode();//不让插新建一个node
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);//在新的ziplist插入
quicklistNodeUpdateSz(node);//更新sz
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);//插入了新节点,维护quicklist的node关系,如果有压缩啥的进行压缩
}
quicklist->count++;//cout++
quicklist->head->count++;//cout++
return (orig_head != quicklist->head);
}
相对于这种标准的双向链表来说,插入就变得非常简单了。首先是确定插入头或者尾,插入的时候先判断该节点能否插入。能否插入喃是有几个标准的。第一个是不是设置了fill fill为负数的时候代表级别,每个级别ziplist能容纳的大小喃在上面有。如果没有设置的话。就看有没大于安全的标准大小。没有的话再看fill是不是正数,正数代表ziplist能够容纳的个数。这样判断完成能不能够进行该节点的插入。不能的话就创建一个新的节点,把数据插入进去。然后维护一下quicklist的双向链表关系。对老节点看下是否进行压缩操作等。
查找操作
list的查找操作主要是对index的
我们的quicklist的节点是由一个一个的ziplist构成的每个ziplist都有大小。所以我们就只需要先根据我们每个node的个数,从而找到对应的ziplist,调用ziplist的index就能成功找到。
typedef struct quicklistEntry {
const quicklist *quicklist;//那个quicklist
quicklistNode *node; //哪一个node
unsigned char *zi; //找到在ziplist的位置
unsigned char *value;//str value
long long longval; //int value
unsigned int sz; //svalue len
int offset;//对于ziplist的index
} quicklistEntry;
/* Populate 'entry' with the element at the specified zero-based index
* where 0 is the head, 1 is the element next to head
* and so on. Negative integers are used in order to count
* from the tail, -1 is the last element, -2 the penultimate
* and so on. If the index is out of range 0 is returned.
*
* Returns 1 if element found
* Returns 0 if element not found */
int quicklistIndex(const quicklist *quicklist, const long long idx,
quicklistEntry *entry) {
quicklistNode *n;
unsigned long long accum = 0;
unsigned long long index;
int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */ //向前还是像后
initEntry(entry);
entry->quicklist = quicklist;//设置list
if (!forward) {//方向
index = (-idx) - 1;
n = quicklist->tail;
} else {
index = idx;
n = quicklist->head;
}
if (index >= quicklist->count)//有米有这么多个
return 0;
while (likely(n)) { //找到对应的位置
if ((accum + n->count) > index) {
break;
} else {
D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
accum);
accum += n->count;
n = forward ? n->next : n->prev;//下一个node
}
}
if (!n)//没找到
return 0;
D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
accum, index, index - accum, (-index) - 1 + accum);
entry->node = n;//设置节点
if (forward) {//
/* forward = normal head-to-tail offset. */
entry->offset = index - accum;//位置 即ziplist的index
} else {
/* reverse = need negative offset for tail-to-head, so undo
* the result of the original if (index < 0) above. */
entry->offset = (-index) - 1 + accum;
}
quicklistDecompressNodeForUse(entry->node);//需要解压就解压
entry->zi = ziplistIndex(entry->node->zl, entry->offset);//找到节点块
ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);//获取节点的值 反编码
/* The caller will use our result, so we don't re-compress here.
* The caller can recompress or delete the node as needed. */
return 1;
}
删除
/* Delete a range of elements from the quicklist.
*
* elements may span across multiple quicklistNodes, so we
* have to be careful about tracking where we start and end.
*
* Returns 1 if entries were deleted, 0 if nothing was deleted. */
int quicklistDelRange(quicklist *quicklist, const long start,
const long count) {
if (count <= 0)
return 0;
unsigned long extent = count; /* range is inclusive of start position *///默认你删的个数是对的
if (start >= 0 && extent > (quicklist->count - start)) {//但是喃没有这么多给你删
/* if requesting delete more elements than exist, limit to list size. */
extent = quicklist->count - start;
} else if (start < 0 && extent > (unsigned long)(-start)) {
/* else, if at negative offset, limit max size to rest of list. */
extent = -start; /* c.f. LREM -29 29; just delete until end. */
}
quicklistEntry entry;
if (!quicklistIndex(quicklist, start, &entry))//先找一下位置
return 0;
D("Quicklist delete request for start %ld, count %ld, extent: %ld", start,
count, extent);
quicklistNode *node = entry.node;//拿到要删的第一个node
/* iterate over next nodes until everything is deleted. */
while (extent) {
quicklistNode *next = node->next;//下一个节点喃 都是往一个方向删除哈
unsigned long del;
int delete_entire_node = 0;
if (entry.offset == 0 && extent >= node->count) {//他是第一个节点 并且要删的节点数大于这个node保存的节点数 就把这个node一起干掉
/* If we are deleting more than the count of this node, we
* can just delete the entire node without ziplist math. */
delete_entire_node = 1;
del = node->count;
} else if (entry.offset >= 0 && extent >= node->count) {//要删的个数大于 并且是正向来的
/* If deleting more nodes after this one, calculate delete based
* on size of current node. */
del = node->count - entry.offset;
} else if (entry.offset < 0) {//如果是反向的话
/* If offset is negative, we are in the first run of this loop
* and we are deleting the entire range
* from this start offset to end of list. Since the Negative
* offset is the number of elements until the tail of the list,
* just use it directly as the deletion count. */
del = -entry.offset;
/* If the positive offset is greater than the remaining extent,
* we only delete the remaining extent, not the entire offset.
*/
if (del > extent)//看下能删多少个
del = extent;
} else {
/* else, we are deleting less than the extent of this node, so
* use extent directly. */
del = extent;
}
D("[%ld]: asking to del: %ld because offset: %d; (ENTIRE NODE: %d), "
"node count: %u",
extent, del, entry.offset, delete_entire_node, node->count);
if (delete_entire_node) {//删节点
__quicklistDelNode(quicklist, node);
} else {
quicklistDecompressNodeForUse(node);//解压
node->zl = ziplistDeleteRange(node->zl, entry.offset, del);//ziplist删range
quicklistNodeUpdateSz(node);//更新大小
node->count -= del;//更新个数
quicklist->count -= del;//更新个数
quicklistDeleteIfEmpty(quicklist, node);//看下节点是不是空了
if (node)
quicklistRecompressOnly(quicklist, node);//压缩回去
}
extent -= del;//减除已经删除了的
node = next;//继续进行删除
entry.offset = 0;
}
return 1;
}
这里给出的是删除一个range
我们的操作也很简单。
1.先把要删除的个数确定下来。可能会存在删除的个数过多,其实没有那么多的情况。
2.找到要开始删除的节点。删除的情况其实特别的就是获取当前的offset为0,要删除的个数大于等于当前节点的数据个数。这样就把它全部删除。否则的话就判断当前节点需要删除几个
总结
quciklist把ziplist进行了封装,保证了ziplist的最大大小。从而压缩了内存。进一步的提高了效率。
magicdeMacBook-Pro:~ magic$ redis-cli
127.0.0.1:6379> rpush xislist 'xistest'
(integer) 2
127.0.0.1:6379> object encoding xislist
"quicklist"
127.0.0.1:6379>
其实我们可以发现。我们的list就是使用的quicklist.
参照方法。我们就可以知道我们的,命令的效率。
网友评论