美文网首页工作生活
「Redis源码解读」—持久化(二)AOF

「Redis源码解读」—持久化(二)AOF

作者: wh4763 | 来源:发表于2019-07-04 00:45 被阅读0次

知识点

  • AOF文件通过保存所有修改数据库的写命令请求来记录服务器的数据库状态
  • AOF文件中的所有命令都以Redis命令请求协议的格式保存
  • 命令请求会先保存到AOF缓冲区里面,之后再定期写入并同步到AOF文件
  • appendfsync选项的不同值对AOF持久化功能的安全性以及Redis服务器的性能有很大的影响
  • 服务器只要载入并重新执行保存在AOF文件中的命令,就可以还原数据库本来的状态
  • AOF重写可以产生一个新的AOF文件,这个新的AOF文件和原有的AOF文件所保存的数据库状态一样,但体积更小
  • AOF重写是一个有歧义的名字,该功能是通过读取数据库中的键值对来实现的,程序无序对现有AOF文件进行任何读入、分析或者写入操作
  • 在执行BFREWRITEAOF命令时,Redis服务器会维护一个AOF重写缓冲区,该缓冲区会在子进程创建新AOF文件期间,记录服务器执行的所有命令。当子进程完成创建新AOF文件的工作之后,Redis会将重写缓冲区中的所有内容追加到新AOF文件的末尾,使得新旧两个AOF文件所保存数据库状态一致。最后,Redis用新的AOF文件替换旧的AOF文件,以此来完成AOF文件重写操作。

AOF持久化


  • 除了rdb持久化功能之外,redis还提供了aof(append only file)持久化功能。与rdb不同,aof持久化
    是通过保存redis服务器所执行的写命令来记录数据库的状态。
  • AOF持久化的实现可以分为命令追加、文件写入和文件同步三个步骤。

命令追加

当AOF持久化功能处于打开状态时,服务器在执行完一个写命令之后,会以协议格式将被执行的写命
令追加到服务器状态的aof_buf缓冲区的末尾

/*-----------------------------------------------------------------------------
 * Global server state
 *----------------------------------------------------------------------------*/

struct redisServer {
    //...
    sds aof_buf;      /* AOF buffer, written before entering the event loop */
    //...
}

服务器执行完写命令,调用propagate函数进行命令追加,函数如下:

/* Propagate the specified command (in the context of the specified database id)
 * to AOF and Slaves.
 *
 * 传播给定命令到 AOF 或附属节点
 *
 * flags are an xor between:
 * + REDIS_PROPAGATE_NONE (no propagation of command at all)
 * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
 * + REDIS_PROPAGATE_REPL (propagate into the replication link)
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

/*
 * 将给定命令追加到 AOF 文件/缓存中 
 */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targetting is not the same as the last command
     * we appendend. To issue a SELECT command is needed. */
    // 当前 db 不是指定的 aof db,
    // 通过创建 SELECT 命令来切换数据库
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        // 让 AOF 文件切换 DB
        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);

        // 程序切换 DB
        server.aof_selected_db = dictid;
    }

    // 将 EXPIRE / PEXPIRE / EXPIREAT 命令翻译为 PEXPIREAT 命令
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);

    // 将 SETEX / PSETEX 命令翻译为 SET 和 PEXPIREAT 命令
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);

    // 其他命令直接追加到 buf 末尾
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    // 将 buf 追加到服务器的 aof_buf 末尾
    // 下次 AOF 写入执行时,这些数据就会被写入
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    // 如果 AOF 重写正在执行,那么也将新 buf 追加到 AOF 重写缓存中
    // 等 AOF 重写完之前的数据之后,新输入的命令也会追加到新 AOF 文件中。
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

