美文网首页工作生活
「Redis源码解读」—持久化(一)RDB

「Redis源码解读」—持久化(一)RDB

作者: wh4763 | 来源:发表于2019-07-02 03:38 被阅读0次

    知识点

    • RDB文件用于保存和还原Redis服务器所有数据库中的所有键值对数据
    • SAVE命令由服务器服务器进程直接执行保存操作,所以该命令会阻塞服务器
    • BGSAVE命令由子进程执行保存操作,所以该命令不会阻塞服务器
    • 服务器状态中会保存所有用save选项设置的保存条件,当任意一个保存条件被满足时,服务器会自动执行BGSAVE命令
    • RDB文件是一个经过压缩的二进制文件,由多个部分组成
    • 对不同类型的键值对,RDB文件会使用不同的方式来保存
    • RDB持久化既可以手动执行,也可以根据服务器配置选项定期执行,该功能可以将某个时间点上的数据库状态保存到一个RDB文件中

    RDB文件的创建和载入

    有两个Redis命令可以用于生成RDB文件,一个是SAVE,另一个是BGSAVE

    SVAE

    • 当SAVE命令执行时,Redis服务器会被阻塞,所以当SAVE命令正在执行时,客户端发送的所有命令请求都会被阻塞
    • 只有在服务器执行完SAVE命令才会重新开始接收命令请求并处理

    SAVE命令由rdbSave函数实现,代码如下:

    /*
     * 将数据库保存到磁盘上。成功返回 REDIS_OK ,失败返回 REDIS_ERR 。
     */
    int rdbSave(char *filename) {
        dictIterator *di = NULL;
        dictEntry *de;
        char tmpfile[256];
        char magic[10];
        int j;
        long long now = mstime();
        FILE *fp;
        rio rdb;
        uint64_t cksum;
        // 以 "temp-<pid>.rdb" 格式创建临时文件名
        snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
        fp = fopen(tmpfile,"w");
        if (!fp) {
            redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        // 初始化 rio 文件
        rioInitWithFile(&rdb,fp);
        // 如果有需要的话,设置校验和计算函数
        if (server.rdb_checksum)
            rdb.update_cksum = rioGenericUpdateChecksum;
        // 以 "REDIS <VERSION>" 格式写入文件头,以及 RDB 的版本
        snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
        if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
        // 遍历所有数据库,保存它们的数据
        for (j = 0; j < server.dbnum; j++) {
            // 指向数据库
            redisDb *db = server.db+j;
            // 指向数据库 key space
            dict *d = db->dict;
            // 数据库为空, pass ,处理下个数据库
            if (dictSize(d) == 0) continue;
            // 创建迭代器
            di = dictGetSafeIterator(d);
            if (!di) {
                fclose(fp);
                return REDIS_ERR;
            }
            /* Write the SELECT DB opcode */
            // 记录正在使用的数据库的号码
            if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
            if (rdbSaveLen(&rdb,j) == -1) goto werr;
            /* Iterate this DB writing every entry */
            // 将数据库中的所有节点保存到 RDB 文件
            while((de = dictNext(di)) != NULL) {
                // 取出键
                sds keystr = dictGetKey(de);
                // 取出值
                robj key, 
                     *o = dictGetVal(de);
                long long expire;      
                initStaticStringObject(key,keystr);
                // 取出过期时间
                expire = getExpire(db,&key);
                if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
            }
            dictReleaseIterator(di);
        }
        di = NULL; /* So that we don't release it again on error. */
        /* EOF opcode */
        if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr
        /* CRC64 checksum. It will be zero if checksum computation is disabled, the
         * loading code skips the check in this case. */
        cksum = rdb.cksum;
        memrev64ifbe(&cksum);
        rioWrite(&rdb,&cksum,8);
        /* Make sure data will not remain on the OS's output buffers */
        fflush(fp);
        fsync(fileno(fp));
        fclose(fp);
        /* Use RENAME to make sure the DB file is changed atomically only
         * if the generate DB file is ok. */
        // 将临时文件 tmpfile 改名为 filename 
        if (rename(tmpfile,filename) == -1) {
            redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
            unlink(tmpfile);
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,"DB saved on disk");
        // 初始化数据库数据
        server.dirty = 0;
        server.lastsave = time(NULL);
        server.lastbgsave_status = REDIS_OK;
        return REDIS_OK;
    werr:
        fclose(fp);
        unlink(tmpfile);
        redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
        if (di) dictReleaseIterator(di);
        return REDIS_ERR;
    }
    
    

    BGSAVE

    • 因为BGSAVE命令的保存工作是由子进程执行的,所以在子进程创建RDB文件的过程中,Redis服务器仍然可以继续处理客户端请求,但是,在BGSAVE命令执行期间,服务器处理SAVE、BGSAVE、BGREWRITEAOF三个命令的方式会和平时有所不同。
      1.客户端发送的SAVE命令会被拒绝,为了避免产生竞争条件
      2.BGSAVE命令也会被拒绝,也是为了避免产生竞争条件
      3.BGREWRITEAOF命令会被延迟到BGSAVE命令执行完毕之后执行

    BGSAVE由rdbSaveBackground函数实现,代码如下:

    /*
     * 使用子进程保存数据库数据,不阻塞主进程
     */
    int rdbSaveBackground(char *filename) {
        pid_t childpid;
        long long start;
        if (server.rdb_child_pid != -1) return REDIS_ERR;
        // 修改服务器状态
        server.dirty_before_bgsave = server.dirty;
        // 开始时间
        start = ustime();
        // 创建子进程
        if ((childpid = fork()) == 0) {
            int retval;
            /* Child */
            // 子进程不接收网络数据
            if (server.ipfd > 0) close(server.ipfd);
            if (server.sofd > 0) close(server.sofd);
            // 保存数据
            retval = rdbSave(filename);
            if (retval == REDIS_OK) {
                size_t private_dirty = zmalloc_get_private_dirty();
                if (private_dirty) {
                    redisLog(REDIS_NOTICE,
                        "RDB: %lu MB of memory used by copy-on-write",
                        private_dirty/(1024*1024));
                }
            }
            // 退出子进程
            exitFromChild((retval == REDIS_OK) ? 0 : 1);
        } else {
            /* Parent */
            // 记录最后一次 fork 的时间
            server.stat_fork_time = ustime()-start;
            // 创建子进程失败时进行错误报告
            if (childpid == -1) {
                redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                    strerror(errno));
                return REDIS_ERR;
            }
            redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
            // 记录保存开始的时间
            server.rdb_save_time_start = time(NULL);
            // 记录子进程的 id
            server.rdb_child_pid = childpid;
            // 在执行时关闭对数据库的 rehash
            // 避免 copy-on-write
            updateDictResizePolicy();
            return REDIS_OK;
        }
        return REDIS_OK; /* unreached */
    }
    

    通过阅读rdbSaveBackground(char *filename)的源码可知,其最终的实现还是调用rdbSave(char *filename),只不过是通过fork()出的子进程来执行罢了,所以bgsave和save的实现是殊途同归

    rdb持久化的核心代码:

    /* Save a key-value pair, with expire time, type, key, value.
     * 保存键值对,值的类型,以及它的过期时间(如果有的话)。
     *
     * On error -1 is returned.
     * 出错返回 -1 。
     *
     * On success if the key was actaully saved 1 is returned, otherwise 0
     * is returned (the key was already expired). 
     *
     * 如果 key 已经过期,放弃保存,返回 0 。
     * 如果 key 保存成功,返回 1 。
     */
    int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                            long long expiretime, long long now)
    {
        /* Save the expire time */
        // 保存过期时间
        if (expiretime != -1) {
            /* If this key is already expired skip it */
            // key 已过期,直接跳过
            if (expiretime < now) return 0;
    
            if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
            if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
        }
    
        /* Save type, key, value */
        // 保存值类型
        if (rdbSaveObjectType(rdb,val) == -1) return -1;
        // 保存 key
        if (rdbSaveStringObject(rdb,key) == -1) return -1;
        // 保存 value
        if (rdbSaveObject(rdb,val) == -1) return -1;
    
        return 1;
    }
    

    RDB文件载入

    • 服务器在载入RDB文件期间,会一直处于阻塞状态,直到载入工作完成为止
      载入RDB由rdbLoad函数实现,代码如下:
    *
     * 读取 rdb 文件,并将其中的对象保存到内存中
     */
    int rdbLoad(char *filename) {
        uint32_t dbid;
        int type, rdbver;
        redisDb *db = server.db+0;
        char buf[1024];
        long long expiretime, now = mstime();
        long loops = 0;
        FILE *fp;
        rio rdb;
        // 打开文件
        fp = fopen(filename,"r");
        if (!fp) {
            errno = ENOENT;
            return REDIS_ERR;
        }
        // 初始化 rdb 文件
        rioInitWithFile(&rdb,fp);
        if (server.rdb_checksum)
            rdb.update_cksum = rioGenericUpdateChecksum;
        // 检查 rdb 文件头(“REDIS”字符串,以及版本号)
        if (rioRead(&rdb,buf,9) == 0) goto eoferr;
        buf[9] = '\0';
        if (memcmp(buf,"REDIS",5) != 0) {   // "REDIS"
            fclose(fp);
            redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
            errno = EINVAL;
            return REDIS_ERR;
        }
        rdbver = atoi(buf+5);   // 版本号
        if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
            fclose(fp);
            redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
            errno = EINVAL;
            return REDIS_ERR;
        }
        startLoading(fp);
        while(1) {
            robj *key, *val;
            expiretime = -1;
            /* Serve the clients from time to time */
            // 间隔性服务客户端
            if (!(loops++ % 1000)) {
                // 刷新载入进程信息
                loadingProgress(rioTell(&rdb));
                // 处理事件
                aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
            }
            /* Read type. */
            // 读入类型标识符
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            // 接下来的值是一个过期时间
            if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
                // 读取毫秒计数的过期时间
                if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
                /* We read the time so we need to read the object type again. */
                // 读取下一个值(一个字符串 key )的类型标识符
                if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
                /* the EXPIRETIME opcode specifies time in seconds, so convert
                 * into milliesconds. */
                 // 将毫秒转换为秒
                expiretime *= 1000;
            } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
                /* Milliseconds precision expire times introduced with RDB
                 * version 3. */
                // 读取毫秒计数的过期时间
                if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
                /* We read the time so we need to read the object type again. */
                // 读取下一个值(一个字符串 key )的类型标识符
                if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            }
            // 到达 EOF ,跳出
            if (type == REDIS_RDB_OPCODE_EOF)
                break;
            /* Handle SELECT DB opcode as a special case */
            // 数据库号码标识符
            if (type == REDIS_RDB_OPCODE_SELECTDB) {
                // 读取数据库号
                if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
                    goto eoferr;
                // 检查数据库号是否合法
                if (dbid >= (unsigned)server.dbnum) {
                    redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                    exit(1);
                }
                db = server.db+dbid;
                continue;
            }
            /* Read key */
            // 读入 key
            if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
            /* Read value */
            // 读入 value
            if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
            /* Check if the key already expired. This function is used when loading
             * an RDB file from disk, either at startup, or when an RDB was
             * received from the master. In the latter case, the master is
             * responsible for key expiry. If we would expire keys here, the
             * snapshot taken by the master may not be reflected on the slave. */
            // 如果 key 已经过期,那么释放 key 和 value
            if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
                decrRefCount(key);
                decrRefCount(val);
                continue;
            }
            /* Add the new object in the hash table */
            // 将对象添加到数据库
            dbAdd(db,key,val);
            /* Set the expire time if needed */
            // 如果有过期时间,设置过期时间
            if (expiretime != -1) setExpire(db,key,expiretime);
            decrRefCount(key);
        }
    

    自动间隔性保存

    因为BGSAVE命令可以在不阻塞服务器进程的情况下执行,所以Redis允许用户通过设置服务器配置的save选项,让服务器每隔一段时间自动执行一次BGSAVE命令。

    设置保存条件

    当Redis启动时,用户可以通过指定配置文件或者传入启动参数的方式设置save选项,如果用户没有主动设置save选项,那么服务器会为save选项设置默认条件

    • save 900 1 (服务器在900秒内,对数据库进行了至少一次修改)
    • save 300 10
    • save 60 10000
      那么只要满足以上三个条件中的任意一个,BGSAVE命令就会被执行
      接着,Redis会根据save选项配置设置服务器状态redisServer的saveparams属性
    struct redisServer{
      //...
      //记录了保存条件的数组
      struct saveparam *saveparams
      
       //...
    }
    

    除了saveparams数组之外,redis还维持着一个dirty计数器以及一个lastsave属性

    • dirty记录距离上一次成功执行SAVE/BGSAVE之后对数据库进行了多少次修改(包括写入、删除、更新等操作)
    • lastsave属性是一个时间戳,记录上一次成功执行SAVE/BGSAVE命令等时间

    检查保存条件是否满足

    Redis 会周期性的执行serverCron默认每隔100毫秒就执行一次,它的其中一项工作就是检查是否满足save条件
    serverCron代码如下:

    /* This is our timer interrupt, called REDIS_HZ times per second.
     * 时间中断器,调用间隔为 REDIS_HZ 。
     *
     * Here is where we do a number of things that need to be done asynchronously.
     * For instance:
     *
     * 以下是需要异步地完成的工作:
     *
     * - Active expired keys collection (it is also performed in a lazy way on
     *   lookup).
     *   主动回收过期的键
     *
     * - Software watchdong.
     *   WATCHDOG
     *
     * - Update some statistic.
     *   更新统计信息
     *
     * - Incremental rehashing of the DBs hash tables.
     *   对数据库进行渐进式 REHASH
     *
     * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
     *   触发 BGSAVE 、 AOF 重写,并处理随之而来的子进程中介
     *
     * - Clients timeout of differnet kinds.
     *   各种类型的客户端超时
     *
     * - Replication reconnection.
     *   重连复制节点
     *
     * - Many more...
     *   等等
     *
     * Everything directly called here will be called REDIS_HZ times per second,
     * so in order to throttle execution of things we want to do less frequently
     * a macro is used: run_with_period(milliseconds) { .... }
     *
     * 因为在这个函数中直接调用的函数都会以 REDIS_HZ 频率调用,
     * 为了调整部分函数执行的频率,使用了 run_with_period(ms) { ... }
     * 来修改代码的执行频率
     */
    
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        int j;
        REDIS_NOTUSED(eventLoop);
        REDIS_NOTUSED(id);
        REDIS_NOTUSED(clientData);
    
        /* Software watchdog: deliver the SIGALRM that will reach the signal
         * handler if we don't return here fast enough. */
        if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
    
        /* We take a cached value of the unix time in the global state because
         * with virtual memory and aging there is to store the current time
         * in objects at every object access, and accuracy is not needed.
         * To access a global var is faster than calling time(NULL) */
        // 将 UNIX 时间保存在服务器状态中,减少对 time(NULL) 的调用,加速。
        server.unixtime = time(NULL);
    
        // 对执行命令的时间进行采样分析
        run_with_period(100) trackOperationsPerSecond();
    
        /* We have just 22 bits per object for LRU information.
         * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
         * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
         *
         * Note that even if this will wrap after 1.5 years it's not a problem,
         * everything will still work but just some object will appear younger
         * to Redis. But for this to happen a given object should never be touched
         * for 1.5 years.
         *
         * Note that you can change the resolution altering the
         * REDIS_LRU_CLOCK_RESOLUTION define.
         */
        // 更新服务器的 LRU 时间
        updateLRUClock();
    
        /* Record the max memory used since the server was started. */
        // 记录服务器启动以来的内存最高峰
        if (zmalloc_used_memory() > server.stat_peak_memory)
            server.stat_peak_memory = zmalloc_used_memory();
    
        /* We received a SIGTERM, shutting down here in a safe way, as it is
         * not ok doing so inside the signal handler. */
        if (server.shutdown_asap) {
            // 保存数据库,清理服务器,并退出
            if (prepareForShutdown(0) == REDIS_OK) exit(0);
            redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        }
    
        /* Show some info about non-empty databases */
        // 记录非空数据库的信息
        run_with_period(5000) {
            for (j = 0; j < server.dbnum; j++) {
                long long size, used, vkeys;
    
                size = dictSlots(server.db[j].dict);
                used = dictSize(server.db[j].dict);
                vkeys = dictSize(server.db[j].expires);
                if (used || vkeys) {
                    redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                    /* dictPrintStats(server.dict); */
                }
            }
        }
    
        /* We don't want to resize the hash tables while a bacground saving
         * is in progress: the saving child is created using fork() that is
         * implemented with a copy-on-write semantic in most modern systems, so
         * if we resize the HT while there is the saving child at work actually
         * a lot of memory movements in the parent will cause a lot of pages
         * copied. */
        // 在保存 RDB 或者 AOF 重写时不进行 REHASH ,避免写时复制
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
            // 将哈希表的比率维持在 1:1 附近
            tryResizeHashTables();
            if (server.activerehashing) incrementallyRehash();
        }
    
        /* Show information about connected clients */
        // 显示已连接客户端的信息
        if (!server.sentinel_mode) {
            run_with_period(5000) {
                redisLog(REDIS_VERBOSE,
                    "%d clients connected (%d slaves), %zu bytes in use",
                    listLength(server.clients)-listLength(server.slaves),
                    listLength(server.slaves),
                    zmalloc_used_memory());
            }
        }
    
        /* We need to do a few operations on clients asynchronously. */
        clientsCron();
    
        /* Start a scheduled AOF rewrite if this was requested by the user while
         * a BGSAVE was in progress. */
        // 如果用户执行 BGREWRITEAOF 命令的话,在后台开始 AOF 重写
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
            server.aof_rewrite_scheduled)
        {
            rewriteAppendOnlyFileBackground();
        }
    
        /* Check if a background saving or AOF rewrite in progress terminated. */
        // 如果 BGSAVE 或者 BGREWRITEAOF 正在进行
        // 那么检查它们是否已经执行完毕
        if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
            int statloc;
            pid_t pid;
    
            if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
                int exitcode = WEXITSTATUS(statloc);
                int bysignal = 0;
                
                if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
    
                if (pid == server.rdb_child_pid) {
                    backgroundSaveDoneHandler(exitcode,bysignal);
                } else if (pid == server.aof_child_pid) {
                    backgroundRewriteDoneHandler(exitcode,bysignal);
                } else {
                    redisLog(REDIS_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
                // 如果 BGSAVE 和 BGREWRITEAOF 都已经完成,那么重新开始 REHASH
                updateDictResizePolicy();
            }
        } else {
            /* If there is not a background saving/rewrite in progress check if
             * we have to save/rewrite now */
             // 如果有需要,开始 RDB 文件的保存
             for (j = 0; j < server.saveparamslen; j++) {
                struct saveparam *sp = server.saveparams+j;
    
                if (server.dirty >= sp->changes &&
                    server.unixtime-server.lastsave > sp->seconds) {
                    redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                        sp->changes, sp->seconds);
                    rdbSaveBackground(server.rdb_filename);
                    break;
                }
             }
    
             /* Trigger an AOF rewrite if needed */
             // 如果有需要,开始 AOF 文件重写
             if (server.rdb_child_pid == -1 &&
                 server.aof_child_pid == -1 &&
                 server.aof_rewrite_perc &&
                 server.aof_current_size > server.aof_rewrite_min_size)
             {
                long long base = server.aof_rewrite_base_size ?
                                server.aof_rewrite_base_size : 1;
                long long growth = (server.aof_current_size*100/base) - 100;
                if (growth >= server.aof_rewrite_perc) {
                    redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                    rewriteAppendOnlyFileBackground();
                }
             }
        }
    
    
        /* If we postponed an AOF buffer flush, let's try to do it every time the
         * cron function is called. */
        // 如果有需要,保存 AOF 文件到硬盘
        if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    
        /* Expire a few keys per cycle, only if this is a master.
         * On slaves we wait for DEL operations synthesized by the master
         * in order to guarantee a strict consistency. */
        // 如果服务器是主节点的话,进行过期键删除
        // 如果服务器是附属节点的话,那么等待主节点发来的 DEL 命令
        if (server.masterhost == NULL) activeExpireCycle();
    
        /* Close clients that need to be closed asynchronous */
        // 关闭那些需要异步删除的客户端
        freeClientsInAsyncFreeQueue();
    
        /* Replication cron function -- used to reconnect to master and
         * to detect transfer failures. */
        // 进行定期同步
        run_with_period(1000) replicationCron();
    
        /* Run the Redis Cluster cron. */
        // 运行集群定期任务
        run_with_period(1000) {
            if (server.cluster_enabled) clusterCron();
        }
    
        /* Run the Sentinel timer if we are in sentinel mode. */
        // 运行监视器计时器
        run_with_period(100) {
            if (server.sentinel_mode) sentinelTimer();
        }
    
        /* Cleanup expired MIGRATE cached sockets. */
        run_with_period(1000) {
            migrateCloseTimedoutSockets();
        }
    
        server.cronloops++;
        return 1000/REDIS_HZ;
    }
    

    RDB文件结构

    • 一个 RDB 文件可以分为以下几个部分:

    +-------+-------------+-----------+-----------------+-----+-----------+
    | REDIS | RDB-VERSION | SELECT-DB | KEY-VALUE-PAIRS | EOF | CHECK-SUM |
    +-------+-------------+-----------+-----------------+-----+-----------+
    |<-------- DB-DATA ---------->|
    以下的几个小节将分别对这几个部分的保存和读入规则进行介绍。

    • REDIS
      文件的最开头保存着 REDIS 五个字符,标识着一个 RDB 文件的开始。
      在读入文件的时候,程序可以通过检查一个文件的前五个字节,来快速地判断该文件是否有可能是 RDB 文件。

    • RDB-VERSION
      一个四字节长的以字符表示的整数,记录了该文件所使用的 RDB 版本号。
      目前的 RDB 文件版本为 0006 。
      因为不同版本的 RDB 文件互不兼容,所以在读入程序时,需要根据版本来选择不同的读入方式。

    • DB-DATA
      这个部分在一个 RDB 文件中会出现任意多次,每个 DB-DATA 部分保存着服务器上一个非空数据库的所有数据。

    • SELECT-DB
      这域保存着跟在后面的键值对所属的数据库号码。
      在读入 RDB 文件时,程序会根据这个域的值来切换数据库,确保数据被还原到正确的数据库上。

    • KEY-VALUE-PAIRS
      因为空的数据库不会被保存到 RDB 文件,所以这个部分至少会包含一个键值对的数据。

    • 每个键值对的数据使用以下结构来保存:

    +----------------------+---------------+-----+-------+
    | OPTIONAL-EXPIRE-TIME | TYPE-OF-VALUE | KEY | VALUE |
    +----------------------+---------------+-----+-------+

    • OPTIONAL-EXPIRE-TIME 域是可选的,如果键没有设置过期时间,那么这个域就不会出现;反之,如果这个域出现的话,那么它记录着键的过期时间,在当前版本的 RDB 中,过期时间是一个以毫秒为单位的 UNIX 时间戳。
    • KEY 域保存着键,格式和 REDIS_ENCODING_RAW 编码的字符串对象一样(见下文)。
    • TYPE-OF-VALUE 域记录着 VALUE 域的值所使用的编码,根据这个域的指示,程序会使用不同的方式来保存和读取 VALUE 的值。

    保存 VALUE 的详细格式如下:
    REDIS_ENCODING_INT 编码的 REDIS_STRING 类型对象:

    • 如果值可以表示为 8 位、 16 位或 32 位有符号整数,那么直接以整数类型的形式来保存它们:

    +---------+
    | integer |
    +---------+
    比如说,整数 8 可以用 8 位序列 00001000 保存。

    当读入这类值时,程序按指定的长度读入字节数据,然后将数据转换回整数类型。
    另一方面,如果值不能被表示为最高 32 位的有符号整数,那么说明这是一个 long long 类型的值,在 RDB 文件中,这种类型的值以字符序列的形式保存。

    • 一个字符序列由两部分组成:

    +-----+---------+
    | LEN | CONTENT |
    +-----+---------+
    其中, CONTENT 域保存了字符内容,而 LEN 则保存了以字节为单位的字符长度。

    当进行载入时,读入器先读入 LEN ,创建一个长度等于 LEN 的字符串对象,然后再从文件中读取 LEN 字节数据,并将这些数据设置为字符串对象的值。

    • REDIS_ENCODING_RAW 编码的 REDIS_STRING 类型值有三种保存方式:
      如果值可以表示为 8 位、 16 位或 32 位长的有符号整数,那么用整数类型的形式来保存它们。
      如果字符串长度大于 20 ,并且服务器开启了 LZF 压缩功能 ,那么对字符串进行压缩,并保存压缩之后的数据。

    • 经过 LZF 压缩的字符串会被保存为以下结构:

    +----------+----------------+--------------------+
    | LZF-FLAG | COMPRESSED-LEN | COMPRESSED-CONTENT |
    +----------+----------------+--------------------+
    LZF-FLAG 告知读入器,后面跟着的是被 LZF 算法压缩过的数据。
    COMPRESSED-CONTENT 是被压缩后的数据, COMPRESSED-LEN 则是该数据的字节长度。
    在其他情况下,程序直接以普通字节序列的方式来保存字符串。比如说,对于一个长度为 20 字节的字符串,需要使用 20 字节的空间来保存它。

    • 这种字符串被保存为以下结构:

    +-----+---------+
    | LEN | CONTENT |
    +-----+---------+
    LEN 为字符串的字节长度, CONTENT 为字符串。

    当进行载入时,读入器先检测字符串保存的方式,再根据不同的保存方式,用不同的方法取出内容,并将内容保存到新建的字符串对象当中。

    • REDIS_ENCODING_LINKEDLIST 编码的 REDIS_LIST 类型值保存为以下结构:

    +-----------+--------------+--------------+-----+--------------+
    | NODE-SIZE | NODE-VALUE-1 | NODE-VALUE-2 | ... | NODE-VALUE-N |
    +-----------+--------------+--------------+-----+--------------+
    其中 NODE-SIZE 保存链表节点数量,后面跟着 NODE-SIZE 个节点值。节点值的保存方式和字符串的保存方式一样。

    • 当进行载入时,读入器读取节点的数量,创建一个新的链表,然后一直执行以下步骤,直到指定节点数量满足为止:
      1.读取字符串表示的节点值
      2.将包含节点值的新节点添加到链表中
    • REDIS_ENCODING_HT 编码的 REDIS_SET 类型值保存为以下结构:

    +----------+-----------+-----------+-----+-----------+
    | SET-SIZE | ELEMENT-1 | ELEMENT-2 | ... | ELEMENT-N |
    +----------+-----------+-----------+-----+-----------+
    SET-SIZE 记录了集合元素的数量,后面跟着多个元素值。元素值的保存方式和字符串的保存方式一样。

    载入时,读入器先读入集合元素的数量 SET-SIZE ,再连续读入 SET-SIZE 个字符串,并将这些字符串作为新元素添加至新创建的集合。

    • REDIS_ENCODING_SKIPLIST 编码的 REDIS_ZSET 类型值保存为以下结构:

    +--------------+-------+---------+-------+---------+-----+-------+---------+
    | ELEMENT-SIZE | MEB-1 | SCORE-1 | MEB-2 | SCORE-2 | ... | MEB-N | SCORE-N |
    +--------------+-------+---------+-------+---------+-----+-------+---------+
    其中 ELEMENT-SIZE 为有序集元素的数量, MEB-i 为第 i 个有序集元素的成员, SCORE-i 为第 i 个有序集元素的分值。

    • 当进行载入时,读入器读取有序集元素数量,创建一个新的有序集,然后一直执行以下步骤,直到指定元素数量满足为止:
      1.读入字符串形式保存的成员 member
      2.读入字符串形式保存的分值 score ,并将它转换为浮点数
      3.添加 member 为成员、 score 为分值的新元素到有序集

    • REDIS_ENCODING_HT 编码的 REDIS_HASH 类型值保存为以下结构:

    +-----------+-------+---------+-------+---------+-----+-------+---------+
    | HASH-SIZE | KEY-1 | VALUE-1 | KEY-2 | VALUE-2 | ... | KEY-N | VALUE-N |
    +-----------+-------+---------+-------+---------+-----+-------+---------+
    HASH-SIZE 是哈希表包含的键值对的数量, KEY-i 和 VALUE-i 分别是哈希表的键和值。

    • 载入时,程序先创建一个新的哈希表,然后读入 HASH-SIZE ,再执行以下步骤 HASH-SIZE 次:
      1.读入一个字符串
      2.再读入另一个字符串
      3.将第一个读入的字符串作为键,第二个读入的字符串作为值,插入到新建立的哈希中。

    • REDIS_LIST 类型、 REDIS_HASH 类型和 REDIS_ZSET 类型都使用了 REDIS_ENCODING_ZIPLIST 编码, ziplist 在 RDB 中的保存方式如下:

    +-----+---------+
    | LEN | ZIPLIST |
    +-----+---------+
    载入时,读入器先读入 ziplist 的字节长,再根据该字节长读入数据,最后将数据还原成一个 ziplist 。

    • REDIS_ENCODING_INTSET 编码的 REDIS_SET 类型值保存为以下结构:

    +-----+--------+
    | LEN | INTSET |
    +-----+--------+
    载入时,读入器先读入 intset 的字节长度,再根据长度读入数据,最后将数据还原成 intset 。

    • EOF
      标志着数据库内容的结尾(不是文件的结尾),值为 rdb.h/EDIS_RDB_OPCODE_EOF (255)。

    • CHECK-SUM
      RDB 文件所有内容的校验和,一个 uint_64t 类型值。
      REDIS 在写入 RDB 文件时将校验和保存在 RDB 文件的末尾,当读取时,根据它的值对内容进行校验。
      如果这个域的值为 0 ,那么表示 Redis 关闭了校验和功能。

    相关文章

      网友评论

        本文标题:「Redis源码解读」—持久化(一)RDB

        本文链接:https://www.haomeiwen.com/subject/jzfacctx.html