美文网首页redis
Redis源码分析--AOF文件全量重写源码阅读

Redis源码分析--AOF文件全量重写源码阅读

作者: 阿飞的博客 | 来源:发表于2017-09-26 18:28 被阅读457次

    重要说明,在看这篇文章之前,最好先通过 剖析Redis协议 了解Redis协议,AOF文件全量重写就是根据Redis协议生成的;

    AOF文件什么时候完全重写:

    • 1 AOF文件超过64M且增长一定比例(最后一次AOF文件重写后增长了aof_rewrite_perc,默认是100%,在redis.h中有定义:REDIS_AOF_REWRITE_PERC,可以通过config get/set auto-aof-rewrite-percentage热修改)
    • 2 有AOF重写的调度任务(例如执行BGREWRITEAOF命令)

    这个方法的注释说明了后台AOF重写是如何工作的--主要是全量重新AOF文件业务逻辑:

    /* This is how rewriting of the append only file in background works:
     *
     * 1) The user calls BGREWRITEAOF
     * 2) Redis calls this function, that forks():
     *    2a) the child rewrite the append only file in a temp file.
     *    2b) the parent accumulates differences in server.aof_rewrite_buf.
     * 3) When the child finished '2a' exists.
     * 4) The parent will trap the exit code, if it's OK, will append the
     *    data accumulated into server.aof_rewrite_buf into the temp file, and
     *    finally will rename(2) the temp file in the actual file name.
     *    The the new file is reopened as the new append only file. Profit!
     */
    int rewriteAppendOnlyFileBackground(void) {
        pid_t childpid;
        long long start;
    
        // 如果已经有AOF重写任务,那么退出;
        if (server.aof_child_pid != -1) return REDIS_ERR;
        start = ustime();
    
        // 调用fork(),如果返回值childpid==0那么表示当前处于fork的子进程中;
        if ((childpid = fork()) == 0) {
            char tmpfile[256];
    
            /* Child */
            closeListeningSockets(0);
            redisSetProcTitle("redis-aof-rewrite");
            // 如果getpid()的结果为1976,即当前进程id为1976,那么tmpfile=‘temp-rewriteaof-bg-1976.aof’,即AOF文件重写临时文件名
            snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
            // 调用rewriteAppendOnlyFile重写aof文件到tmpfile中[后面会解读];
            if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
                size_t private_dirty = zmalloc_get_private_dirty();
    
                if (private_dirty) {
                    redisLog(REDIS_NOTICE,
                        "AOF rewrite: %zu MB of memory used by copy-on-write",
                        private_dirty/(1024*1024));
                }
                exitFromChild(0);
            } else {
                exitFromChild(1);
            }
        } else {
            // 调用fork(),如果返回值childpid!=0那么表示当前处于父进程中;
            /* Parent */
            server.stat_fork_time = ustime()-start;
            server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
            latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
            if (childpid == -1) {
                redisLog(REDIS_WARNING,
                    "Can't rewrite append only file in background: fork: %s",
                    strerror(errno));
                return REDIS_ERR;
            }
            redisLog(REDIS_NOTICE,
                "Background append only file rewriting started by pid %d",childpid);
            server.aof_rewrite_scheduled = 0;
            server.aof_rewrite_time_start = time(NULL);
            server.aof_child_pid = childpid;
            updateDictResizePolicy();
            /* We set appendseldb to -1 in order to force the next call to the
             * feedAppendOnlyFile() to issue a SELECT command, so the differences
             * accumulated by the parent into server.aof_rewrite_buf will start
             * with a SELECT statement and it will be safe to merge. */
            server.aof_selected_db = -1;
            replicationScriptCacheFlush();
            return REDIS_OK;
        }
        return REDIS_OK; /* unreached */
    }
    

    调用rewriteAppendOnlyFile重写AOF文件(增量重写AOF文件,重新生成AOF文件):

    /* Write a sequence of commands able to fully rebuild the dataset into
     * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
     *
     * In order to minimize the number of commands needed in the rewritten
     * log Redis uses variadic commands when possible, such as RPUSH, SADD
     * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
     * are inserted using a single command. */
    int rewriteAppendOnlyFile(char *filename) {
        dictIterator *di = NULL;
        dictEntry *de;
        rio aof;
        FILE *fp;
        char tmpfile[256];
        int j;
        long long now = mstime();
    
        /* Note that we have to use a different temp name here compared to the
         * one used by rewriteAppendOnlyFileBackground() function. */
        snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
        fp = fopen(tmpfile,"w");
        if (!fp) {
            redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
            return REDIS_ERR;
        }
    
        rioInitWithFile(&aof,fp);
    
        // 如果开启了AOF重写增量模式--即配置appendonly yes然后执行set,lpush等引起内存数据变化的命令;
        if (server.aof_rewrite_incremental_fsync)
            rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
        // 遍历redis中所有db重新生成AOF文件
        for (j = 0; j < server.dbnum; j++) {
            //写入AOF文件中的第一行内容就是selectcmd,即*2\r\n$6\r\nSELECT\r\n,这个内容是根据redis协议定义的:
            // *2
            // $6
            // SELECT
            // *2 表示这条命名有两个参数(SELECT dbnum)
            // $6 表示接下来参数的长度是6
            // SELECT表示长度是6的参数,后面还会写入dbnum;
            char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
            redisDb *db = server.db+j;
            // redis中每个db里保存key的数据结构是一个dict;
            dict *d = db->dict;
            // 如果遍历当前db的dict(保存所有key的数据结构)是空,那么遍历下一次db
            if (dictSize(d) == 0) continue;
            // 如果遍历当前db的dict有值,那么迭代这个dict;
            di = dictGetSafeIterator(d);
            if (!di) {
                fclose(fp);
                return REDIS_ERR;
            }
    
            // 把selectcmd这个char[]以及当前遍历的db编号即j写入aof文件中(接着写在上面的SELECT之后);
            /* SELECT the new DB */
            if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
            if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
    
            // 迭代dictIterator *di,迭代过程中得到的de就是一个dictEntry  :
            /* Iterate this DB writing every entry */
            while((de = dictNext(di)) != NULL) {
                sds keystr;
                robj key, *o;
                long long expiretime;
    
                // 根据dictEntry得到key和value,value是一个redisObject类型指针;
                keystr = dictGetKey(de);
                o = dictGetVal(de);
                initStaticStringObject(key,keystr);
    
                // 从存放所有设置了过期时间的dict中查询这个key是否设置了过期时间;
                expiretime = getExpire(db,&key);
    
                // 如果已经过期,那么跳过,不保存到aof文件中
                /* If this key is already expired skip it */
                if (expiretime != -1 && expiretime < now) continue;
    
                // 接下来根据值的类型不同处理方式也不同;
                /* Save the key and associated value */
    
                // 如果当前key的值的类型是REDIS_STRING,即set命令生成的,假设当前遍历的是set username afei,那么写入aof文件大概内容如下(\r\n就是window格式的换行符):
                // *3
                // $3
                // SET
                // $8
                // username
                // $4
                // afei
                // 其他的list,set,zset,hash处理类似;
                if (o->type == REDIS_STRING) {
                    /* Emit a SET command */
                    char cmd[]="*3\r\n$3\r\nSET\r\n";
                    if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                    /* Key and value */
                    if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                    if (rioWriteBulkObject(&aof,o) == 0) goto werr;
                } else if (o->type == REDIS_LIST) {
                    if (rewriteListObject(&aof,&key,o) == 0) goto werr;
                } else if (o->type == REDIS_SET) {
                    if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
                } else if (o->type == REDIS_ZSET) {
                    if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
                } else if (o->type == REDIS_HASH) {
                    if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
                } else {
                    redisPanic("Unknown object type");
                }
                /* Save the expire time */
                // 如果key有过期属性,那么还需要单独保存过期属性到aof文件中,格式大概如下:
                // *3
                // $9
                // PEXPIREAT
                // $8
                // username
                // $13
                // 1506405235055
                if (expiretime != -1) {
                    char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                    if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                    if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                    if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
                }
            }
            dictReleaseIterator(di);
            di = NULL;
        }
    
        /* Make sure data will not remain on the OS's output buffers */
        if (fflush(fp) == EOF) goto werr;
        if (fsync(fileno(fp)) == -1) goto werr;
        if (fclose(fp) == EOF) goto werr;
    
        // 最后重命名这个AOF文件;用rename能保证重命名的原子性;
        /* Use RENAME to make sure the DB file is changed atomically only
         * if the generate DB file is ok. */
        if (rename(tmpfile,filename) == -1) {
            redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
            unlink(tmpfile);
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
        return REDIS_OK;
    
    werr:
        redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
        fclose(fp);
        unlink(tmpfile);
        if (di) dictReleaseIterator(di);
        return REDIS_ERR;
    }
    

    相关文章

      网友评论

        本文标题:Redis源码分析--AOF文件全量重写源码阅读

        本文链接:https://www.haomeiwen.com/subject/nvheextx.html