文件写入和同步

  • redis的服务器进程是一个事件循环,文件事件负责处理客户端的命令请求,而时间事件负责执行serverCron
    函数这样的定时运行的函数。在处理文件事件执行写命令,使得命令被追加到aof_buf中,然后在处理时间事件执
    行serverCron函数会调用flushAppendOnlyFile函数进行文件的写入和同步。
  • flushAppendOnlyFile函数的行为由服务器配置的appendfsync选项的值决定。
    1.always:将aof_buf中的所有内容写入并同步到aof文件。
    2.everysec:将aof_buf中的所有内容写入到aof文件,如果上次同步的时间距离现在超过1s,那么对aof文件进行同步,同步操作由一个线程专门负责执行。
    3.no:将aof_buf中的所有内容写入到aof文件,但不对aof文件同步,同步由操作系统执行。
  • serverCron函数如下:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

    /* Software watchdog: deliver the SIGALRM that will reach the signal
     * handler if we don't return here fast enough. */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    /* We take a cached value of the unix time in the global state because
     * with virtual memory and aging there is to store the current time
     * in objects at every object access, and accuracy is not needed.
     * To access a global var is faster than calling time(NULL) */
    // 将 UNIX 时间保存在服务器状态中,减少对 time(NULL) 的调用,加速。
    server.unixtime = time(NULL);

    // 对执行命令的时间进行采样分析
    run_with_period(100) trackOperationsPerSecond();

    /* We have just 22 bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
     * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
     *
     * Note that even if this will wrap after 1.5 years it's not a problem,
     * everything will still work but just some object will appear younger
     * to Redis. But for this to happen a given object should never be touched
     * for 1.5 years.
     *
     * Note that you can change the resolution altering the
     * REDIS_LRU_CLOCK_RESOLUTION define.
     */
    // 更新服务器的 LRU 时间
    updateLRUClock();

    /* Record the max memory used since the server was started. */
    // 记录服务器启动以来的内存最高峰
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();

    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    if (server.shutdown_asap) {
        // 保存数据库,清理服务器,并退出
        if (prepareForShutdown(0) == REDIS_OK) exit(0);
        redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
    }

    /* Show some info about non-empty databases */
    // 记录非空数据库的信息
    run_with_period(5000) {
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;

            size = dictSlots(server.db[j].dict);
            used = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (used || vkeys) {
                redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                /* dictPrintStats(server.dict); */
            }
        }
    }

    /* We don't want to resize the hash tables while a bacground saving
     * is in progress: the saving child is created using fork() that is
     * implemented with a copy-on-write semantic in most modern systems, so
     * if we resize the HT while there is the saving child at work actually
     * a lot of memory movements in the parent will cause a lot of pages
     * copied. */
    // 在保存 RDB 或者 AOF 重写时不进行 REHASH ,避免写时复制
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        // 将哈希表的比率维持在 1:1 附近
        tryResizeHashTables();
        if (server.activerehashing) incrementallyRehash();
    }

    /* Show information about connected clients */
    // 显示已连接客户端的信息
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            redisLog(REDIS_VERBOSE,
                "%d clients connected (%d slaves), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    /* We need to do a few operations on clients asynchronously. */
    clientsCron();

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    // 如果用户执行 BGREWRITEAOF 命令的话,在后台开始 AOF 重写
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */
    // 如果 BGSAVE 或者 BGREWRITEAOF 正在进行
    // 那么检查它们是否已经执行完毕
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;
            
            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            // 如果 BGSAVE 和 BGREWRITEAOF 都已经完成,那么重新开始 REHASH
            updateDictResizePolicy();
        }
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now */
         // 如果有需要,开始 RDB 文件的保存
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds) {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, sp->seconds);
                rdbSaveBackground(server.rdb_filename);
                break;
            }
         }

         /* Trigger an AOF rewrite if needed */
         // 如果有需要,开始 AOF 文件重写
         if (server.rdb_child_pid == -1 &&
             server.aof_child_pid == -1 &&
             server.aof_rewrite_perc &&
             server.aof_current_size > server.aof_rewrite_min_size)
         {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
         }
    }


    /* If we postponed an AOF buffer flush, let's try to do it every time the
     * cron function is called. */
    // 如果有需要,保存 AOF 文件到硬盘
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* Expire a few keys per cycle, only if this is a master.
     * On slaves we wait for DEL operations synthesized by the master
     * in order to guarantee a strict consistency. */
    // 如果服务器是主节点的话,进行过期键删除
    // 如果服务器是附属节点的话,那么等待主节点发来的 DEL 命令
    if (server.masterhost == NULL) activeExpireCycle();

    /* Close clients that need to be closed asynchronous */
    // 关闭那些需要异步删除的客户端
    freeClientsInAsyncFreeQueue();

    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */
    // 进行定期同步
    run_with_period(1000) replicationCron();

    /* Run the Redis Cluster cron. */
    // 运行集群定期任务
    run_with_period(1000) {
        if (server.cluster_enabled) clusterCron();
    }

    /* Run the Sentinel timer if we are in sentinel mode. */
    // 运行监视器计时器
    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }

    /* Cleanup expired MIGRATE cached sockets. */
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    server.cronloops++;
    return 1000/REDIS_HZ;
}
  • 为了提高文件的写入效率,在现代操作系统中,当用户调用write函数,将一些数据写入到文件的时候,操作系统通常会将写入数据暂时保存在一个内存缓冲区里面,等到缓冲区到空间被填满、或者超过了指定的时限之后,才真正地将缓冲区中的数据写入到磁盘里面
  • 这种做法虽然提高了效率,但是也为写入数据带来了安全问题,因为如果计算机发生停机,那么保存在内存缓冲区里面到写入数据将丢失。
  • 为此,系统提供了fsync和fdatasync两个同步函数,它们可以强制让操作系统立即将缓冲区中的数据写入到磁盘里面,从而确保写入的数据安全性。

