redis源码阅读
1.redis集群
redis集群储存数据使用哈希槽,类似于hash环。将key通过crc16编码映射为0-16384其中的一个值。并将该key储存到相应的服务器上
集群储存示意图
redis 客户端命令这里不详细解释
客户端API特征
-
支持redis集群
:- 连接redis集群,并发送命令.
-
支持多key命令
:- 支持
MSET
,MGET
和DEL
命令.
- 支持
-
支持管道
:- 支持管道,并且支持多key命令.
-
支持异步
:- 用户可以使用异步模式.
2.客户端hiredis-vip
hiredis-vip代码建立在原有的hiredis之上,其实现了对redis集群的访问
hiredis-vip的源码比hiredis多了下面文件(只带.c):
* hircluster.c ----redis集群
* adlist.c ----C模仿list结构
* command.c ----传输命令
* crc16.c ----crc16编码
* hiarray.c ----C模仿array结构
* hiutil.c ----工具
只介绍hircluster.c和command.c,其他都是工具。
2.1.1结构体
结构体cluster_node:服务器节点信息
typedef struct cluster_node
{
sds name;
sds addr;
sds host;
int port;
uint8_t role; /* 主服务 OR 备用服务 */
uint8_t myself;
redisContext *con;
redisAsyncContext *acon;
struct hilist *slots;
struct hilist *slaves;
int failure_count;
void *data;
struct hiarray *migrating;
struct hiarray *importing;
}cluster_node;
结构体redisClusterContext:与redis集群连接的上下文
typedef struct redisClusterContext {
int err;
char errstr[128];
sds ip;
int port;
int flags;
enum redisConnectionType connection_type;
struct timeval *connect_timeout;
struct timeval *timeout;
struct hiarray *slots;
struct dict *nodes;
cluster_node *table[REDIS_CLUSTER_SLOTS];
uint64_t route_version;
int max_redirect_count;
int retry_count;
struct hilist *requests;
int need_update_route;
int64_t update_route_time;
} redisClusterContext;
结构体redisClusterAsyncContext:与redis集群异步连接
typedef struct redisClusterAsyncContext {
redisClusterContext *cc;
int err;
char errstr[128];
void *data;
void *adapter;
adapterAttachFn *attach_fn;
redisDisconnectCallback *onDisconnect;
redisConnectCallback *onConnect;
} redisClusterAsyncContext;
结构体之间的关系
结构体关系图
2.1.2 集群请求
实现hiredis-vip的redis集群请求分为三步
- 集群结构体初始化
/*初始化结构体*/
redisClusterContext *redisClusterContextInit(void);
/*初始化结构体(调用redisClusterContextInit),添加集群节点(调用redisClusterSetOptionAddNodes),更新集群节点hash槽信息(调用cluster_update_route)*/
redisClusterContext *redisClusterConnect(const char *addrs, int flags);
redisClusterContext *redisClusterConnectWithTimeout(const char *addrs,
const struct timeval tv, int flags);
redisClusterContext *redisClusterConnectNonBlock(const char *addrs, int flags);
- 集群请求的相关配置
int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr);//填充redisClusterContext的nodes
int redisClusterSetOptionAddNodes(redisClusterContext *cc, const char *addrs);//填充redisClusterContext的nodes
int redisClusterSetOptionConnectBlock(redisClusterContext *cc);//填充redisClusterContext的flag
int redisClusterSetOptionConnectNonBlock(redisClusterContext *cc);//填充redisClusterContext的flag
int redisClusterSetOptionParseSlaves(redisClusterContext *cc);//填充redisClusterContext的flag
int redisClusterSetOptionParseOpenSlots(redisClusterContext *cc);//填充redisClusterContext的flag
int redisClusterSetOptionRouteUseSlots(redisClusterContext *cc);//填充redisClusterContext的flag
int redisClusterSetOptionConnectTimeout(redisClusterContext *cc, const struct timeval tv);//填充redisClusterContext的connect_timeout
int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval tv);//填充redisClusterContext的timeout
int redisClusterSetOptionMaxRedirect(redisClusterContext *cc, int max_redirect_count);//填充redisClusterContext的max_redirect_count
- 建立和发送请求
- 格式化命令,如:SET foo bar1 转为 *3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$4\r\nbar1\r\n。*3表示3个参数,$3表示该参数占3个字符,这也是最后客户端发送给服务端的消息
- 创建command结构体,并将格式化的命令填充该结构体。根据key使用crc16编码计算hash值。并通过hash值找到node。
- 根据node创建redisContext结构体,建立socket连接,并将连接放在redisContext结构体,将格式化的命令放入结构体的缓冲区。
- 发送并接收请求。
/*下面3个函数是格式化命令*/
void *redisClustervCommand(redisClusterContext *cc, const char *format, va_list ap);
void *redisClusterCommand(redisClusterContext *cc, const char *format, ...);
void *redisClusterCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen);
/*创建command结构体,计算key的hash值,填充该结构体*/
void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len);
/*下面四个函数将命令放入该结构体的缓冲区。*/
int redisClusterAppendFormattedCommand(redisClusterContext *cc, char *cmd, int len);
int redisClustervAppendCommand(redisClusterContext *cc, const char *format, va_list ap);
int redisClusterAppendCommand(redisClusterContext *cc, const char *format, ...);
int redisClusterAppendCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen);
/*根据node创建上下文也就是redisContext结构体。再此函数中建立socket连接,并将连接放在redisContext结构体*/
redisContext *ctx_get_by_node(redisClusterContext *cc, struct cluster_node *node);
/*执行命令,调用上面的ctx_get_by_node函数*/
int redisGetReply(redisContext *c, void **reply);
/*
* 更新集群的node配置。
*/
int cluster_update_route(redisClusterContext *cc);
struct dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, int flags);
struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, int flags);
2.1.2 异步接口
异步接口结构体
typedef struct redisClusterAsyncContext {
redisClusterContext *cc;
int err;
char errstr[128];
void *data;
void *adapter;
adapterAttachFn *attach_fn;
redisDisconnectCallback *onDisconnect;
redisConnectCallback *onConnect;
} redisClusterAsyncContext;
异步方法
/*初始化redis集群结构体redisClusterContext(调用redisClusterConnectNonBlock),初始化异步结构体*/
redisClusterAsyncContext *redisClusterAsyncConnect(const char *addrs, int flags);
/*设置回掉连接和断开连接的函数*/
int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc, redisConnectCallback *fn);
int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc, redisDisconnectCallback *fn);
/*下面四个函数将命令放入该结构体的缓冲区。调用actx_get_by_node来获取redisContext结构体*/
int redisClusterAsyncFormattedCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, char *cmd, int len);
int redisClustervAsyncCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, const char *format, va_list ap);
int redisClusterAsyncCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, const char *format, ...);
int redisClusterAsyncCommandArgv(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
/*断开连接*/
void redisClusterAsyncDisconnect(redisClusterAsyncContext *acc);
void redisClusterAsyncFree(redisClusterAsyncContext *acc);
/*异步根据node获取redisAsyncContext上下文*/
redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc, cluster_node *node);
3.hiredis
4.redis
ae事件处理
redis实现了自己的事件库,这个事件库被实现在ae.c
理解redis事件库怎样工作的最好方式是明白redis如何使用它。
Event Loop 初始化
initServer
方法定义在 redis.c
,该方法用于初始化全局变量server
(redisServer
类型结构体)。server
变量中一个最重要的变量是el
(aeEventLoop 类型结构体)。<--!server.el
包含了ae事件!>
initServer
方法通过调用 aeCreateEventLoop
方法初始化server.el
,aeEventLoop
结构体定义如下:
typedef struct aeEventLoop
{
int maxfd;/*最大句柄*/
long long timeEventNextId;/*定时事件的个数,*/
aeFileEvent events[AE_SETSIZE]; /*已经注册的句柄事件*/
aeFiredEvent fired[AE_SETSIZE]; /*已经触发的句柄事件*/
aeTimeEvent *timeEventHead; /*定时事件链表*/
int stop;/*循环是否结束*/
void *apidata;
aeBeforeSleepProc *beforesleep;/*指针函数,每次sleep事件前调用的方法*/
aeBeforeSleepProc *aftersleep;/*指针函数,每次sleep事件后调用的方法*/
} aeEventLoop;
/*注:1.aftersleep在原文中没有,但是在redis5代码中出现。其中sleep指阻塞或者等待,每次循环时都会等待一段时间(该时间由触发最短时间事件的时间决定)。下文会介绍。
2.redis5支持的IO复用:epoll,select,evport,kqueue。本文中以epoll为准
3.apidata储存epoll句柄和epoll_event结构体数组,epoll_
*/
aeCreateEventLoop
aeCreateEventLoop
首先分配内存给 aeEventLoop
结构体,然后调用 ae_epoll.c:aeApiCreate
。ae_epoll.c:aeApiCreate
会使用合适的IO复用接口
aeApiCreate
分配内存给 aeApiState
,aeApiState
有两个变量:epfd
用于储存 epoll_create
返回的 epoll
句柄, events
数组变量(struct epoll_event
类型 )。event的索引为socket句柄
aeCreateTimeEvent
aeCreateTimeEvent
接受如下参数:
-
eventLoop
: 这是server.el
- milliseconds: 定时时间
-
proc
: 函数指针,定时事件的函数。 -
clientData
: 大部分为NULL
. -
finalizerProc
:指向从定时事件列表中删除定时事件之前必须调用的函数的指针。
initServer
调用aeCreateTimeEvent
添加定时事件到server.el
的timeEventHead
。timeEventHead
是双向链表。原文调用如下:
aeCreateTimeEvent(server.el /*eventLoop*/, 1 /*milliseconds*/, serverCron /*proc*/, NULL /*clientData*/, NULL /*finalizerProc*/);
redis.c:serverCron
里面包含了多个定时事件。
aeCreateFileEvent
The essence of aeCreateFileEvent
function is to execute epoll_ctl
system call which adds a watch for EPOLLIN
event on the listening descriptor create by anetTcpServer
and associate it with the epoll
descriptor created by a call to aeCreateEventLoop
aeCreateFileEvent
函数的本质是执行epoll_ctl
系统调用。aeCreateFileEvent
方法使用epoll监控socket句柄,并为其配置mask表示socket句柄的读写方式。
initServer
将以下参数传递给aeCreateFileEvent
:
-
server.el
:由aeCreateEventLoop
创建的ae事件句柄。 -
server.fd
:需要监听的句柄。同时也作为一个索引从eventLoop->events
获取write和read回掉函数。 -
AE_READABLE
:表示监听句柄的模式 -
acceptHandler
:句柄事件响应的回到函数。该函数指针存储在eventLoop->events[server.fd]->rfileProc
。
这样就完成了Redis事件循环的初始化。
Event Loop Processing
main
将会调用aeMain
进行事件的循环。aeMain
的循环在主进程,所以想要移植ae事件的同学注意aeMain
一定要在代码的最后。
aeMain
使用while
循环调用 ae.c:aeProcessEvents
。在while
会不停的判断是否有aftersleep
函数,如果有就执行。ae.c:aeProcessEvents
循环同时处理句柄事件和定时事件。
aeProcessEvents
-
ae.c:aeProcessEvents
通过aeSearchNearestTimer
找到最近执行的定时事件。并获取该时间间隔。 - 将最短时间间隔赋值给tvp。如果最短时间间隔为负,将tvp设置为0,如redis中第一次的定时时间为1毫秒。
- 调用
aeApiPoll
函数。aeApiPoll
函数实际调用epoll_wait
来sleep,sleep时间为tvp。之后aeApiPoll
函数将有响应的socket句柄放入eventLoop->fired
中 - 查看是否有aftersleep函数,有则执行。
- 查看
eventLoop->fired
中是否有需要处理的句柄,有则处理 - 执行
processTimeEvents
。processTimeEvents
用于处理定时事件 - 在
processTimeEvents
中遍历链表执行定时任务。如果定时任务被贴上删除标记则删除。
eventLoop->fired
数组元素内容:
-
fd
: 已经触发socket事件的句柄 -
mask
: 事件描述,可读或者可写
我们以客户端请求连接来说明那些函数被调用了。
- 客户端向服务器请求一个连接。
-
aeApiPoll
将会监测到服务器句柄触发可读。将服务器句柄放入eventLoop->fired
- ae事件检测到
eventLoop->fired
有触发,调用acceptHandler
-
acceptHandler
实际执行 accept ,获得客户端句柄。调用aeCreateFileEvent
将客户端句柄放入ae事件中。如下
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c) == AE_ERR) {
freeClient(c);
return NULL;
}
processTimeEvents
ae.processTimeEvents
遍历链表eventLoop->timeEventHead
。如果达到时间就执行定时事件。每次执行一次完定时事件,都会用 ae.c:aeAddMilliSeconds
重新设置下一次执行时间。在redis中,所有的定时事件都在serverCron
中。通过配置文件hz 10
设置频率。默认1000/10秒执行一次。
持久化
rdb的优势
- RDB是Redis数据的非常紧凑的单文件时间点表示。RDB文件非常适合备份。例如每天备份一次。这使你可以在灾难情况下轻松还原数据集的不同版本。
- RDB非常适合灾难恢复,因为它可以将单个压缩文件传输到远程数据中心,也可以传输到Amazon S3上。
- RDB最大限度地提高了Redis的性能,因为Redis通过fork的方式生成一个子进程,子进程执行rdb储存工作。父实例将永远不会执行磁盘I/O等操作。
- 与AOF相比,RDB允许大型数据集更快地重启。
RDB的缺点 - 如果是着重强调数据完整性和安全性,rdb并不是一个好方案,因为他常常是分钟以上的时间进行备份。
- RDB需要经常使用fork()才能使用子进程将其持久化在磁盘上。如果数据集很大,Fork()可能很耗时,并且如果数据集很大且CPU性能不佳,则可能导致Redis停止为客户端服务几毫秒甚至一秒钟。
AOF的优势 - 使用AOF Redis更加持久:您可以使用不同的fsync策略:完全没有fsync,每秒fsync,每个查询都fsync。使用默认策略fsync时,每秒的写入性能仍然很好,fsync是使用后台线程执行的,并且在没有进行fsync的情况下,主线程将尽力执行写入操作。
- AOF日志是仅追加的日志。类似于linux的history功能。即使由于某种原因(磁盘已满或其他原因)以半写命令结束日志,redis-check-aof工具也可以轻松修复它。
- Redis的aof文件太大时,Redis可以在后台自动重写AOF。重写是完全安全的,因为Redis继续追加到旧文件时,会生成一个全新的文件,一旦准备好第二个文件,Redis就会切换两者并开始追加到新的那一个。
- AOF以易于理解和解析的格式包含所有操作的日志。您甚至可以轻松导出AOF文件。例如,即使您使用FLUSHALL命令刷新了所有错误文件,如果在此期间未执行任何日志重写操作,您仍然可以保存数据集,只是停止服务器,删除最新命令并重新启动Redis。
AOF的缺点 - 对于同一数据集,AOF文件通常大于等数据量的RDB文件。
根据确切的fsync策略,AOF可能比RDB慢。通常,在将fsync设置为每秒的情况下,性能仍然很高,并且在禁用fsync的情况下,即使在高负载下,它也应与RDB一样快。即使在巨大的写负载的情况下,RDB仍然能够提供有关最大延迟的更多保证。 - 过去,我们在特定命令中遇到过罕见的错误(例如,其中有一个涉及阻止命令,例如BRPOPLPUSH),导致生成的AOF在重新加载时无法完全重现相同的数据集。这种错误很少见,我们在测试套件中进行了测试,可以自动创建随机的复杂数据集,然后重新加载它们以检查一切正常,但是对于RDB持久性来说,这种错误几乎是不可能的。为了更清楚地说明这一点:Redis AOF像MySQL或MongoDB一样,以增量方式更新现有状态,而RDB快照一次又一次地创建所有内容,从概念上讲更健壮。但是-1)应该注意的是,每次Redis重写AOF时,都会从数据集中包含的实际数据开始重新创建AOF,与始终附加AOF文件(或重写为读取旧AOF而不是读取内存中的数据)相比,提高了对错误的抵抗力。2)我们从未收到用户关于在现实世界中检测到的AOF损坏的单个报告。
rdb源码解读
rdb在出现在两个地方。1.程序结束之前。2.serverCron定时器操作。这里我们主要看serverCron。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//省略...
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || ldbPendingChildren())
{
//省略...
} else {
for (j = 0; j < server.saveparamslen; j++) {
//轮流判断save规则,符合规则就执行持久化。规则见redis.conf
struct saveparam *sp = server.saveparams+j;
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
//省略...
}
//省略...
return 1000/server.hz;
}
//转到rdbSaveBackground函数
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
//省略...
openChildInfoPipe();
//fork子进程进行持久化
if ((childpid = fork()) == 0) {
int retval;
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
//省略...
}
//省略...
return C_OK;
}
//转到rdbSave
int rdbSave(char *filename, rdbSaveInfo *rsi) {
//省略...
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
//省略...
}
//转到rdbSaveRio,这里执行内存持久化
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
uint64_t cksum;
size_t processed = 0;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
//添加文件头部
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
//添加其他变量,创建时间,内存占用,bit值
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
//循环数据库
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
//储存TYPE为数据库编号,表示后面一个字节指数据库号
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
//储存数据库号
if (rdbSaveLen(rdb,j) == -1) goto werr;
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
//储存数据库的一些信息
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
/* 循环字典 */
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
//储存key value值
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
//....
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
//脚本缓存
if (rsi && dictSize(server.lua_scripts)) {
di = dictGetIterator(server.lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
goto werr;
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
//结束字符
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
//校验位,crc16编码
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;
werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}
//转到rdbSaveKeyValuePair
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
//储存过期时间
if (expiretime != -1) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
//储存lru信息,https://www.cnblogs.com/hapjin/archive/2019/06/07/10933405.html
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000; /* Using seconds is enough and requires less space.*/
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}
//储存lfu信息
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);
/* We can encode this in exactly two bytes: the opcode and an 8
* bit counter, since the frequency is logarithmic with a 0-255 range.
* Note that we do not store the halving time because to reset it
* a single time when loading does not affect the frequency much. */
if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}
//储存类型
if (rdbSaveObjectType(rdb,val) == -1) return -1;
//储存key,value。格式为[len][date]。当value是"100","-17"这中能被编码成整形时,就储存整形形式
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;
return 1;
}
//savekey相当于save string。
//转到rdbSaveObject,储存内容。
//最后调用write持久化。
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {
/* Save a string value */
if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
nwritten += n;
} else if (o->type == OBJ_LIST) {
/* Save a list value */
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
quicklistNode *node = ql->head;
if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
nwritten += n;
while(node) {
if (quicklistNodeIsCompressed(node)) {
void *data;
size_t compress_len = quicklistGetLzf(node, &data);
if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
nwritten += n;
} else {
if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
nwritten += n;
}
node = node->next;
}
} else {
serverPanic("Unknown list encoding");
}
} else if (o->type == OBJ_SET) {
/* Save a set value */
if (o->encoding == OBJ_ENCODING_HT) {
dict *set = o->ptr;
dictIterator *di = dictGetIterator(set);
dictEntry *de;
if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) {
dictReleaseIterator(di);
return -1;
}
nwritten += n;
while((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
== -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
}
dictReleaseIterator(di);
} else if (o->encoding == OBJ_ENCODING_INTSET) {
size_t l = intsetBlobLen((intset*)o->ptr);
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
} else {
serverPanic("Unknown set encoding");
}
} else if (o->type == OBJ_ZSET) {
/* Save a sorted set value */
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
size_t l = ziplistBlobLen((unsigned char*)o->ptr);
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
zskiplist *zsl = zs->zsl;
if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
nwritten += n;
/* We save the skiplist elements from the greatest to the smallest
* (that's trivial since the elements are already ordered in the
* skiplist): this improves the load process, since the next loaded
* element will always be the smaller, so adding to the skiplist
* will always immediately stop at the head, making the insertion
* O(1) instead of O(log(N)). */
zskiplistNode *zn = zsl->tail;
while (zn != NULL) {
if ((n = rdbSaveRawString(rdb,
(unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
{
return -1;
}
nwritten += n;
if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
return -1;
nwritten += n;
zn = zn->backward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
} else if (o->type == OBJ_HASH) {
/* Save a hash value */
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
size_t l = ziplistBlobLen((unsigned char*)o->ptr);
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_HT) {
dictIterator *di = dictGetIterator(o->ptr);
dictEntry *de;
if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {
dictReleaseIterator(di);
return -1;
}
nwritten += n;
while((de = dictNext(di)) != NULL) {
sds field = dictGetKey(de);
sds value = dictGetVal(de);
if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
sdslen(field))) == -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
sdslen(value))) == -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
}
dictReleaseIterator(di);
} else {
serverPanic("Unknown hash encoding");
}
} else if (o->type == OBJ_STREAM) {
/* Store how many listpacks we have inside the radix tree. */
stream *s = o->ptr;
rax *rax = s->rax;
if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
nwritten += n;
/* Serialize all the listpacks inside the radix tree as they are,
* when loading back, we'll use the first entry of each listpack
* to insert it back into the radix tree. */
raxIterator ri;
raxStart(&ri,rax);
raxSeek(&ri,"^",NULL,0);
while (raxNext(&ri)) {
unsigned char *lp = ri.data;
size_t lp_bytes = lpBytes(lp);
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
nwritten += n;
}
raxStop(&ri);
/* Save the number of elements inside the stream. We cannot obtain
* this easily later, since our macro nodes should be checked for
* number of items: not a great CPU / space tradeoff. */
if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
nwritten += n;
/* Save the last entry ID. */
if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
nwritten += n;
/* The consumer groups and their clients are part of the stream
* type, so serialize every consumer group. */
/* Save the number of groups. */
size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1;
nwritten += n;
if (num_cgroups) {
/* Serialize each consumer group. */
raxStart(&ri,s->cgroups);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;
/* Save the group name. */
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1)
return -1;
nwritten += n;
/* Last ID. */
if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1;
nwritten += n;
/* Save the global PEL. */
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1;
nwritten += n;
/* Save the consumers of this group. */
if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1;
nwritten += n;
}
raxStop(&ri);
}
} else if (o->type == OBJ_MODULE) {
/* Save a module-specific value. */
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitIOContext(io,mt,rdb,key);
/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
int retval = rdbSaveLen(rdb,mt->id);
if (retval == -1) return -1;
io.bytes += retval;
/* Then write the module-specific representation + EOF marker. */
mt->rdb_save(&io,mv->value);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
if (retval == -1) return -1;
io.bytes += retval;
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
}
return io.error ? -1 : (ssize_t)io.bytes;
} else {
serverPanic("Unknown object type");
}
return nwritten;
}
//优化处理1.提前分配好rio内存空间
网友评论