Redis源码学习之RDB

作者: lixin_karl | 来源:发表于2019-05-15 15:10 被阅读0次

    RDB持久化

    一、RDB文件的创建与载入

       Redis是一个内存服务器,如果关闭Redis进程,那么数据库的数据即消失不见,为了解决这个问题,Redis提供了RDB持久化功能,我们可以把数据库内存中的数据保存到RDB文件中。

       保存到RDB文件可以手动输入BGSAVE或者SAVE命令执行,还可以根据数据库的配置定期执行。

       其中,SAVE命令是阻塞SAVE,即执行此命令期间,数据库不处理任何其他客户端请求直到命令执行结束。而BGSAVE,系统会开启一个子进程专门用来生成RDB文件,父进程继续处理客户端命令。

       而对于数据库配置来说,再redis.conf文件中配置如下的话,900s内数据库至少被修改了

    save 900 20
    save 200 10
    

    20次,或者200s内数据库至少被修改了10次,就自动持久化到RDB文件。

       由于AOF文件的更新频率是大于RDB的,如果开启的AOF功能,那么会优先使用AOF加载数据库。

    二、根据源代码看RDB文件存储结构

       RDB文件结构如下

    REDIS db_version databases checksum
    REDIS字符串 四字节 实际数据 结束标志 校验和

       databases的结构

    DB标志 DBindex db键个数标志 db键个数 db过期键个数 键值对
    SELECTDB index RESIZEDB keynums expirekeynums k-v pairs

       k-v pairs的结构

    过期时间标志 过期时间 值类型
    EXPIRETIME_MS expire_time 类型 字符串

    下面根据源代码看databases存储格式

    1. bgsaveCommand
    void bgsaveCommand(client *c) {
        int schedule = 0;
        //如果有schedule选项,当AOF正在运行时,BGSAVE会等AOF完成
        if (c->argc > 1) {
            if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
                schedule = 1;
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);//就是更新选择的数据库id
    
        if (server.rdb_child_pid != -1) {
            addReplyError(c,"Background save already in progress");
        } else if (server.aof_child_pid != -1) {//aof正在运行
            if (schedule) {
                server.rdb_bgsave_scheduled = 1;//
                addReplyStatus(c,"Background saving scheduled");
            } else {
                addReplyError(c,
                    "An AOF log rewriting in progress: can't BGSAVE right now. "
                    "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
                    "possible.");
            }
        } else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {//实际处理BGSAVE函数
            addReplyStatus(c,"Background saving started");
        } else {
            addReply(c,shared.err);
        }
    } 
    
    2. rdbSaveBackground
    int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
        pid_t childpid;//子进程id
        long long start;//开始时间
    
        if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;//aof保存正在运行或者rdb正在运行,不能再rdb了
    
        server.dirty_before_bgsave = server.dirty;//bgsave之前的脏数据个数
        server.lastbgsave_try = time(NULL);//最近一次bgsave开始
        openChildInfoPipe();//打开父子进程通信的无名管道
        start = ustime();//开始时间
        if ((childpid = fork()) == 0) {//子进程中childid还是等于0
            int retval;
            closeListeningSockets(0);//子进程关闭复制过来的监听端口
            redisSetProcTitle("redis-rdb-bgsave");
            retval = rdbSave(filename,rsi);//save操作实际调用函数
            if (retval == C_OK) {
                size_t private_dirty = zmalloc_get_private_dirty(-1);
                if (private_dirty) {
                    serverLog(LL_NOTICE,
                        "RDB: %zu MB of memory used by copy-on-write",
                        private_dirty/(1024*1024));
                }
                server.child_info_data.cow_size = private_dirty;
                sendChildInfo(CHILD_INFO_TYPE_RDB);
            }
            exitFromChild((retval == C_OK) ? 0 : 1);//成功退出
        } else {//父进程
            server.stat_fork_time = ustime()-start;//fork需要的时间
            server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
            latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
            if (childpid == -1) {//如果fork失败了
                closeChildInfoPipe();//关闭匿名通道
                server.lastbgsave_status = C_ERR;
                serverLog(LL_WARNING,"Can't save in background: fork: %s",
                    strerror(errno));
                return C_ERR;
            }
            serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
            server.rdb_save_time_start = time(NULL);
            server.rdb_child_pid = childpid;//设置child的pid
            server.rdb_child_type = RDB_CHILD_TYPE_DISK;//写到rdb文件标志
            updateDictResizePolicy();//如果没有rdb aof操作才可以resize
            return C_OK;
        }
        return C_OK; /* unreached */
    }
    
    3. rdbSave
    int rdbSave(char *filename, rdbSaveInfo *rsi) {
        char tmpfile[256];
        char cwd[MAXPATHLEN]; /*当前存储错误信息的工作路径*/
        FILE *fp;
        rio rdb;
        int error = 0;
        snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());//先存储为tmp rdb文件
        fp = fopen(tmpfile,"w");
        if (!fp) {
            char *cwdp = getcwd(cwd,MAXPATHLEN);
            serverLog(LL_WARNING,
                "Failed opening the RDB file %s (in server root dir %s) "
                "for saving: %s",
                filename,
                cwdp ? cwdp : "unknown",
                strerror(errno));
            return C_ERR;
        }
        rioInitWithFile(&rdb,fp);//初始化rio(Redis IO)
        if (server.rdb_save_incremental_fsync)
            rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);//设置每次读取的字节数
    
        if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {//数据写入rdb文件
            errno = error;
            goto werr;
        }
        //保证缓存里面的数据全部写到fp中去了
        if (fflush(fp) == EOF) goto werr;
        if (fsync(fileno(fp)) == -1) goto werr;
        if (fclose(fp) == EOF) goto werr;
        //重命名rdb文件
        if (rename(tmpfile,filename) == -1) {
            char *cwdp = getcwd(cwd,MAXPATHLEN);
            serverLog(LL_WARNING,
                "Error moving temp DB file %s on the final "
                "destination %s (in server root dir %s): %s",
                tmpfile,
                filename,
                cwdp ? cwdp : "unknown",
                strerror(errno));
            unlink(tmpfile);
            return C_ERR;
        }
        serverLog(LL_NOTICE,"DB saved on disk");
        server.dirty = 0;
        server.lastsave = time(NULL);
        server.lastbgsave_status = C_OK;
        return C_OK;
    werr:
        serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
        fclose(fp);
        unlink(tmpfile);
        return C_ERR;
    }
    
    4. rdbSaveRio

       以下代码就是真正的写入RDB的函数了,从以下代码中可以清晰的看出RDB文件开头为

    REDIS db_version databases(DB标志 DBindex db键个数标志 db键个数 db过期键个数 保存键值对) 结束标志(EOF) 校验和。其中键值对保存方式见下一个函数。

    //将数据库中的数据写入rdb文件的函数
    int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
        dictIterator *di = NULL;//字典迭代器
        dictEntry *de;//字典的一个实体信息
        char magic[10];
        int j;
        uint64_t cksum;//校验和
        size_t processed = 0;
        if (server.rdb_checksum)
            rdb->update_cksum = rioGenericUpdateChecksum;//更新生成校验和的函数
        //REDIS0009 (9个字节) 即REDIS db_version
        snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
        if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;//原始数据写到rdb文件中
        if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;//写入一些其他信息
        for (j = 0; j < server.dbnum; j++) {//遍历每个数据库 即databases的开始
            redisDb *db = server.db+j;
            dict *d = db->dict;
            if (dictSize(d) == 0) continue;
            di = dictGetSafeIterator(d);//更新迭代器
            if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;//选择的db标志
            if (rdbSaveLen(rdb,j) == -1) goto werr;//第几个db
            uint64_t db_size, expires_size;
            db_size = dictSize(db->dict);
            expires_size = dictSize(db->expires);
            if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;//db键个数标志
            if (rdbSaveLen(rdb,db_size) == -1) goto werr;//db键个数
            if (rdbSaveLen(rdb,expires_size) == -1) goto werr;//过期键的键个数
            /* 遍历 DB的key-val*/
            while((de = dictNext(di)) != NULL) {
                sds keystr = dictGetKey(de);
                robj key, *o = dictGetVal(de);
                long long expire;
                initStaticStringObject(key,keystr);
                expire = getExpire(db,&key);
                if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;//保存键值对和过期时间
    
                /* When this RDB is produced as part of an AOF rewrite, move
                 * accumulated diff from parent to child while rewriting in
                 * order to have a smaller final write. */
                if (flags & RDB_SAVE_AOF_PREAMBLE &&
                    rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
                {
                    processed = rdb->processed_bytes;
                    aofReadDiffFromParent();
                }
            }
            dictReleaseIterator(di);
            di = NULL;
        }
        //保存lua脚本的信息
        if (rsi && dictSize(server.lua_scripts)) {
            di = dictGetIterator(server.lua_scripts);
            while((de = dictNext(di)) != NULL) {
                robj *body = dictGetVal(de);
                if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                    goto werr;
            }
            dictReleaseIterator(di);
            di = NULL; /* So that we don't release it again on error. */
        }
        if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;//写入结束标志
        
        cksum = rdb->cksum;
        memrev64ifbe(&cksum);
        if (rioWrite(rdb,&cksum,8) == 0) goto werr;//checksum 写入校验和
        return C_OK;
    
    werr:
        if (error) *error = errno;
        if (di) dictReleaseIterator(di);
        return C_ERR;
    }
    
    5. rdbSaveKeyValuePair 保存键值对

       上一小节中的保存键值对结构为:(过期时间标志 过期时间 值类型信息 写入键 写入值 )。

    int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
        int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;//最近常用
        int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;//最不常用
        /* Save the expire time */
        if (expiretime != -1) {
            if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;//设置过期时间标志
            if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;//写入过期时间
        }
    
        /* 保存LRU信息. */
        if (savelru) {
            uint64_t idletime = estimateObjectIdleTime(val);
            idletime /= 1000; /* Using seconds is enough and requires less space.*/
            if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
            if (rdbSaveLen(rdb,idletime) == -1) return -1;
        }
    
        /*保存LFU信息*/
        if (savelfu) {
            uint8_t buf[1];
            buf[0] = LFUDecrAndReturn(val);
            if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
            if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
        }
        
        if (rdbSaveObjectType(rdb,val) == -1) return -1;//先保存值得类型
        if (rdbSaveStringObject(rdb,key) == -1) return -1;//再保存字符串key
        if (rdbSaveObject(rdb,val) == -1) return -1;//再保存值
        return 1;
    }
    
    三、od查看RDB文件

    ​ od -c dump.rdb
    里面只有一个 set karl hlx 然后执行save命令后的rdb文件。


    捕获.PNG
    结构 上图中对应部分
    REDIS REDIS
    db_version 0006
    376 \0 SELECT 0(选择0号数据库)
    \0 值类型RDB_TYPE_STRING
    004 键长
    karl
    003 值长
    377 EOF(结束标志)
    剩余的 校验和
    四、参考

    相关文章

      网友评论

        本文标题:Redis源码学习之RDB

        本文链接:https://www.haomeiwen.com/subject/xtipaqtx.html