AOF文件的载入和数据还原

  • 服务器读入并重新执行一遍aof文件里面保存的写命令,就可以还原服务器关闭之前的数据库状态。

  • Redis 读取 AOF 文件并还原数据库的详细步骤如下:
    1.创建一个不带网络连接的伪客户端(fake client)。
    2.读取 AOF 所保存的文本,并根据内容还原出命令、命令的参数以及命令的个数。
    3.根据命令、命令的参数和命令的个数,使用伪客户端执行该命令。
    4.执行 2 和 3 ,直到 AOF 文件中的所有命令执行完毕。
    完成第 4 步之后, AOF 文件所保存的数据库就会被完整地还原出来。

  • 注意, 因为 Redis 的命令只能在客户端的上下文中被执行, 而 AOF 还原时所使用的命令来自于 AOF 文件, 而不是网络, 所以程序使用了一个没有网络连接的伪客户端来执行命令。 伪客户端执行命令的效果, 和带网络连接的客户端执行命令的效果, 完全一样。

  • 服务器读取aof文件并还原数据库状态的流程:



    载入数据函数如下:

/*
 * 载入 AOF 文件
 */
int loadAppendOnlyFile(char *filename) {
    struct redisClient *fakeClient;
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
    int old_aof_state = server.aof_state;
    long loops = 0;

    // 空文件
    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.aof_current_size = 0;
        fclose(fp);
        return REDIS_ERR;
    }

    // 打开文件失败
    if (fp == NULL) {
        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
    // 临时关闭 AOF 
    server.aof_state = REDIS_AOF_OFF;

    // 创建伪终端
    fakeClient = createFakeClient();
    // 定义于 rdb.c ,更新服务器的载入状态
    startLoading(fp);

    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        /* Serve the clients from time to time */
        // 有间隔地处理外部请求
        if (!(loops++ % 1000)) {
            // rbd.c/loadingProgress
            loadingProgress(ftello(fp));
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
        }

        // 出错或 EOF
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp))
                break;
            else
                goto readerr;
        }

        // 读入命令和命令参数
        if (buf[0] != '*') goto fmterr;
        argc = atoi(buf+1);
        if (argc < 1) goto fmterr;

        argv = zmalloc(sizeof(robj*)*argc);
        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10);
            argsds = sdsnewlen(NULL,len);
            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
            argv[j] = createObject(REDIS_STRING,argsds);
            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
        }

        /* Command lookup */
        // 查找命令
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
            exit(1);
        }
        /* Run the command in the context of a fake client */
        // 在伪终端上下文里执行命令
        fakeClient->argc = argc;
        fakeClient->argv = argv;
        cmd->proc(fakeClient);

        /* The fake client should not have a reply */
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);

        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        for (j = 0; j < fakeClient->argc; j++)
            decrRefCount(fakeClient->argv[j]);
        zfree(fakeClient->argv);
    }

    /* This point can only be reached when EOF is reached without errors.
     * If the client is in the middle of a MULTI/EXEC, log error and quit. */
    if (fakeClient->flags & REDIS_MULTI) goto readerr;

    // 清理资源,并还原 flag
    fclose(fp);
    freeFakeClient(fakeClient);
    server.aof_state = old_aof_state;
    stopLoading();
    aofUpdateCurrentSize();
    server.aof_rewrite_base_size = server.aof_current_size;
    return REDIS_OK;

