本文主要说明Redis的RDB持久化方式的实现及运作机制。
推荐阅读:
1、Redis 的RDB持久化方式的理论说明见: Redis之RDB持久化小探
I、核心数据结构rio
持久化的IO操作在rio.h
与rio.c
中实现,核心数据结构是struct rio
。RDB中的几乎每个函数都带有rio
参数,其抽象了文件和内存的操作:
/*RIO API 接口和状态*/
/*src/rio.h/_rio*/
struct _rio {
/* Backend functions.
* Since this functions do not tolerate short writes or reads the return
* value is simplified to: zero on error, non zero on complete success. */
// API
size_t (*read)(struct _rio *, void *buf, size_t len);
size_t (*write)(struct _rio *, const void *buf, size_t len);
off_t (*tell)(struct _rio *);
/* The update_cksum method if not NULL is used to compute the checksum of
* all the data that was read or written so far. The method should be
* designed so that can be called with the current checksum, and the buf
* and len fields pointing to the new block of data to add to the checksum
* computation. */
// 校验和计算函数,每次有写入/读取新数据时都要计算一次
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
/* The current checksum */
// 当前校验和
uint64_t cksum;
/* number of bytes read or written */
size_t processed_bytes;
/* maximum single read or write chunk size */
size_t max_processing_chunk;
/* Backend-specific vars. */
/* 这个union可以是缓存,也可以是一个文件IO
* 因此在RDB持久化的时候可以将RDB保存到磁盘文件,也可以保存在内存中,但保存在内存中其实就不是持久化了。
*/
union {
struct {
// 缓存指针
sds ptr;
// 偏移量
off_t pos;
} buffer;
struct {
// 被打开文件的指针
FILE *fp;
// 最近一次 fsync() 以来,写入的字节量
off_t buffered; /* Bytes written since last fsync. */
// 写入多少字节之后,才会自动执行一次 fsync()
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
} io;
};
typedef struct _rio rio;
下面两个数据结构分别表示流为内存与流为文件:
// 适用于内存缓存
/*src/rio.c/rioBufferIO*/
static const rio rioBufferIO = {
rioBufferRead, //读函数
rioBufferWrite, //写函数
rioBufferTell, //偏移量函数
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
// 适用于文件IO
static const rio rioFileIO = {
rioFileRead,
rioFileWrite,
rioFileTell,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
II、RDB持久化机制
RDB有两种持久化机制,一个是save
,另一个是bgsave
这两个的区别在于是否阻塞客户端服务,因为bgsave
是fork
子进程的方式完成的。这里重点说明以子进程的方式完成持久化:
通过调用rdbSaveBackground()
函数完成子进程的fork
:
/*bgsave 主函数*/
/*src/rdb.c/rdbSaveBackground*/
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
// 如果 BGSAVE 已经在执行,那么出错
if (server.rdb_child_pid != -1) return REDIS_ERR;
// 记录 BGSAVE 执行前的数据库被修改次数
server.dirty_before_bgsave = server.dirty;
// 最近一次尝试执行 BGSAVE 的时间
server.lastbgsave_try = time(NULL);
// fork() 开始前的时间,记录 fork() 返回耗时用
start = ustime();
if ((childpid = fork()) == 0) {
int retval;
/* Child */
// 关闭网络连接 fd
closeListeningSockets(0);
// 设置进程的标题,方便识别
redisSetProcTitle("redis-rdb-bgsave");
// 调用rdbSave函数,真正执行保存操作
retval = rdbSave(filename);
// 打印 copy-on-write 时使用的内存数
if (retval == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
redisLog(REDIS_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
}
// 向父进程发送信号
exitFromChild((retval == REDIS_OK) ? 0 : 1);
} else {
/* Parent */
// 计算 fork() 执行的时间
server.stat_fork_time = ustime()-start;
// 如果 fork() 出错,那么报告错误
if (childpid == -1) {
server.lastbgsave_status = REDIS_ERR;
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
// 打印 BGSAVE 开始的日志
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
// 记录数据库开始 BGSAVE 的时间
server.rdb_save_time_start = time(NULL);
// 记录负责执行 BGSAVE 的子进程 ID
server.rdb_child_pid = childpid;
// 关闭自动 rehash
updateDictResizePolicy();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
可以看出,其中的主要调用为rdbSave
函数,这也是阻塞式save
的底层函数:
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success
* 将数据库保存到磁盘上。
*/
/*src/rdb.c/rdbSave*/
int rdbSave(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
char tmpfile[256];
char magic[10];
int j;
long long now = mstime();
FILE *fp;
rio rdb;
uint64_t cksum;
// 创建临时文件
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
strerror(errno));
return REDIS_ERR;
}
// 初始化 I/O,这里是创建了rdb的结构体。
rioInitWithFile(&rdb,fp);
// 设置校验和函数
if (server.rdb_checksum)
rdb.update_cksum = rioGenericUpdateChecksum;
// 写入 RDB 版本号
snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
// 遍历所有数据库
for (j = 0; j < server.dbnum; j++) {
// 指向数据库
redisDb *db = server.db+j;
// 指向数据库键空间
dict *d = db->dict;
// 跳过空数据库
if (dictSize(d) == 0) continue;
// 创建键空间迭代器
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
/* Write the SELECT DB opcode
*
* 写入 DB 选择器
*/
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(&rdb,j) == -1) goto werr;
/* Iterate this DB writing every entry
*
* 遍历数据库,并写入每个键值对的数据
*/
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
// 根据 keystr ,在栈中创建一个 key 对象
initStaticStringObject(key,keystr);
// 获取键的过期时间
expire = getExpire(db,&key);
// 保存键值对数据
if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
}
dictReleaseIterator(di);
}
di = NULL; /* So that we don't release it again on error. */
/* EOF opcode
*
* 写入 EOF 代码
*/
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case.
*
* CRC64 校验和。
*
* 如果校验和功能已关闭,那么 rdb.cksum 将为 0 ,
* 在这种情况下, RDB 载入时会跳过校验和检查。
*/
cksum = rdb.cksum;
memrev64ifbe(&cksum);
rioWrite(&rdb,&cksum,8);
/* Make sure data will not remain on the OS's output buffers */
// 冲洗缓存,确保数据已写入磁盘
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok.
*
* 使用 RENAME ,原子性地对临时文件进行改名,覆盖原来的 RDB 文件。
*/
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
// 写入完成,打印日志
redisLog(REDIS_NOTICE,"DB saved on disk");
// 清零数据库脏状态
server.dirty = 0;
// 记录最后一次完成 SAVE 的时间
server.lastsave = time(NULL);
// 记录最后一次执行 SAVE 的状态
server.lastbgsave_status = REDIS_OK;
return REDIS_OK;
werr:
// 关闭文件
fclose(fp);
// 删除临时文件
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
这里需要说明几点:
1、采用BGSAVE
策略,如果内存中的数据集很大,fork
会因为要为子进程产生一份虚拟空间(读时共享,写时拷贝)而花费的时间很长;可能会造成阻塞。
2、在RDB持久化过程中,每一个数据都需要调用一个write
的系统调用,CPU资源可能会紧张,因此需要避免在一台物理机上部署多个Redis,避免同时持久化。
3、在子进程完成之前,读取了自身的私有脏数据private_dirty
的大小,这可以近似看成是bgsave
进行过程中占用了多少内存。
III、RDB文件
在rdbSave
函数中,对于每一个键值对,都会调用函数rdbSaveKeyValuePair
函数进行存储,我们可以了解RDB文件对于每个键值对的组织形式,在看是如何存储的:
再来理解rdbSaveKeyValuePair
函数的实现:
/* Save a key-value pair, with expire time, type, key, value.
*
* 将键值对的键、值、过期时间和类型写入到 RDB 中。
*
* On success if the key was actually saved 1 is returned, otherwise 0
* is returned (the key was already expired).
*
* 成功保存返回 1 ,当键已经过期时,返回 0 。
*/
/*src/rdb.c/rdbSaveKeyValuePair*/
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
long long expiretime, long long now)
{
/* Save the expire time
*
* 保存键的过期时间
*/
if (expiretime != -1) {
/* If this key is already expired skip it
*
* 不写入已经过期的键
*/
if (expiretime < now) return 0;
//保存过期时间
if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
/* Save type, key, value
*
* 保存类型,键,值
*/
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val) == -1) return -1;
return 1;
}
【参考】
[1] 《Redis设计与实现》
[2] 《Redis源码日志》
网友评论