readerr:
    if (feof(fp)) {
        redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
    } else {
        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
    }
    exit(1);
fmterr:
    redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
    exit(1);
}

AOF重写

  • 由于aof是通过不断追加写命令来记录数据库状态,所以服务器执行比较久之后,aof文件中的内容会越来越多,磁盘占有量越来越大,同时也是使通过过aof文件还原数据库的需要的时间也变得很久。所以就需要通过读取服务器当前的数据库状态来重写新的aof文件。
  • 虽然Redis将生成新AOF文件替换旧AOF文件的功能命名为“AOF文件重写”,但实际上,AOF文件重写并不需要对现有对AOF文件进行任何读取、分析或者写入操作,这个功能是通过读取Redis当前的数据库状态实现的。
    举个例子, 如果服务器执行了以下命令:

RPUSH list 1 2 3 4 // [1, 2, 3, 4]

RPOP list // [1, 2, 3]

LPOP list // [2, 3]

LPUSH list 1 // [1, 2, 3]
那么光是记录 list 键的状态, AOF 文件就需要保存四条命令。
另一方面, 有些被频繁操作的键, 对它们所调用的命令可能有成百上千、甚至上万条, 如果这样被频繁操作的键有很多的话, AOF 文件的体积就会急速膨胀, 对 Redis甚至整个系统的造成影响。
为了解决以上的问题,Redis 需要对 AOF 文件进行重写(rewrite):创建一个新的 AOF 文件来代替原有的 AOF 文件, 新 AOF 文件和原有 AOF 文件保存的数据库状态完全一样, 但新 AOF 文件的体积小于等于原有 AOF 文件的体积

  • 再考虑这样一个例子, 如果服务器对集合键 animal 执行了以下命令:

SADD animal cat // {cat}

SADD animal dog panda tiger // {cat, dog, panda, tiger}

SREM animal cat // {dog, panda, tiger}

SADD animal cat lion // {cat, lion, dog, panda, tiger}
那么使用一条 SADD animal cat lion dog panda tiger 命令, 就可以还原 animal 集合的状态, 这比之前的四条命令调用要大大减少。
除了列表和集合之外, 字符串、有序集、哈希表等键也可以用类似的方法来保存状态, 并且保存这些状态所使用的命令数量, 比起之前建立这些键的状态所使用命令的数量要大大减少。

  • 根据键的类型, 使用适当的写入命令来重现键的当前值, 这就是 AOF 重写的实现原理
/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * 写一串足以还原数据集的命令到给定文件里。
 * 被 REWRITEAOF 和 BGREWRITEAOF 所使用。
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. 
 *
 * 为了减少重建数据集所需命令的数量,
 * 在可能时,Redis 会使用可变参数命令,比如 RPUSH 、 SADD 和 ZADD 。
 * 不过这些命令每次最多添加的元素不会超过 REDIS_AOF_REWRITE_ITEMS_PER_CMD 。
 *
 * 重写失败返回 REDIS_ERR ,成功返回 REDIS_OK 。
 */
int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }

    // 初始化文件流
    rioInitWithFile(&aof,fp);
    // 遍历所有数据库
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        /* SELECT the new DB */
        // 切换到合适的数据库上
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */
        // 遍历数据库的所有 key-value 对
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* Save the key and associated value */
            // 保存 key 和 value
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }
            /* Save the expire time */
            // 保存可能有的过期时间
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";

                /* If this key is already expired skip it 
                 *
                 * 如果键已经过期,那么不写入它的过期时间
                 */
                if (expiretime < now) continue;

                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
        }
        dictReleaseIterator(di);
    }

    /* Make sure data will not remain on the OS's output buffers */
    // 重新文件流
    fflush(fp);
    // sync
    aof_fsync(fileno(fp));
    // 关闭
    fclose(fp);

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    // 通过更名,用重写后的新 AOF 文件代替旧的 AOF 文件
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}

AOF 后台重写
上一节展示的 AOF 重写程序可以很好地完成创建一个新 AOF 文件的任务, 但是, 在执行这个程序的时候, 调用者线程会被阻塞。

很明显, 作为一种辅佐性的维护手段, Redis 不希望 AOF 重写造成服务器无法处理请求, 所以 Redis 决定将 AOF 重写程序放到(后台)子进程里执行, 这样处理的最大好处是:

子进程进行 AOF 重写期间,主进程可以继续处理命令请求。
子进程带有主进程的数据副本,使用子进程而不是线程,可以在避免锁的情况下,保证数据的安全性。
不过, 使用子进程也有一个问题需要解决: 因为子进程在进行 AOF 重写期间, 主进程还需要继续处理命令, 而新的命令可能对现有的数据进行修改, 这会让当前数据库的数据和重写后的 AOF 文件中的数据不一致。

为了解决这个问题, Redis 增加了一个 AOF 重写缓存, 这个缓存在 fork 出子进程之后开始启用, Redis 主进程在接到新的写命令之后, 除了会将这个写命令的协议内容追加到现有的 AOF 文件之外, 还会追加到这个缓存中:


  • 换言之, 当子进程在执行 AOF 重写时, 主进程需要执行以下三个工作:
    1.处理命令请求。
    2.将写命令追加到现有的 AOF 文件中。
    3.将写命令追加到 AOF 重写缓存中。
  • 这样一来可以保证:
  • 现有的 AOF 功能会继续执行,即使在 AOF 重写期间发生停机,也不会有任何数据丢失。
  • 所有对数据库进行修改的命令都会被记录到 AOF 重写缓存中。
  • 当子进程完成 AOF 重写之后, 它会向父进程发送一个完成信号, 父进程在接到完成信号之后, 会调用一个信号处理函数, 并完成以下工作:
    1.将 AOF 重写缓存中的内容全部写入到新 AOF 文件中。
    2.对新的 AOF 文件进行改名,覆盖原有的 AOF 文件。

当步骤 1 执行完毕之后, 现有 AOF 文件、新 AOF 文件和数据库三者的状态就完全一致了。
当步骤 2 执行完毕之后, 程序就完成了新旧两个 AOF 文件的交替。

这个信号处理函数执行完毕之后, 主进程就可以继续像往常一样接受命令请求了。 在整个 AOF 后台重写过程中, 只有最后的写入缓存和改名操作会造成主进程阻塞, 在其他时候, AOF 后台重写都不会对主进程造成阻塞, 这将 AOF 重写对性能造成的影响降到了最低。
以上就是 AOF 后台重写, 也即是 BGREWRITEAOF 命令的工作原理。

  • AOF 后台重写的触发条件

AOF 重写可以由用户通过调用 BGREWRITEAOF 手动触发。
另外, 服务器在 AOF 功能开启的情况下, 会维持以下三个变量:

记录当前 AOF 文件大小的变量 aof_current_size 。
记录最后一次 AOF 重写之后, AOF 文件大小的变量 aof_rewrite_base_size 。
增长百分比变量 aof_rewrite_perc 。
每次当 serverCron 函数执行时, 它都会检查以下条件是否全部满足, 如果是的话, 就会触发自动的 AOF 重写:
1.没有 BGSAVE 命令在进行。
2.没有 BGREWRITEAOF 在进行。
3.当前 AOF 文件大小大于 server.aof_rewrite_min_size (默认值为 1 MB)。
4.当前 AOF 文件大小和最后一次 AOF 重写后的大小之间的比率大于等于指定的增长百分比。
默认情况下, 增长百分比为 100% , 也即是说, 如果前面三个条件都已经满足, 并且当前 AOF 文件大小比最后一次 AOF 重写时的大小要大一倍的话, 那么触发自动 AOF 重写。

相关文章

网友评论

    本文标题:「Redis源码解读」—持久化(二)AOF

    本文链接:https://www.haomeiwen.com/subject/cdffhctx.html