美文网首页
Redis主从复制

Redis主从复制

作者: 一剑光寒十九洲 | 来源:发表于2020-08-19 11:39 被阅读0次

    主从同步

    主从同步流程图

    主从复制流程图

    状态常量

    /* Replication state of this server when it acts as a slave. Kept in
     * server.repl_state so the code remembers what to do next. */
    #define REPL_STATE_NONE 0 /* No active replication */
    #define REPL_STATE_CONNECT 1 /* Must connect to the master */
    #define REPL_STATE_CONNECTING 2 /* Connecting to the master; PING is sent once connected */
    /* --- Handshake states, must be ordered --- */
    #define REPL_STATE_RECEIVE_PONG 3 /* PING sent, waiting for the PONG reply */
    #define REPL_STATE_SEND_AUTH 4 /* About to send AUTH */
    #define REPL_STATE_RECEIVE_AUTH 5 /* AUTH sent, waiting for the reply */
    #define REPL_STATE_SEND_PORT 6 /* About to send REPLCONF listening-port */
    #define REPL_STATE_RECEIVE_PORT 7 /* REPLCONF listening-port sent, waiting for the reply */
    #define REPL_STATE_SEND_IP 8 /* About to send REPLCONF ip-address */
    #define REPL_STATE_RECEIVE_IP 9 /* REPLCONF ip-address sent, waiting for the reply */
    #define REPL_STATE_SEND_CAPA 10 /* About to send REPLCONF capa */
    #define REPL_STATE_RECEIVE_CAPA 11 /* REPLCONF capa sent, waiting for the reply */
    #define REPL_STATE_SEND_PSYNC 12 /* About to send PSYNC */
    #define REPL_STATE_RECEIVE_PSYNC 13 /* PSYNC sent, waiting for the reply */
    /* --- End of handshake states --- */
    #define REPL_STATE_TRANSFER 14 /* Receiving the RDB file from the master */
    #define REPL_STATE_CONNECTED 15 /* Connected to the master, commands are being propagated */
    
    /* State of a slave from the point of view of the master. Stored in
     * client->replstate.
     * In the SEND_BULK and ONLINE states the slave receives new updates in its
     * output queue.
     * In the WAIT_BGSAVE states the client is waiting for the RDB file to be
     * produced. */
    #define SLAVE_STATE_WAIT_BGSAVE_START 6 /* An RDB file needs to be produced */
    #define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting for the RDB file creation to finish */
    #define SLAVE_STATE_SEND_BULK 8 /* Sending the RDB file to the slave */
    #define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates */
    

    启动同步

    客户端向从服务器发送slaveof {ip} {port}命令

    /* REPLCONF <option> <value> <option> <value> ...
     *
     * Used by a replica to configure the replication process before it issues
     * SYNC/PSYNC, and afterwards to exchange acknowledgements with the master.
     *
     * Options handled, one <option> <value> pair at a time:
     *   listening-port <port>  stored in c->slave_listening_port
     *   ip-address <ip>        copied into c->slave_ip (rejected if too long)
     *   capa <name>            sets SLAVE_CAPA_EOF / SLAVE_CAPA_PSYNC2 in
     *                          c->slave_capa; unknown names are ignored
     *   ack <offset>           updates c->repl_ack_off and c->repl_ack_time;
     *                          returns immediately without any reply
     *   getack <value>         calls replicationSendAck() when this instance
     *                          has a master; returns without any reply
     *
     * Replies shared.ok once every pair was processed, shared.syntaxerr when
     * the argument count is even, or an error for an unrecognized option. */
    void replconfCommand(client *c) {
        int j;
    
        if ((c->argc % 2) == 0) {
            /* Number of arguments must be odd to make sure that every
             * option has a corresponding value. */
            addReply(c,shared.syntaxerr);
            return;
        }
    
        /* Process every option-value pair. Since argc is odd, argv[j+1] is
         * always a valid index inside the loop. */
        for (j = 1; j < c->argc; j+=2) {
            if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
                long port;
    
                if ((getLongFromObjectOrReply(c,c->argv[j+1],
                        &port,NULL) != C_OK))
                    return;
                c->slave_listening_port = port;
            } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
                sds ip = c->argv[j+1]->ptr;
                if (sdslen(ip) < sizeof(c->slave_ip)) {
                    /* +1 copies the terminating NUL as well. */
                    memcpy(c->slave_ip,ip,sdslen(ip)+1);
                } else {
                    /* The length is an unsigned size (it is compared against
                     * sizeof above), so %zu is the matching conversion. */
                    addReplyErrorFormat(c,"REPLCONF ip-address provided by "
                        "replica instance is too long: %zu bytes", sdslen(ip));
                    return;
                }
            } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
                /* Ignore capabilities not understood by this master. */
                if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
                    c->slave_capa |= SLAVE_CAPA_EOF;
                else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
                    c->slave_capa |= SLAVE_CAPA_PSYNC2;
            } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
                /* REPLCONF ACK is used by slave to inform the master the amount
                 * of replication stream that it processed so far. It is an
                 * internal only command that normal clients should never use. */
                long long offset;
    
                if (!(c->flags & CLIENT_SLAVE)) return;
                if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
                    return;
                /* The acknowledged offset only ever moves forward. */
                if (offset > c->repl_ack_off)
                    c->repl_ack_off = offset;
                c->repl_ack_time = server.unixtime;
                /* If this was a diskless replication, we need to really put
                 * the slave online when the first ACK is received (which
                 * confirms slave is online and ready to get more data). */
                if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
                    putSlaveOnline(c);
                /* Note: this command does not reply anything! */
                return;
            } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
                /* REPLCONF GETACK is used in order to request an ACK ASAP
                 * to the slave. */
                if (server.masterhost && server.master) replicationSendAck();
                return;
            } else {
                addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
                    (char*)c->argv[j]->ptr);
                return;
            }
        }
        addReply(c,shared.ok);
    }
    

    配置同步信息

    从服务器在serverCron事件函数中向主服务器发送replconf信息

    /* Replication cron function. Per the article it runs once per second and
     * is called from serverCron. The listing below is an excerpt: the tail of
     * the function is elided. */
    void replicationCron(void) {
        static long long replication_cron_loops = 0;
    
        /* Acting as a slave, while connecting or in a handshake state: if no
         * I/O happened for more than repl_timeout seconds, cancel the handshake
         * in progress. */
        if (server.masterhost &&
            (server.repl_state == REPL_STATE_CONNECTING ||
             slaveIsInHandshakeState()) &&
             (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
        {
            serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
            cancelReplicationHandshake();
        }
    
        /* Acting as a slave, while receiving the RDB file (REPL_STATE_TRANSFER):
         * on timeout, cancel the transfer in progress. */
        if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
            (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
        {
            serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
            cancelReplicationHandshake();
        }
    
        /* Acting as a slave in normal replication (REPL_STATE_CONNECTED): if the
         * master has been silent for more than repl_timeout seconds, free the
         * master client. */
        if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
            (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
        {
            serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
            freeClient(server.master);
        }
    
        /* Acting as a slave that still has to connect to its master. */
        if (server.repl_state == REPL_STATE_CONNECT) {
            serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
                server.masterhost, server.masterport);
            if (connectWithMaster() == C_OK) {
                serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
            }
        }
    
        /* Acting as a slave whose master understands PSYNC: send REPLCONF ACK
         * to the master on every run of this function. Purpose:
         * 1) heartbeat from the slave to the master;
         * 2) report the slave's current replication offset to the master.
         * Note: no ACK is sent to a master flagged CLIENT_PRE_PSYNC, i.e. one
         * that does not support PSYNC and replication offsets.
         * */
        if (server.masterhost && server.master &&
            !(server.master->flags & CLIENT_PRE_PSYNC))
            replicationSendAck();
    
        /* If we have slaves, PING them from time to time, so that slaves can
         * implement an explicit timeout towards the master and detect a broken
         * link even when the TCP connection does not report it. */
        listIter li;
        listNode *ln;
        robj *ping_argv[1];
    
        /* Acting as a master: feed a PING to the slaves once every
         * repl_ping_slave_period iterations of this function.
         * Note: the PING goes through replicationFeedSlaves(), i.e. through the
         * replication stream, rather than being written directly to the slave
         * sockets. According to the original notes this means that PINGs issued
         * while a slave is still synchronizing are only delivered once that
         * slave's state becomes ONLINE.
         * */
        if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
            listLength(server.slaves))
        {
            ping_argv[0] = createStringObject("PING",4);
            replicationFeedSlaves(server.slaves, server.slaveseldb,
                ping_argv, 1);
            decrRefCount(ping_argv[0]);
        }
    
        /* Send a newline to every slave that is in the pre-synchronization
         * stage, i.e. still waiting for the RDB file.
         * Producing the RDB file may take a long time, so such slaves need a way
         * to know that the link with the master is still alive.
         * The in-stream PING above cannot serve this purpose: in this stage no
         * replication stream is being transferred yet (and sub-slaves inherit
         * the stream from the top-level master).
         * The newline does not affect the replication offset; the slave ignores
         * it, but it refreshes the last interaction time and so prevents a
         * timeout. It is written on every run of this function.
         * */
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
    
            int is_presync =
                (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
                (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
                 server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
    
            if (is_presync) {
                if (write(slave->fd, "\n", 1) == -1) {
                    /* Socket errors are deliberately ignored: this is only a
                     * heartbeat sent while the RDB file is being produced. */
                }
            }
        }
    
        ...
    }
    
    /* Start connecting to the configured master.
     * 1. Open a non-blocking TCP socket towards masterhost:masterport.
     * 2. Install syncWithMaster as both the readable and the writable handler
     *    of that socket.
     * 3. Record the socket and move the replication state to CONNECTING.
     *
     * Returns C_OK on success, C_ERR when the socket could not be created or
     * the event handler could not be registered. */
    int connectWithMaster(void) {
        int sock = anetTcpNonBlockBestEffortBindConnect(NULL,
            server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
    
        if (sock == -1) {
            serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
                strerror(errno));
            return C_ERR;
        }
    
        int registered = aeCreateFileEvent(server.el,sock,
            AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);
        if (registered == AE_ERR) {
            /* Without a handler nobody would ever service this socket. */
            close(sock);
            serverLog(LL_WARNING,"Can't create readable event for SYNC");
            return C_ERR;
        }
    
        /* Handshake bookkeeping: the last-I/O timestamp is what the connect
         * timeout check in replicationCron compares against. */
        server.repl_state = REPL_STATE_CONNECTING;
        server.repl_transfer_s = sock;
        server.repl_transfer_lastio = server.unixtime;
        return C_OK;
    }
    
    /* Event handler that drives the slave side of the replication handshake.
     * connectWithMaster() installs it on the non-blocking socket towards the
     * master, so it fires once the connect completes and then each time the
     * socket becomes readable.
     *
     * Each call advances server.repl_state: PING -> (AUTH) -> REPLCONF
     * listening-port -> (REPLCONF ip-address) -> REPLCONF capa -> PSYNC. After
     * every command is written the function returns and waits for the next
     * readable event to read the reply. If the PSYNC outcome is not a partial
     * resynchronization, a temp file is opened and readSyncBulkPayload is
     * installed to download the RDB payload.
     *
     * Parameters: fd is the socket connected to the master; el, privdata and
     * mask are unused.
     *
     * On any failure the socket is closed, its events are removed and the
     * state is reset to REPL_STATE_CONNECT. */
    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
        char tmpfile[256], *err = NULL;
        int dfd = -1, maxtries = 5;
        int sockerr = 0, psync_result;
        socklen_t errlen = sizeof(sockerr);
        UNUSED(el);
        UNUSED(privdata);
        UNUSED(mask);
    
        /* If this event fired after the user turned the instance into a master
         * (replication state NONE), just close the socket. */
        if (server.repl_state == REPL_STATE_NONE) {
            close(fd);
            return;
        }
    
        /* A non-blocking connect may complete with an error: check SO_ERROR and
         * bail out through the error path if something went wrong. */
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
            sockerr = errno;
        if (sockerr) {
            serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
                strerror(sockerr));
            goto error;
        }
    
        /* Send a PING to check that the master is able to reply. */
        if (server.repl_state == REPL_STATE_CONNECTING) {
            serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
            /* Drop the writable event: from now on only the PONG reply (a
             * readable event) matters. Then switch state. */
            aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
            server.repl_state = REPL_STATE_RECEIVE_PONG;
            /* Send the PING; a non-NULL return is a write error. */
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
            if (err) goto write_error;
            return;
        }
    
        /* Receive the PONG command. */
        if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
            err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
    
            /* Replies accepted here:
             * +PONG                         normal reply
             * -NOAUTH                       authentication is required
             * -ERR operation not permitted  older versions: authentication
             *                               is required
             * */
            if (err[0] != '+' &&
                strncmp(err,"-NOAUTH",7) != 0 &&
                strncmp(err,"-ERR operation not permitted",28) != 0)
            {
                serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
                sdsfree(err);
                goto error;
            } else {
                serverLog(LL_NOTICE,
                    "Master replied to PING, replication can continue...");
            }
            sdsfree(err);
            /* Next step: send AUTH (if needed). */
            server.repl_state = REPL_STATE_SEND_AUTH;
        }
    
        /* AUTH with the master if masterauth is configured, otherwise skip
         * straight to sending the listening port. */
        if (server.repl_state == REPL_STATE_SEND_AUTH) {
            if (server.masterauth) {
                err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
                if (err) goto write_error;
                server.repl_state = REPL_STATE_RECEIVE_AUTH;
                return;
            } else {
                server.repl_state = REPL_STATE_SEND_PORT;
            }
        }
    
        /* Receive the AUTH reply; on success move on to sending the port. */
        if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
            err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
            if (err[0] == '-') {
                serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
                sdsfree(err);
                goto error;
            }
            sdsfree(err);
            server.repl_state = REPL_STATE_SEND_PORT;
        }
    
        /* Announce our listening port: server.slave_announce_port when it is
         * configured (non-zero), server.port otherwise. */
        if (server.repl_state == REPL_STATE_SEND_PORT) {
            sds port = sdsfromlonglong(server.slave_announce_port ?
                server.slave_announce_port : server.port);
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                    "listening-port",port, NULL);
            sdsfree(port);
            if (err) goto write_error;
            sdsfree(err);
            server.repl_state = REPL_STATE_RECEIVE_PORT;
            return;
        }
    
        /* Receive the reply to REPLCONF listening-port. */
        if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
            err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
            /* An error is ignored: not every Redis version supports
             * REPLCONF listening-port. */
            if (err[0] == '-') {
                serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                    "REPLCONF listening-port: %s", err);
            }
            sdsfree(err);
            server.repl_state = REPL_STATE_SEND_IP;
        }
    
        /* Skip REPLCONF ip-address when slave-announce-ip is not configured:
         * the master can read the address from the socket. */
        if (server.repl_state == REPL_STATE_SEND_IP &&
            server.slave_announce_ip == NULL)
        {
                server.repl_state = REPL_STATE_SEND_CAPA;
        }
    
        /* Send REPLCONF ip-address <ip>, so that with port forwarding or NAT
         * the master's INFO output can list the right slave address. */
        if (server.repl_state == REPL_STATE_SEND_IP) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                    "ip-address",server.slave_announce_ip, NULL);
            if (err) goto write_error;
            sdsfree(err);
            server.repl_state = REPL_STATE_RECEIVE_IP;
            return;
        }
    
        /* Receive the reply to REPLCONF ip-address. */
        if (server.repl_state == REPL_STATE_RECEIVE_IP) {
            err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
            /* An error is ignored: not every Redis version supports
             * REPLCONF ip-address. */
            if (err[0] == '-') {
                serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                    "REPLCONF ip-address: %s", err);
            }
            sdsfree(err);
            server.repl_state = REPL_STATE_SEND_CAPA;
        }
    
        /* Announce the capabilities of this slave:
         * eof:    supports the EOF-style RDB transfer used by diskless
         *         replication.
         * psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
         *
         * The master ignores capabilities it does not understand.
         * */
        if (server.repl_state == REPL_STATE_SEND_CAPA) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                    "capa","eof","capa","psync2",NULL);
            if (err) goto write_error;
            sdsfree(err);
            server.repl_state = REPL_STATE_RECEIVE_CAPA;
            return;
        }
    
        /* Receive the reply to REPLCONF capa. */
        if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
            err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
            /* An error is ignored: not every Redis version supports
             * REPLCONF capa. */
            if (err[0] == '-') {
                serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                      "REPLCONF capa: %s", err);
            }
            sdsfree(err);
            server.repl_state = REPL_STATE_SEND_PSYNC;
        }
    
        /* Try a partial resynchronization.
         * Without a cached master, PSYNC is still used to start a full
         * resynchronization, so that the master run id and global offset are
         * obtained and a partial resync can be tried at the next reconnection. */
        if (server.repl_state == REPL_STATE_SEND_PSYNC) {
            if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
                err = sdsnew("Write error sending the PSYNC command.");
                goto write_error;
            }
            server.repl_state = REPL_STATE_RECEIVE_PSYNC;
            return;
        }
    
        /* If we reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
        if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
            serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
                                 "state should be RECEIVE_PSYNC but is %d",
                                 server.repl_state);
            goto error;
        }
    
        /* Read the reply to PSYNC. */
        psync_result = slaveTryPartialResynchronization(fd,1);
        if (psync_result == PSYNC_WAIT_REPLY) return; /* No data yet, try again later. */
    
        /* The master is temporarily unable to replicate; we will have to start
         * from scratch later, so go to the error path. This happens for
         * instance while the master is loading or is not connected to its own
         * master. */
        if (psync_result == PSYNC_TRY_LATER) goto error;
    
        /* Note: unless PSYNC returned WAIT_REPLY, it already took care of
         * uninstalling the readable event handler. */
    
        if (psync_result == PSYNC_CONTINUE) {
            serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
            return;
        }
    
        /* PSYNC failed or is not supported: our sub-slaves must resync with us
         * as well, because the master may transfer a completely new data set
         * that cannot be propagated to them incrementally. */
        disconnectSlaves(); /* Force our slaves to resync with us. */
        freeReplicationBacklog(); /* Don't allow our slaves to PSYNC. */
    
        /* PSYNC is not supported by the master: fall back to SYNC. */
        if (psync_result == PSYNC_NOT_SUPPORTED) {
            serverLog(LL_NOTICE,"Retrying with SYNC...");
            if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
                serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
                    strerror(errno));
                goto error;
            }
        }
    
        /* Prepare a suitable temp file to receive the RDB payload. O_EXCL makes
         * open() fail if the name already exists; retry up to maxtries times,
         * one second apart (the name embeds server.unixtime). */
        while(maxtries--) {
            snprintf(tmpfile,256,
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
            if (dfd != -1) break;
            sleep(1);
        }
        if (dfd == -1) {
            serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
            goto error;
        }
    
        /* Install the readable handler that downloads the RDB file without
         * blocking. */
        if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
                == AE_ERR)
        {
            serverLog(LL_WARNING,
                "Can't create readable event for SYNC: %s (fd=%d)",
                strerror(errno),fd);
            goto error;
        }
    
        /* Initialize the transfer state. */
        server.repl_state = REPL_STATE_TRANSFER;
        server.repl_transfer_size = -1;
        server.repl_transfer_read = 0;
        server.repl_transfer_last_fsync_off = 0;
        server.repl_transfer_fd = dfd;
        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_tmpfile = zstrdup(tmpfile);
        return;
    
    error:      /* Any error: drop the registered events, close the socket and
                 * reset the replication state so that a reconnect happens. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        if (dfd != -1) {
            /* Only reachable when the temp file was created but the bulk
             * transfer handler could not be installed: remove the file too,
             * otherwise every failed attempt leaves a stale temp-*.rdb. */
            close(dfd);
            unlink(tmpfile);
        }
        close(fd);
        server.repl_transfer_s = -1;
        server.repl_state = REPL_STATE_CONNECT;
        return;
    
    write_error:     /* Errors from synchronously sending a command: log, free
                      * the error string, then share the common cleanup. */
        serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
        sdsfree(err);
        goto error;
    }
    

    主服务器处理从服务器发来的replconf命令

    /* REPLCONF <option> <value> <option> <value> ...
     *
     * Used by a replica to configure the replication process before it issues
     * SYNC/PSYNC, and afterwards to exchange acknowledgements with the master.
     *
     * Options handled, one <option> <value> pair at a time:
     *   listening-port <port>  stored in c->slave_listening_port
     *   ip-address <ip>        copied into c->slave_ip (rejected if too long)
     *   capa <name>            sets SLAVE_CAPA_EOF / SLAVE_CAPA_PSYNC2 in
     *                          c->slave_capa; unknown names are ignored
     *   ack <offset>           updates c->repl_ack_off and c->repl_ack_time;
     *                          returns immediately without any reply
     *   getack <value>         calls replicationSendAck() when this instance
     *                          has a master; returns without any reply
     *
     * Replies shared.ok once every pair was processed, shared.syntaxerr when
     * the argument count is even, or an error for an unrecognized option. */
    void replconfCommand(client *c) {
        int j;
    
        if ((c->argc % 2) == 0) {
            /* Number of arguments must be odd to make sure that every
             * option has a corresponding value. */
            addReply(c,shared.syntaxerr);
            return;
        }
    
        /* Process every option-value pair. Since argc is odd, argv[j+1] is
         * always a valid index inside the loop. */
        for (j = 1; j < c->argc; j+=2) {
            if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
                long port;
    
                if ((getLongFromObjectOrReply(c,c->argv[j+1],
                        &port,NULL) != C_OK))
                    return;
                c->slave_listening_port = port;
            } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
                sds ip = c->argv[j+1]->ptr;
                if (sdslen(ip) < sizeof(c->slave_ip)) {
                    /* +1 copies the terminating NUL as well. */
                    memcpy(c->slave_ip,ip,sdslen(ip)+1);
                } else {
                    /* The length is an unsigned size (it is compared against
                     * sizeof above), so %zu is the matching conversion. */
                    addReplyErrorFormat(c,"REPLCONF ip-address provided by "
                        "replica instance is too long: %zu bytes", sdslen(ip));
                    return;
                }
            } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
                /* Ignore capabilities not understood by this master. */
                if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
                    c->slave_capa |= SLAVE_CAPA_EOF;
                else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
                    c->slave_capa |= SLAVE_CAPA_PSYNC2;
            } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
                /* REPLCONF ACK is used by slave to inform the master the amount
                 * of replication stream that it processed so far. It is an
                 * internal only command that normal clients should never use. */
                long long offset;
    
                if (!(c->flags & CLIENT_SLAVE)) return;
                if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
                    return;
                /* The acknowledged offset only ever moves forward. */
                if (offset > c->repl_ack_off)
                    c->repl_ack_off = offset;
                c->repl_ack_time = server.unixtime;
                /* If this was a diskless replication, we need to really put
                 * the slave online when the first ACK is received (which
                 * confirms slave is online and ready to get more data). */
                if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
                    putSlaveOnline(c);
                /* Note: this command does not reply anything! */
                return;
            } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
                /* REPLCONF GETACK is used in order to request an ACK ASAP
                 * to the slave. */
                if (server.masterhost && server.master) replicationSendAck();
                return;
            } else {
                addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
                    (char*)c->argv[j]->ptr);
                return;
            }
        }
        addReply(c,shared.ok);
    }
    

    同步策略协调

    从服务器询问同步策略

    /* Try a partial resynchronization with the master if we are reconnecting.
     * If there is no cached master, 'PSYNC ? -1' is sent instead, which
     * triggers a full resynchronization and lets us learn the master's run id
     * and global replication offset.
     *
     * This function is called by syncWithMaster(), so the following
     * assumptions hold:
     * 1) The fd passed as argument is already connected.
     * 2) This function does not close fd. However, when a partial
     *    resynchronization succeeds, fd is handed over to the resurrected
     *    server.master client structure.
     *
     * The function has two modes, selected by read_reply:
     * With read_reply == 0 the PSYNC command is written to the master and the
     * caller must call again later with read_reply == 1 to read the reply.
     * This supports non-blocking operation: the command is written in one
     * event loop iteration and the reply is read in a later one.
     *
     * With read_reply == 0 the function returns PSYNC_WRITE_ERROR if the write
     * failed, otherwise PSYNC_WAIT_REPLY.
     * With read_reply == 1 it returns PSYNC_WAIT_REPLY again when no useful
     * data is available yet (an empty line was read).
     *
     * Return values:
     * PSYNC_CONTINUE: the partial resynchronization can go on.
     * PSYNC_FULLRESYNC: the master supports PSYNC but a full resync is needed.
     *                   The master replid and offset are saved.
     * PSYNC_NOT_SUPPORTED: the master does not understand PSYNC (or replied
     *                      with an unexpected error); the caller should fall
     *                      back to SYNC.
     * PSYNC_WRITE_ERROR: there was an error writing the command.
     * PSYNC_WAIT_REPLY: no reply available yet; call again with read_reply=1.
     * PSYNC_TRY_LATER: the master is temporarily unable to serve the request,
     *                  e.g. it is loading data or has no link with its own
     *                  master.
     *
     * Side effects:
     * 1) The readable event handler of fd is removed, unless the return value
     *    is PSYNC_WAIT_REPLY.
     * 2) server.master_initial_offset is set from the master reply; according
     *    to the original notes it is later used to populate the replication
     *    offset of the server.master structure.
     * */
    #define PSYNC_WRITE_ERROR 0
    #define PSYNC_WAIT_REPLY 1
    #define PSYNC_CONTINUE 2
    #define PSYNC_FULLRESYNC 3
    #define PSYNC_NOT_SUPPORTED 4
    #define PSYNC_TRY_LATER 5
    int slaveTryPartialResynchronization(int fd, int read_reply) {
        char *psync_replid;
        char psync_offset[32];
        sds reply;
    
        if (!read_reply) {      /* Writing half: send PSYNC to the master. */
            /* Initially set master_initial_offset to -1 to mark it as not
             * valid. If a full resync follows, it is set to the right value
             * and then propagated to server.master. */
            server.master_initial_offset = -1;
    
            if (server.cached_master) {     /* Cached master available: ask for a partial resync. */
                psync_replid = server.cached_master->replid;
                /* Note: the offset sent is reploff+1, i.e. the first byte the
                 * slave still needs. */
                snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
                serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
            } else {        /* No cached master: only a full resync is possible. */
                serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
                psync_replid = "?";
                memcpy(psync_offset,"-1",3);
            }
    
            /* Issue the PSYNC command. */
            reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
            if (reply != NULL) {
                serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
                sdsfree(reply);
                /* Write failed: remove the registered readable event. */
                aeDeleteFileEvent(server.el,fd,AE_READABLE);
                return PSYNC_WRITE_ERROR;
            }
            return PSYNC_WAIT_REPLY;
        }
    
        /* Reading half (read_reply == 1): everything below parses the reply
         * to PSYNC. */
        /* Read one line of reply. */
        reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        if (sdslen(reply) == 0) {
            /* The master may send empty newlines as a keepalive around the
             * PSYNC reply: just keep the connection and wait. */
            sdsfree(reply);
            return PSYNC_WAIT_REPLY;
        }
    
        aeDeleteFileEvent(server.el,fd,AE_READABLE);
    
        /* Example: +FULLRESYNC bc1621104063d4f46cff756b644c290e80362d3c 238 */
        if (!strncmp(reply,"+FULLRESYNC",11)) {
            char *replid = NULL, *offset = NULL;
    
            /* The master asks for a full resync: parse the reply to extract
             * the replication id and the replication offset. */
            replid = strchr(reply,' ');
            if (replid) {
                replid++;
                offset = strchr(replid,' ');
                if (offset) offset++;
            }
            if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
                serverLog(LL_WARNING,
                    "Master replied with wrong +FULLRESYNC syntax.");
                /* Unexpected condition: the master replied +FULLRESYNC, so it
                 * supports PSYNC, but the reply format looks wrong.
                 * To stay safe, blank the master replid so that a wrong id is
                 * not used when reconnecting to the master later. */
                memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
            } else {
                memcpy(server.master_replid, replid, offset-replid-1);
                server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
                server.master_initial_offset = strtoll(offset,NULL,10);
                serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
                    server.master_replid,
                    server.master_initial_offset);
            }
            /* We are going to full resync: discard the cached master structure. */
            replicationDiscardCachedMaster();
            sdsfree(reply);
            return PSYNC_FULLRESYNC;
        }
    
        /* Reply form: +CONTINUE, optionally followed by a new replication id. */
        if (!strncmp(reply,"+CONTINUE",9)) {
            /* Partial resync was accepted. */
            serverLog(LL_NOTICE,
                "Successful partial resynchronization with master.");
    
            /* Check the replication ID advertised by the master.
             * If it changed, the new id becomes our id, the previous id becomes
             * the secondary id (replid2) and second_replid_offset is updated,
             * so that our sub-slaves can PSYNC with us after a disconnection. */
            char *start = reply+10;
            char *end = reply+9;
            while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
            if (end-start == CONFIG_RUN_ID_SIZE) {
                char new[CONFIG_RUN_ID_SIZE+1];
                memcpy(new,start,CONFIG_RUN_ID_SIZE);
                new[CONFIG_RUN_ID_SIZE] = '\0';
    
                if (strcmp(new,server.cached_master->replid)) { /* Master replication ID changed. */
                    serverLog(LL_WARNING,"Master replication ID changed to %s",new);
    
                    /* Set the old ID as our ID2, up to the current offset+1. */
                    memcpy(server.replid2,server.cached_master->replid,
                        sizeof(server.replid2));
                    server.second_replid_offset = server.master_repl_offset+1;
    
                    /* Update the cached master ID and our own primary ID to
                     * the new one. */
                    memcpy(server.replid,new,sizeof(server.replid));
                    memcpy(server.cached_master->replid,new,sizeof(server.replid));
    
                    /* Disconnect all the sub-slaves: they will reconnect and
                     * be notified of the new id. */
                    disconnectSlaves();
                }
            }
    
            /* Setup the replication to continue. */
            sdsfree(reply);
            replicationResurrectCachedMaster(fd);
    
            /* If this instance was restarted and the PSYNC metadata was read
             * from the persistence file, the replication backlog is not
             * initialized yet: create it. */
            if (server.repl_backlog == NULL) createReplicationBacklog();
            return PSYNC_CONTINUE;
        }
    
        /* If we reach this point we received some kind of error.
         * For transient errors return PSYNC_TRY_LATER.
         * When the master does not support PSYNC, or the error is not one we
         * recognize, return PSYNC_NOT_SUPPORTED. */
    
        if (!strncmp(reply,"-NOMASTERLINK",13) ||
            !strncmp(reply,"-LOADING",8))
        {
            serverLog(LL_NOTICE,
                "Master is currently unable to PSYNC "
                "but should be in the future: %s", reply);
            sdsfree(reply);
            return PSYNC_TRY_LATER;
        }
    
        if (strncmp(reply,"-ERR",4)) {
            /* If it's not an error, log the unexpected event. */
            serverLog(LL_WARNING,
                "Unexpected reply to PSYNC from master: %s", reply);
        } else {
            serverLog(LL_NOTICE,
                "Master does not support PSYNC or is in "
                "error state (reply: %s)", reply);
        }
        sdsfree(reply);
        replicationDiscardCachedMaster();
        return PSYNC_NOT_SUPPORTED;
    }
    

    主服务器响应同步策略

    /* Implementation of the SYNC and PSYNC commands (master side).
     *
     * On a successful partial resynchronization the function returns right
     * away. Otherwise the client is registered as a slave in the
     * WAIT_BGSAVE_START state and one of three things happens depending on
     * whether an RDB child is already running and what its target is. */
    void syncCommand(client *c) {
        /* Nothing to do if the client is already flagged CLIENT_SLAVE. */
        if (c->flags & CLIENT_SLAVE) return;
    
        /* If this server has a master configured (masterhost is set) but its
         * own replication state is not CONNECTED, refuse the request. */
        if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
            addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
            return;
        }
    
        /* The client's reply buffer must be empty when a sync starts: if we
         * attach to an in-progress BGSAVE below, the output buffer of another
         * slave is copied into this client. */
        if (clientHasPendingReplies(c)) {
            addReplyError(c,"SYNC and PSYNC are invalid with pending output");
            return;
        }
    
        serverLog(LL_NOTICE,"Replica %s asks for synchronization",
            replicationGetSlaveName(c));
    
        /* For PSYNC, try a partial resynchronization first.
         * If it fails we fall through to a full resynchronization, whose reply
         * has the form:
         *   +FULLRESYNC <replid> <offset>
         * The slave can use that replid and offset to attempt a partial
         * resynchronization after a later disconnection from the master. */
        if (!strcasecmp(c->argv[0]->ptr,"psync")) {
            if (masterTryPartialResynchronization(c) == C_OK) {
                server.stat_sync_partial_ok++;
                return; /* No full resync needed: done. */
            } else {
                char *master_replid = c->argv[1]->ptr;
    
                /* Count a partial-resync error only when the slave actually
                 * asked for one; a replid starting with '?' is an explicit
                 * request for a full resync. */
                if (master_replid[0] != '?') server.stat_sync_partial_err++;
            }
        } else {
            /* Plain SYNC: the client speaks the old replication protocol.
             * Flag it so that we do not expect REPLCONF ACK feedback from it. */
            c->flags |= CLIENT_PRE_PSYNC;
        }
    
        /* From here on we perform a full resynchronization. */
        server.stat_sync_full++;
    
        /* Start in SLAVE_STATE_WAIT_BGSAVE_START; the code paths below may
         * move the slave to another state depending on what we do. */
        c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
        if (server.repl_disable_tcp_nodelay)    // repl_disable_tcp_nodelay is set
            anetDisableTcpNoDelay(NULL, c->fd); /* Failure here is not a problem. */
        c->repldbfd = -1;
        c->flags |= CLIENT_SLAVE;
        listAddNodeTail(server.slaves,c);
    
        /* Create the replication backlog if this is the first slave and no
         * backlog exists yet. */
        if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
            /* When the backlog is created we always switch to a new replid and
             * clear id2, so that no stale history can be matched. */
            changeReplicationId();
            clearReplicationId2();
            createReplicationBacklog();
        }
    
        /* CASE 1: a BGSAVE is in progress and its target is disk. */
        if (server.rdb_child_pid != -1 &&
            server.rdb_child_type == RDB_CHILD_TYPE_DISK)
        {
            /* Check whether the running BGSAVE can be used for this slave:
             * if another slave is already waiting for its end, we may be able
             * to share the same RDB file. */
            client *slave;
            listNode *ln;
            listIter li;
    
            listRewind(server.slaves,&li);
            while((ln = listNext(&li))) {
                slave = ln->value;
                if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
            }
            /* We can share the RDB only if this client's capabilities are a
             * superset of those of the slave found above. ln is NULL when the
             * loop ended without finding one. */
            if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
                /* Compatible: copy the other slave's output buffer and put
                 * this client in the right state using the same offset. */
                copyClientOutputBuffer(c,slave);
                replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
                serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
            } else {
                /* Not compatible (or nobody to attach to): wait for the next
                 * BGSAVE. */
                serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
            }
    
        /* CASE 2: a BGSAVE is in progress but its target is a socket. */
        } else if (server.rdb_child_pid != -1 &&
                   server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
        {
            /* An RDB child is currently writing to sockets. We have to wait
             * for the next BGSAVE in order to synchronize. */
            serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
    
        /* CASE 3: no BGSAVE is in progress. */
        } else {
            if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
                /* Diskless replication: nothing is started here. Per the
                 * original note, the RDB child is created later from
                 * replicationCron() so that more slaves have time to arrive. */
                if (server.repl_diskless_sync_delay)
                    serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
            } else {
                /* Disk target, or the slave does not support diskless
                 * replication, and no RDB child exists: start one now. */
                if (server.aof_child_pid == -1) {       // no AOF rewrite child: start the BGSAVE
                    startBgsaveForReplication(c->slave_capa);
                } else {                // an AOF rewrite is active: delay the BGSAVE
                    serverLog(LL_NOTICE,
                        "No BGSAVE in progress, but an AOF rewrite is active. "
                        "BGSAVE for replication delayed");
                }
            }
        }
        return;
    }
    
    /* Handle a PSYNC request from the master's point of view.
     *
     * c->argv[1] is the replication ID announced by the slave and c->argv[2]
     * the offset it wants to continue from.
     *
     * Returns C_OK when a partial resynchronization was set up (the client is
     * then already registered as an online slave), and C_ERR when the caller
     * has to perform a full resynchronization. On C_ERR nothing is replied to
     * the slave here: the +FULLRESYNC reply carries the master offset at the
     * time the RDB is produced, so it is sent later. */
    int masterTryPartialResynchronization(client *c) {
        char *slave_replid = c->argv[1]->ptr;
        long long psync_offset;
    
        /* Parse the offset requested by the slave. A parse error should not
         * normally happen, but treating it as "full resync needed" is the
         * robust choice. */
        if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != C_OK)
            return C_ERR;
    
        /* Compare the announced ID with both IDs this server may accept.
         * The main ID is valid at any offset; the secondary ID (replid2) is
         * valid only up to second_replid_offset. If neither applies, the
         * histories differ and a partial resync is not possible. */
        int id1_differs = strcasecmp(slave_replid, server.replid) != 0;
        int id2_differs = strcasecmp(slave_replid, server.replid2) != 0;
    
        if (id1_differs &&
            (id2_differs || psync_offset > server.second_replid_offset))
        {
            if (slave_replid[0] == '?') {
                /* A replid of "?" is an explicit request for a full resync. */
                serverLog(LL_NOTICE,"Full resync requested by replica %s",
                    replicationGetSlaveName(c));
            } else if (id2_differs) {
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Replication ID mismatch (Replica asked for '%s', my "
                    "replication IDs are '%s' and '%s')",
                    slave_replid, server.replid, server.replid2);
            } else {
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Requested offset for second ID was %lld, but I can reply "
                    "up to %lld", psync_offset, server.second_replid_offset);
            }
            return C_ERR;
        }
    
        /* Do we still hold the data the slave is asking for? */
        int in_backlog =
            server.repl_backlog != NULL &&
            psync_offset >= server.repl_backlog_off &&
            psync_offset <= (server.repl_backlog_off + server.repl_backlog_histlen);
    
        if (!in_backlog) {
            serverLog(LL_NOTICE,
                "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
            if (psync_offset > server.master_repl_offset) {
                serverLog(LL_WARNING,
                    "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
            }
            return C_ERR;
        }
    
        /* Partial resync accepted:
         * 1) Mark the client as an online slave.
         * 2) Tell it with +CONTINUE that it can go on.
         * 3) Feed it the backlog starting from the requested offset. */
        c->flags |= CLIENT_SLAVE;
        c->replstate = SLAVE_STATE_ONLINE;
        c->repl_ack_time = server.unixtime;
        c->repl_put_online_on_ack = 0;
        listAddNodeTail(server.slaves,c);
    
        /* The reply is written straight to the socket instead of going through
         * the client output buffer. PSYNC2-capable slaves also get our replid. */
        char reply[128];
        int replylen = (c->slave_capa & SLAVE_CAPA_PSYNC2)
            ? snprintf(reply,sizeof(reply),"+CONTINUE %s\r\n", server.replid)
            : snprintf(reply,sizeof(reply),"+CONTINUE\r\n");
    
        if (write(c->fd,reply,replylen) != replylen) {
            /* Short write or error: drop the link, the slave will retry. */
            freeClientAsync(c);
            return C_OK;
        }
    
        /* Queue the accumulated backlog for the slave. */
        long long sent_len = addReplyReplicationBacklog(c,psync_offset);
        serverLog(LL_NOTICE,
            "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
                replicationGetSlaveName(c),
                sent_len, psync_offset);
        /* server.slaveseldb is deliberately left alone here (original note:
         * the slave keeps the DB selection state of its previous link). */
    
        refreshGoodSlavesCount();
        return C_OK;
    }
    

    进行部分重同步:

    从服务器复活cachedMaster

    /* 复活cached master为当前的master,并使用一个新的文件描述符作为socket参数。
     * 这个函数在成功设置部分重同步时调用,我们可以继续接受master剩下的数据。
     * */
    void replicationResurrectCachedMaster(int newfd) {
        server.master = server.cached_master;
        server.cached_master = NULL;
        server.master->fd = newfd;
        server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
        server.master->authenticated = 1;
        server.master->lastinteraction = server.unixtime;
        server.repl_state = REPL_STATE_CONNECTED;
        server.repl_down_since = 0;
    
        /* 重新增加client到链表中 */
        linkClient(server.master);
        if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
                              readQueryFromClient, server.master)) {
            serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
            freeClientAsync(server.master); /* Close ASAP. */
        }
    
        /* 如果有未发送的数据在写缓冲区中,我们需要安装可写事件处理器 */
        if (clientHasPendingReplies(server.master)) {
            if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
                              sendReplyToClient, server.master)) {
                serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
                freeClientAsync(server.master); /* Close ASAP. */
            }
        }
    }
    

    主服务器推送数据缓冲区中的数据:见masterTryPartialResynchronization()

    进行完整重同步:

    主服务器准备RDB文件并传输给从服务器

    /* Start a BGSAVE for replication, targeting disk or the slaves' sockets
     * depending on configuration and on `mincapa`, a bitmask of slave
     * capabilities that can be tested with the SLAVE_CAPA_* macros.
     *
     * Side effects:
     * 1) On success with a disk target, every slave in WAIT_BGSAVE_START is
     *    set up for the full resync. On failure those slaves are removed from
     *    the slaves list, sent an error and flagged to be closed after the
     *    reply.
     * 2) On success the replication script cache is flushed.
     *
     * Returns C_OK on success, C_ERR otherwise. */
    int startBgsaveForReplication(int mincapa) {
        int to_socket = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
        int retval = C_ERR;
        rdbSaveInfo rsi;
        rdbSaveInfo *rsiptr;
        listIter iter;
        listNode *node;
    
        serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
            to_socket ? "replicas sockets" : "disk");
    
        /* Save only when the replication info is available, otherwise the
         * slave would miss the replication stream position. */
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rsiptr == NULL) {
            serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
        } else if (to_socket) {
            retval = rdbSaveToSlavesSockets(rsiptr);
        } else {
            retval = rdbSaveBackground(server.rdb_filename,rsiptr);
        }
    
        /* BGSAVE could not be started: detach every slave that was waiting for
         * it, report the error and close the connection after the reply. */
        if (retval == C_ERR) {
            serverLog(LL_WARNING,"BGSAVE for replication failed");
            listRewind(server.slaves,&iter);
            for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
                client *waiting = node->value;
    
                if (waiting->replstate != SLAVE_STATE_WAIT_BGSAVE_START) continue;
                waiting->flags &= ~CLIENT_SLAVE;
                listDelNode(server.slaves,node);
                addReplyError(waiting,
                    "BGSAVE failed, replication can't continue");
                waiting->flags |= CLIENT_CLOSE_AFTER_REPLY;
            }
            return retval;
        }
    
        /* With a socket target rdbSaveToSlavesSockets() already moved the
         * slaves to the full-resync state; with a disk target we do it now. */
        if (!to_socket) {
            listRewind(server.slaves,&iter);
            for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
                client *waiting = node->value;
    
                if (waiting->replstate != SLAVE_STATE_WAIT_BGSAVE_START) continue;
                replicationSetupSlaveForFullResync(waiting,
                        getPsyncInitialOffset());
            }
        }
    
        /* Flush the script cache, since we need that slave differences are
         * accumulated without requiring slaves to match our cached scripts. */
        if (retval == C_OK) replicationScriptCacheFlush();
        return retval;
    }
    
    /* Put `slave` in the state needed for a full resynchronization and, unless
     * it is a pre-PSYNC (plain SYNC) client, send it the +FULLRESYNC reply.
     *
     * Effects:
     * 1) `offset` is remembered in slave->psync_initial_offset, so that slaves
     *    attaching later to the same BGSAVE can reuse it together with a copy
     *    of this client's output buffer.
     * 2) The slave moves to WAIT_BGSAVE_END, from which point incremental
     *    data is accumulated for it.
     * 3) server.slaveseldb is reset to -1 to force a SELECT to be emitted in
     *    the replication stream.
     *
     * Meant to be called either right after a BGSAVE was started, or when a
     * slave is attached to a BGSAVE that is already running.
     *
     * Returns C_OK on success. If the reply cannot be written in full, the
     * client is scheduled to be freed and C_ERR is returned. */
    int replicationSetupSlaveForFullResync(client *slave, long long offset) {
        slave->psync_initial_offset = offset;
        slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
        /* Incremental data starts accumulating now: force a SELECT. */
        server.slaveseldb = -1;
    
        /* Clients that used SYNC do not get a +FULLRESYNC reply. */
        if (slave->flags & CLIENT_PRE_PSYNC) return C_OK;
    
        char reply[128];
        int replylen = snprintf(reply,sizeof(reply),"+FULLRESYNC %s %lld\r\n",
                                server.replid,offset);
    
        if (write(slave->fd,reply,replylen) == replylen) return C_OK;
    
        freeClientAsync(slave);
        return C_ERR;
    }
    
    /* Called at the end of every BGSAVE, or when the RDB transfer strategy is
     * switched. It moves slaves that were waiting for the BGSAVE to the next
     * step of the (non-blocking) synchronization and, if some slave still
     * needs one, schedules a new BGSAVE.
     *
     * bgsaveerr: C_OK if the save succeeded, C_ERR otherwise. It is consulted
     *            only on the disk-target path below.
     * type:      target type of the BGSAVE that just finished
     *            (RDB_CHILD_TYPE_SOCKET is handled specially, anything else is
     *            treated as a disk target). */
    void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
        listNode *ln;
        int startbgsave = 0;
        int mincapa = -1;
        listIter li;
    
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
    
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                /* This slave still needs a BGSAVE: remember that, and keep the
                 * intersection of the capabilities of all such slaves. */
                startbgsave = 1;
                mincapa = (mincapa == -1) ? slave->slave_capa :
                                            (mincapa & slave->slave_capa);
            } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
                struct redis_stat buf;
    
                /* Disk target: the RDB file must now be sent from disk to the
                 * slave socket. Socket (diskless) target: there is little left
                 * to do besides marking the slave online. */
                if (type == RDB_CHILD_TYPE_SOCKET) {
                    serverLog(LL_NOTICE,
                        "Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
                            replicationGetSlaveName(slave));
                    /* The state is switched to ONLINE immediately, but
                     * repl_put_online_on_ack defers the actual "put online"
                     * step until a REPLCONF ACK arrives from the slave. */
                    slave->replstate = SLAVE_STATE_ONLINE;
                    slave->repl_put_online_on_ack = 1;
                    slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
                } else {
                    if (bgsaveerr != C_OK) {
                        freeClient(slave);
                        serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
                        continue;
                    }
                    if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                        redis_fstat(slave->repldbfd,&buf) == -1) {
                        /* Log before freeing the client: freeClient() is free
                         * to make calls that overwrite errno, which would make
                         * the reported reason unreliable. */
                        serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                        freeClient(slave);
                        continue;
                    }
                    /* Initialize the bulk-transfer state. The preamble is the
                     * "$<length>\r\n" header that precedes the RDB payload. */
                    slave->repldboff = 0;
                    slave->repldbsize = buf.st_size;
                    slave->replstate = SLAVE_STATE_SEND_BULK;
                    /* Cast matches the %lld conversion specifier. */
                    slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
                        (long long) slave->repldbsize);
    
                    /* Replace any writable handler with sendBulkToSlave. */
                    aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
                    if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                        freeClient(slave);
                        continue;
                    }
                }
            }
        }
        /* Some slave is still waiting for a BGSAVE to start: start one. */
        if (startbgsave) startBgsaveForReplication(mincapa);
    }
    
    /* Writable-event handler that sends the RDB file to a slave.
     *
     * privdata is the slave client; el and mask are unused. Each invocation
     * first flushes what is left of the preamble ("$<length>\r\n"), then sends
     * at most one PROTO_IOBUF_LEN chunk of the file starting at
     * slave->repldboff. When the whole file has been sent the handler is
     * removed and the slave is put online. */
    void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
        client *slave = privdata;
        UNUSED(el);
        UNUSED(mask);
        char chunk[PROTO_IOBUF_LEN];
        ssize_t sent, got;
    
        /* Step 1: the preamble, if any of it is still pending. */
        if (slave->replpreamble) {
            sent = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
            if (sent == -1) {
                serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s",
                    strerror(errno));
                freeClient(slave);
                return;
            }
            server.stat_net_output_bytes += sent;
            sdsrange(slave->replpreamble,sent,-1);
            /* Part of the preamble is still unsent: retry on the next event. */
            if (sdslen(slave->replpreamble) != 0) return;
            sdsfree(slave->replpreamble);
            slave->replpreamble = NULL;
            /* Fall through and start sending the payload. */
        }
    
        /* Step 2: read the next chunk of the RDB file at the current offset. */
        lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
        got = read(slave->repldbfd,chunk,PROTO_IOBUF_LEN);
        if (got <= 0) {
            serverLog(LL_WARNING,"Read error sending DB to replica: %s",
                (got == 0) ? "premature EOF" : strerror(errno));
            freeClient(slave);
            return;
        }
    
        /* Step 3: push it to the socket. EAGAIN just means "try again later";
         * any other error drops the slave. */
        sent = write(fd,chunk,got);
        if (sent == -1) {
            if (errno != EAGAIN) {
                serverLog(LL_WARNING,"Write error sending DB to replica: %s",
                    strerror(errno));
                freeClient(slave);
            }
            return;
        }
    
        /* Only the bytes actually written advance the offset, so a short write
         * is simply resumed by the next invocation. */
        slave->repldboff += sent;
        server.stat_net_output_bytes += sent;
        if (slave->repldboff != slave->repldbsize) return;
    
        /* Whole file sent: release the file, drop this handler and switch the
         * slave to the online state. */
        close(slave->repldbfd);
        slave->repldbfd = -1;
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        putSlaveOnline(slave);
    }
    
    /* Switch a slave to the ONLINE state. Meant to be called once the slave
     * has received the RDB file and is ready to be fed incremental data.
     *
     * It does the following:
     * 1) Sets the ONLINE state and clears repl_put_online_on_ack.
     * 2) Installs the writable handler so that the accumulated output buffer
     *    can be sent to the slave.
     * 3) Refreshes the count of good slaves.
     *
     * If the handler cannot be installed the client is freed instead. */
    void putSlaveOnline(client *slave) {
        slave->repl_put_online_on_ack = 0;
        slave->replstate = SLAVE_STATE_ONLINE;
        slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
    
        int rc = aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
                                   sendReplyToClient, slave);
        if (rc != AE_ERR) {
            refreshGoodSlavesCount();
            serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
                replicationGetSlaveName(slave));
            return;
        }
    
        serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
        freeClient(slave);
    }
    

    从服务器设置可读事件处理器:readSyncBulkPayload

    /* Readable-event handler on the slave side: asynchronously reads the RDB
     * payload sent by the master, stores it in a temp file and, once complete,
     * loads it and switches the replication state to CONNECTED.
     *
     * fd is the socket to read from; el, privdata and mask are unused.
     * REPL_MAX_WRITTEN_BEFORE_FSYNC is the amount of newly written data after
     * which the temp file is synced to disk. */
    #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
    void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
        char buf[4096];
        ssize_t nread, readlen, nwritten;
        off_t left;
        UNUSED(el);
        UNUSED(privdata);
        UNUSED(mask);
    
        /* Static state kept across invocations: the EOF marker announced by
         * the master and the last bytes received. When the two match, the end
         * of the streamed transfer has been reached. */
        static char eofmark[CONFIG_RUN_ID_SIZE];
        static char lastbytes[CONFIG_RUN_ID_SIZE];
        static int usemark = 0;
    
        /* repl_transfer_size == -1 means the bulk header (length or EOF mark)
         * has not been read yet. */
        if (server.repl_transfer_size == -1) {
            if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
                serverLog(LL_WARNING,
                    "I/O error reading bulk count from MASTER: %s",
                    strerror(errno));
                goto error;
            }
    
            if (buf[0] == '-') {
                serverLog(LL_WARNING,
                    "MASTER aborted replication with an error: %s",
                    buf+1);
                goto error;
            } else if (buf[0] == '\0') {
                /* An empty line is just a keep-alive from the master: only
                 * refresh the last I/O time. */
                server.repl_transfer_lastio = server.unixtime;
                return;
            } else if (buf[0] != '$') {
                serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
                goto error;
            }
    
            /* There are two bulk header formats:
             * 1) $381                         -> RDB file of known length
             * 2) $EOF:<40 bytes delimiter>    -> diskless (streamed) transfer
             * In a diskless transfer the length is not known in advance, so
             * the end is detected through the delimiter, which is long and
             * random enough that a collision with the payload can be ignored. */
            if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {  // diskless (streamed) transfer
                usemark = 1;
                memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
                memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
                /* Set repl_transfer_size to 0 so this header branch is not
                 * entered again. */
                server.repl_transfer_size = 0;
                serverLog(LL_NOTICE,
                    "MASTER <-> REPLICA sync: receiving streamed RDB from master");
            } else {        // length-prefixed transfer of an RDB file
                usemark = 0;
                server.repl_transfer_size = strtol(buf+1,NULL,10);
                serverLog(LL_NOTICE,
                    "MASTER <-> REPLICA sync: receiving %lld bytes from master",
                    (long long) server.repl_transfer_size);
            }
            return;
        }
    
        /* Decide how many bytes to read: a full buffer when streaming, at most
         * the remaining length otherwise. */
        if (usemark) {
            readlen = sizeof(buf);
        } else {
            left = server.repl_transfer_size - server.repl_transfer_read;
            readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
        }
    
        nread = read(fd,buf,readlen);
        if (nread <= 0) {
            serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
                (nread == -1) ? strerror(errno) : "connection lost");
            cancelReplicationHandshake();
            return;
        }
        server.stat_net_input_bytes += nread;
    
        /* In a diskless transfer the EOF marker must be detected so that it
         * does not end up in the final file. */
        int eof_reached = 0;
    
        if (usemark) {
            /* Keep lastbytes equal to the last CONFIG_RUN_ID_SIZE bytes
             * received so far, then compare it with the EOF marker. */
            if (nread >= CONFIG_RUN_ID_SIZE) {
                memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
            } else {
                int rem = CONFIG_RUN_ID_SIZE-nread;
                memmove(lastbytes,lastbytes+nread,rem);
                memcpy(lastbytes+rem,buf,nread);
            }
            if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
        }
    
        server.repl_transfer_lastio = server.unixtime;
        if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
            serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s", 
                (nwritten == -1) ? strerror(errno) : "short write");
            goto error;
        }
        server.repl_transfer_read += nread;
    
        /* On EOF in streamed mode, cut the trailing marker
         * (CONFIG_RUN_ID_SIZE bytes) off the file. */
        if (usemark && eof_reached) {
            if (ftruncate(server.repl_transfer_fd,
                server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
            {
                serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
                goto error;
            }
        }
    
        /* Sync to disk every REPL_MAX_WRITTEN_BEFORE_FSYNC bytes rather than
         * all at once at the end (original note: leaving everything to the end
         * would consume a lot of memory). */
        if (server.repl_transfer_read >=
            server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
        {
            off_t sync_size = server.repl_transfer_read -
                              server.repl_transfer_last_fsync_off;
            rdb_fsync_range(server.repl_transfer_fd,
                server.repl_transfer_last_fsync_off, sync_size);
            server.repl_transfer_last_fsync_off += sync_size;
        }
    
        /* In length-prefixed mode the transfer is complete when the announced
         * number of bytes has been read. */
        if (!usemark) {
            if (server.repl_transfer_read == server.repl_transfer_size)
                eof_reached = 1;
        }
    
        if (eof_reached) {
            int aof_is_enabled = server.aof_state != AOF_OFF;
    
            if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
                serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno));
                cancelReplicationHandshake();
                return;
            }
            serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
            /* If AOF is enabled, stop it for now (original note: running AOF
             * and the RDB load together would cause heavy copy-on-write). */
            if(aof_is_enabled) stopAppendOnly();
            signalFlushedDb(-1);
            /* Empty the whole dataset, asynchronously if repl_slave_lazy_flush
             * is set. */
            emptyDb(
                -1,
                server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
                replicationEmptyDbCallback);
            /* Remove the readable handler before loading the DB, otherwise the
             * event loop would keep invoking it whenever new data arrives. */
            aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
            serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
            rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
            /* Load the RDB file that was just received. */
            if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
                serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
                cancelReplicationHandshake();
                /* Re-enable AOF if we turned it off above. */
                if (aof_is_enabled) restartAOF();
                return;
            }
            /* Final setup: release transfer resources, create the master
             * client on the transfer socket and mark the link as CONNECTED. */
            zfree(server.repl_transfer_tmpfile);
            close(server.repl_transfer_fd);
            replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
            server.repl_state = REPL_STATE_CONNECTED;
            server.repl_down_since = 0;
            /* After a full resync we adopt the master's replid and offset and
             * clear id2 and its offset, since a new replication history
             * starts here. */
            memcpy(server.replid,server.master->replid,sizeof(server.replid));
            server.master_repl_offset = server.master->reploff;
            clearReplicationId2();
            /* Create the backlog if missing. Per the original note, a slave
             * needs one regardless of whether it has sub-slaves, so that
             * failover can work correctly. */
            if (server.repl_backlog == NULL) createReplicationBacklog();
    
            serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
            /* Sync done: restart AOF if it was enabled (original note: this
             * triggers a rewrite, after which a new AOF file is appended to). */
            if (aof_is_enabled) restartAOF();
        }
        return;
    
    error:
        cancelReplicationHandshake();
        return;
    }
    

    命令传播

    主从传播:复制流将会在命令执行完毕后生成,并传输给slaves

    /* call() is the core of Redis command execution.
     *
     * The following flags can be passed:
     * CMD_CALL_NONE        No flags.
     * CMD_CALL_SLOWLOG     Check the command execution time and log it in the
     *                      slow log if needed.
     * CMD_CALL_STATS       Populate command stats.
     * CMD_CALL_PROPAGATE_AOF   Append the command to the AOF if it modified the
     *                          dataset or if the client flags are forcing
     *                          propagation.
     * CMD_CALL_PROPAGATE_REPL  Send the command to slaves if it modified the
     *                          dataset or if the client flags are forcing
     *                          propagation.
     * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
     * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
     *
     * The exact propagation behavior also depends on the client flags:
     * 1. If the client has CLIENT_FORCE_AOF or CLIENT_FORCE_REPL set and call()
     *    is invoked with CMD_CALL_PROPAGATE_AOF/REPL, the command is
     *    propagated to the AOF / slaves even if the dataset was not modified.
     * 2. If the client has CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
     *    set, propagation to slaves / the AOF is not performed even if the
     *    dataset was modified.
     * Note: whatever the client flags are, if call() is invoked WITHOUT
     * CMD_CALL_PROPAGATE_AOF or CMD_CALL_PROPAGATE_REPL, the corresponding
     * propagation to the AOF or to slaves never happens.
     *
     * Client flags are modified by the implementation of a given command
     * using the following API:
     * forceCommandPropagation(client *c, int flags);
     * preventCommandPropagation(client *c);
     * preventCommandAOF(client *c);
     * preventCommandReplication(client *c);
     */
    void call(client *c, int flags) {
        long long dirty, start, duration;
        int client_old_flags = c->flags;
        struct redisCommand *real_cmd = c->cmd;
    
        /* If there are MONITOR clients, feed them the command being processed.
         * This is skipped when:
         * 1. The server is loading data.
         * 2. The command is flagged CMD_SKIP_MONITOR or CMD_ADMIN. */
        if (listLength(server.monitors) &&
            !server.loading &&
            !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
        {
            replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
        }
    
        /* Initialization: clear the propagation flags, which the command may
         * set again as needed, and start with a fresh also_propagate array
         * (the previous one is saved and restored at the end). */
        c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
        redisOpArray prev_also_propagate = server.also_propagate;
        redisOpArrayInit(&server.also_propagate);
    
        /* Execute the command, measuring its duration and how much it
         * increased server.dirty. */
        dirty = server.dirty;
        start = ustime();
        c->cmd->proc(c);
        duration = ustime()-start;
        dirty = server.dirty-dirty;
        if (dirty < 0) dirty = 0;
    
        /* When EVAL is called loading the AOF we don't want commands called
         * from Lua to go into the slowlog or to populate statistics. */
        if (server.loading && c->flags & CLIENT_LUA)
            flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
    
        /* If the caller is Lua, we want to force the EVAL caller to propagate
         * the script if the command flag or client flag are forcing the
         * propagation. */
        if (c->flags & CLIENT_LUA && server.lua_caller) {
            if (c->flags & CLIENT_FORCE_REPL)
                server.lua_caller->flags |= CLIENT_FORCE_REPL;
            if (c->flags & CLIENT_FORCE_AOF)
                server.lua_caller->flags |= CLIENT_FORCE_AOF;
        }
    
        /* Log the command in the slow log if needed, and populate the
         * per-command statistics. */
        /* The slow log / latency sample is taken only when CMD_CALL_SLOWLOG is
         * set and the command is not EXEC. */
        if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
            char *latency_event = (c->cmd->flags & CMD_FAST) ?
                                  "fast-command" : "command";
            latencyAddSampleIfNeeded(latency_event,duration/1000);
            slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
        }
        if (flags & CMD_CALL_STATS) {
            /* Account against real_cmd, the command captured on entry: c->cmd
             * may be different by now, in case of MULTI-EXEC or re-written
             * commands such as EXPIRE, GEOADD, etc. */
            real_cmd->microseconds += duration;
            real_cmd->calls++;
        }
    
        /* Propagate the command into the AOF and the replication link. */
        if (flags & CMD_CALL_PROPAGATE &&
            (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
        {
            int propagate_flags = PROPAGATE_NONE;
    
            /* If the command modified the dataset, propagate it to both. */
            if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
    
            /* If the client forced propagation, honor that as well. */
            if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
            if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
    
            /* However, drop a target if the command asked to prevent it (the
             * CLIENT_PREVENT_*_PROP flags) or if call() was not invoked with
             * the corresponding CMD_CALL_PROPAGATE_* flag. */
            if (c->flags & CLIENT_PREVENT_REPL_PROP ||
                !(flags & CMD_CALL_PROPAGATE_REPL))
                    propagate_flags &= ~PROPAGATE_REPL;
            if (c->flags & CLIENT_PREVENT_AOF_PROP ||
                !(flags & CMD_CALL_PROPAGATE_AOF))
                    propagate_flags &= ~PROPAGATE_AOF;
    
            /* Propagate only if some target is left and the command is not a
             * module command: per the original note, module commands replicate
             * explicitly, so nothing is done automatically for them. */
            if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
                propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
        }
    
        /* Restore the old replication flags, since call() can be executed
         * recursively. */
        c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
        c->flags |= client_old_flags &
            (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    
        /* Handle the alsoPropagate() API to handle commands that want to propagate
         * multiple separated commands. Note that alsoPropagate() is not affected
         * by CLIENT_PREVENT_PROP flag. */
        if (server.also_propagate.numops) {
            int j;
            redisOp *rop;
    
            if (flags & CMD_CALL_PROPAGATE) {
                for (j = 0; j < server.also_propagate.numops; j++) {
                    rop = &server.also_propagate.ops[j];
                    int target = rop->target;
                    /* Whatever the command wish is, we honor the call() flags. */
                    if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
                    if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
                    if (target)
                        propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
                }
            }
            redisOpArrayFree(&server.also_propagate);
        }
        /* Put back the also_propagate array saved on entry. */
        server.also_propagate = prev_also_propagate;
        server.stat_numcommands++;
    }
    
    /* Propagate a command, in the context of database 'dbid', to the AOF file
     * and/or to the slaves.
     *
     * 'flags' is a bitwise combination of:
     * + PROPAGATE_NONE (no propagation of command at all)
     * + PROPAGATE_AOF (propagate into the AOF file if it is enabled)
     * + PROPAGATE_REPL (propagate into the replication link)
     *
     * This function should not be used from inside a command implementation.
     * Command implementations should use alsoPropagate(),
     * preventCommandPropagation() or forceCommandPropagation() instead.
     */
    void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
                   int flags)
    {
        const int to_aof = (flags & PROPAGATE_AOF) != 0;
        const int to_slaves = (flags & PROPAGATE_REPL) != 0;

        /* The AOF is fed only when it was requested and the AOF is not off. */
        if (to_aof && server.aof_state != AOF_OFF) {
            feedAppendOnlyFile(cmd,dbid,argv,argc);
        }

        /* The replication stream is fed whenever it was requested. */
        if (to_slaves) {
            replicationFeedSlaves(server.slaves,dbid,argv,argc);
        }
    }
    
    /* Propagate a write command to the slaves and feed the replication backlog.
     * This function is used when this instance is a master: it takes the
     * commands received from clients and builds the replication stream.
     * When this instance is itself a slave (server.masterhost is set) the
     * function returns immediately; sub-slaves are then fed through
     * replicationFeedSlavesFromMasterStream() instead.
     *
     * slaves:    list of slave clients to feed.
     * dictid:    id of the database the command applies to.
     * argv/argc: the command to propagate. */
    void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
        listNode *ln;
        listIter li;
        int j, len;
        char llstr[LONG_STR_SIZE];
    
        /* Only the top-level master builds the replication stream here.
         * When this instance is a slave, the master's stream is proxied as-is
         * so that the stream stays exactly the same: all the slaves share one
         * replication stream and should therefore have the same replication
         * history and offsets. */
        if (server.masterhost != NULL) return;
    
        /* Nothing to do if there is neither a backlog to fill nor a slave. */
        if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
    
        /* Having slaves attached without a backlog is treated as a bug. */
        serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
    
        /* Emit a SELECT first when the DB last selected in the stream
         * (server.slaveseldb) differs from the DB of this command. */
        if (server.slaveseldb != dictid) {
            robj *selectcmd;
    
            /* For a few DBs we have pre-computed SELECT command. */
            if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
                selectcmd = shared.select[dictid];
            } else {
                int dictid_len;
    
                /* Otherwise build the RESP encoded SELECT command by hand. */
                dictid_len = ll2string(llstr,sizeof(llstr),dictid);
                selectcmd = createObject(OBJ_STRING,
                    sdscatprintf(sdsempty(),
                    "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                    dictid_len, llstr));
            }
    
            /* Add the SELECT command to the replication backlog. */
            if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
    
            /* Send it to the slaves. */
            listRewind(slaves,&li);
            while((ln = listNext(&li))) {
                client *slave = ln->value;
                /* Slaves still waiting for the BGSAVE to start are not fed. */
                if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
                addReply(slave,selectcmd);
            }
    
            /* Drop our reference only when the object was created above,
             * i.e. when it is not one of the shared SELECT commands. */
            if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
                decrRefCount(selectcmd);
        }
        server.slaveseldb = dictid;
    
        /* Write the command into the backlog in RESP format. */
        if (server.repl_backlog) {
            char aux[LONG_STR_SIZE+3];
    
            /* Multi bulk header: "*<argc>\r\n". */
            aux[0] = '*';
            len = ll2string(aux+1,sizeof(aux)-1,argc);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
    
            for (j = 0; j < argc; j++) {
                long objlen = stringObjectLen(argv[j]);
    
                /* Each argument is written as "$<len>\r\n", then the payload,
                 * then a final "\r\n" which is taken from the tail of the
                 * length header still stored in aux. */
                aux[0] = '$';
                len = ll2string(aux+1,sizeof(aux)-1,objlen);
                aux[len+1] = '\r';
                aux[len+2] = '\n';
                feedReplicationBacklog(aux,len+3);
                feedReplicationBacklogWithObject(argv[j]);
                feedReplicationBacklog(aux+len+1,2);
            }
        }
    
        /* Write the command to every slave. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
    
            /* Slaves still waiting for the BGSAVE to start are not fed. */
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
    
            /* The slave may still be performing its SYNC: the command is only
             * queued in the client output buffer here.
             * NOTE(review): presumably it is flushed once the SYNC completes;
             * that part is not visible in this function. */
    
            addReplyMultiBulkLen(slave,argc);
    
            for (j = 0; j < argc; j++)
                addReplyBulk(slave,argv[j]);
        }
    }
    

    从从复制:从节点将会在命令执行完后,将master的复制流原样复制给sub-slave

    /* Read handler for a client socket: read the available data from 'fd',
     * append it to c->querybuf and process it.
     *
     * el, mask: unused.
     * fd:       socket to read from.
     * privdata: the client the socket belongs to. */
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        client *c = (client*) privdata;
        int nread, readlen;
        size_t qblen;
        UNUSED(el);
        UNUSED(mask);
    
        readlen = PROTO_IOBUF_LEN;
        /* By default up to PROTO_IOBUF_LEN bytes are read in one go.
         * NOTE(review): the original note gives this as 16KB and the
         * PROTO_MBULK_BIG_ARG threshold as 32KB; neither definition is
         * visible here. */
        /* If this is a multi bulk request with bulks still to read, and the
         * bulk being read is at least PROTO_MBULK_BIG_ARG bytes long, limit
         * the read to what is still missing of that bulk (when that is
         * smaller than the default read size). */
        if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
            && c->bulklen >= PROTO_MBULK_BIG_ARG)
        {
            /* NOTE(review): the +2 presumably accounts for the trailing CRLF. */
            ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
    
            /* Note that the 'remaining' variable may be zero in some edge case,
             * for example once we resume a blocked client after CLIENT PAUSE. */
            if (remaining > 0 && remaining < readlen) readlen = remaining;
        }
    
        qblen = sdslen(c->querybuf);
        if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);     // make room in the sds for readlen more bytes
        nread = read(fd, c->querybuf+qblen, readlen);
        if (nread == -1) {      // read error
            if (errno == EAGAIN) {      // no data available right now, retry on the next event
                return;
            } else {
                serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
                freeClient(c);      // real read error: release the client
                return;
            }
        } else if (nread == 0) {        // the peer closed the connection
            serverLog(LL_VERBOSE, "Client closed connection");
            freeClient(c);
            return;
        } else if (c->flags & CLIENT_MASTER) {
            /* The client is our master, so these bytes belong to the master's
             * replication stream. Append them to pending_querybuf as well: once
             * the commands have been executed, this copy is what gets forwarded
             * to the sub-slaves (see processInputBufferAndReplicate()). */
            c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                            c->querybuf+qblen,nread);
        }
    
        sdsIncrLen(c->querybuf,nread);
        c->lastinteraction = server.unixtime;
        if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
        server.stat_net_input_bytes += nread;
        if (sdslen(c->querybuf) > server.client_max_querybuf_len) {             // query buffer too large: release the client
            sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
    
            bytes = sdscatrepr(bytes,c->querybuf,64);
            serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
            sdsfree(ci);
            sdsfree(bytes);
            freeClient(c);
            return;
        }
    
        /* Process the data in the input buffer and take care of replication.
         * For a master client the replication offset is sampled before and
         * after executing the commands, so that we know how much of the
         * master's stream was applied; that part is then sent to the
         * sub-slaves and recorded in the replication backlog. */
        processInputBufferAndReplicate(c);
    }
    
    /* Wrapper around processInputBuffer(). When the client is our master it
     * also forwards the part of the master's stream that was just applied to
     * the sub-slaves and to the replication backlog. */
    void processInputBufferAndReplicate(client *c) {
        /* Any client that is not our master: just process its input.
         * Writes performed by such clients are propagated separately,
         * through replicationFeedSlaves(). */
        if ((c->flags & CLIENT_MASTER) == 0) {
            processInputBuffer(c);
            return;
        }

        /* The client is our master, so this instance is not the top-level
         * master. Measure how far the applied replication offset advances
         * while the buffered commands are executed. */
        size_t offset_before = c->reploff;
        processInputBuffer(c);
        size_t consumed = c->reploff - offset_before;

        if (consumed == 0) return;

        /* Forward exactly the applied bytes, then drop them from the
         * pending buffer. */
        replicationFeedSlavesFromMasterStream(server.slaves,
                c->pending_querybuf, consumed);
        sdsrange(c->pending_querybuf,consumed,-1);
    }
    
    /* Proxy helper: forward the bytes we received from our master, unchanged,
     * to the sub-slaves and to the replication backlog.
     *
     * Why slave -> sub-slave replication does not reuse replicationFeedSlaves():
     * 1) replicationFeedSlaves() injects SELECT commands; there is no need to
     *    add them again when proxying the master's stream.
     * 2) Forwarding the bytes verbatim keeps the slave and its sub-slaves
     *    exactly identical to the master.
     *
     * slaves: list of slave clients to feed.
     * buf:    start of the stream chunk to forward.
     * buflen: number of bytes of 'buf' to forward.
     */
    #include <ctype.h>
    void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
        listNode *ln;
        listIter li;
    
        /* Debugging: this is handy to see the stream sent from master
         * to slaves. Disabled with if(0). */
        if (0) {
            printf("%zu:",buflen);
            for (size_t j = 0; j < buflen; j++) {
                /* The stream may contain bytes >= 0x80. isprint() requires a
                 * value representable as unsigned char (or EOF), so convert
                 * before the call instead of passing a possibly negative
                 * plain char. */
                unsigned char byte = (unsigned char)buf[j];
                printf("%c", isprint(byte) ? buf[j] : '.');
            }
            printf("\n");
        }
    
        if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
    
            /* Slaves still waiting for the BGSAVE to start are skipped:
             * nothing is added to their output buffer. */
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
            addReplyString(slave,buf,buflen);
        }
    }
    

    心跳检测

    Redis的主从心跳是双向无回复心跳:

    • 双向:
      • master -> slave: 在serverCron()中通过top-master在复制流中每10s插入一个PING命令实现,在非top-master中,通过代理复制流直接就能获得心跳能力。
      • slave -> master: 在serverCron()中通过每1s向master发送REPLCONF ACK命令实现
    • 无回复:master->slave的通道用于复制流传输,slave->master的通道用于REPLCONF-ACK心跳,仅能探测连接可用,无法探测服务可用
      • master->slave: 如果对REPLCONF-ACK进行回复,将会与复制流混合,通过在命令实现中不调用addReply()写输出缓冲区来实现。
      • slave -> master: 如果slave对master传播过来的命令进行回复,回复将会与REPLCONF-ACK混合;通过在addReply()调用的prepareClientToWrite()中检查client->flags&CLIENT_MASTER(且未设置CLIENT_MASTER_FORCE_REPLY)时拒绝写输出缓冲区来实现

    心跳发送

    /* Core replication cron function. Per the original note it runs once per
     * second and is called from serverCron(). Only the heartbeat related parts
     * are shown in this excerpt; the rest is elided with "...". */
    void replicationCron(void) {
        ...
    
        /* When this instance is a slave and its master understands PSYNC,
         * send REPLCONF ACK to the master (once per call of this function).
         * The ACK serves two purposes:
         * 1) it is the slave -> master heartbeat;
         * 2) it reports this slave's current replication offset to the master.
         * Note: no periodic ACK is sent to masters flagged CLIENT_PRE_PSYNC,
         * i.e. masters that do not support PSYNC and replication offsets.
         * */
        if (server.masterhost && server.master &&
            !(server.master->flags & CLIENT_PRE_PSYNC))
            replicationSendAck();
    
        /* If we have slaves attached, PING them from time to time, so that
         * slaves can implement an explicit timeout towards the master and
         * detect a broken link even when the TCP connection does not
         * actually go down. */
        listIter li;
        listNode *ln;
        robj *ping_argv[1];
    
        /* As a master, send PING to the slaves once every
         * server.repl_ping_slave_period cron loops (10 seconds per the
         * original note). The PING is not written to the sockets directly: it
         * goes through replicationFeedSlaves(), i.e. into the replication
         * backlog and the slaves' output buffers.
         * NOTE(review): the original note adds that a slave which is still
         * synchronizing only receives these PINGs once its state becomes
         * ONLINE; that is not verifiable from this excerpt.
         * */
        if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
            listLength(server.slaves))
        {
            ping_argv[0] = createStringObject("PING",4);
            replicationFeedSlaves(server.slaves, server.slaveseldb,
                ping_argv, 1);
            decrRefCount(ping_argv[0]);
        }
    
        ...
    }
    
    
    /* Queue a "REPLCONF ACK <offset>" command in the output buffer of the
     * master client, where <offset> is the master client's reploff.
     * Does nothing when there is no master client. */
    void replicationSendAck(void) {
        client *master = server.master;

        if (master == NULL) return;

        /* Replies to a master client are refused by prepareClientToWrite()
         * unless CLIENT_MASTER_FORCE_REPLY is set, so raise it around the
         * writes. */
        master->flags |= CLIENT_MASTER_FORCE_REPLY;

        addReplyMultiBulkLen(master,3);
        addReplyBulkCString(master,"REPLCONF");
        addReplyBulkCString(master,"ACK");
        addReplyBulkLongLong(master,master->reploff);

        /* Back to the normal mode in which nothing is replied to the master. */
        master->flags &= ~CLIENT_MASTER_FORCE_REPLY;
    }
    

    REPLCONF-ACK实现

    /* REPLCONF <option> <value> <option> <value> ...
     *
     * Used by a slave to configure the replication process before SYNC and to
     * talk to its master afterwards. Options handled here:
     *   listening-port <port>  store the slave's listening port.
     *   ip-address <ip>        store the slave's IP address.
     *   capa <name>            record a capability ("eof", "psync2"); unknown
     *                          capabilities are ignored.
     *   ack <offset>           slave reports its replication offset; no reply.
     *   getack <any>           ask this instance, when it is a slave, to send
     *                          an ACK to its master; no reply.
     * On success (no ack/getack option met) the reply is +OK.
     * */
    void replconfCommand(client *c) {
        /* Options come in pairs after the command name, so argc must be odd. */
        if (c->argc % 2 == 0) {
            addReply(c,shared.syntaxerr);
            return;
        }

        /* Walk the option-value pairs. */
        for (int i = 1; i < c->argc; i += 2) {
            char *option = c->argv[i]->ptr;

            if (strcasecmp(option,"listening-port") == 0) {
                long port;

                if (getLongFromObjectOrReply(c,c->argv[i+1],&port,NULL) != C_OK)
                    return;
                c->slave_listening_port = port;
                continue;
            }

            if (strcasecmp(option,"ip-address") == 0) {
                sds ip = c->argv[i+1]->ptr;

                /* The address plus its terminator must fit in slave_ip. */
                if (sdslen(ip) >= sizeof(c->slave_ip)) {
                    addReplyErrorFormat(c,"REPLCONF ip-address provided by "
                        "replica instance is too long: %zd bytes", sdslen(ip));
                    return;
                }
                memcpy(c->slave_ip,ip,sdslen(ip)+1);
                continue;
            }

            if (strcasecmp(option,"capa") == 0) {
                /* Capabilities this master does not know are ignored. */
                if (strcasecmp(c->argv[i+1]->ptr,"eof") == 0)
                    c->slave_capa |= SLAVE_CAPA_EOF;
                else if (strcasecmp(c->argv[i+1]->ptr,"psync2") == 0)
                    c->slave_capa |= SLAVE_CAPA_PSYNC2;
                continue;
            }

            if (strcasecmp(option,"ack") == 0) {
                /* REPLCONF ACK reports the slave's replication offset to the
                 * master. It is an internal command honoured only for
                 * clients flagged as slaves. */
                long long offset;

                if ((c->flags & CLIENT_SLAVE) == 0) return;
                if (getLongLongFromObject(c->argv[i+1],&offset) != C_OK) return;

                /* The acknowledged offset only ever moves forward. */
                if (c->repl_ack_off < offset) c->repl_ack_off = offset;
                c->repl_ack_time = server.unixtime;

                /* Per the original note this is the diskless replication
                 * case: the slave is really put online when the first ACK
                 * arrives. */
                if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
                    putSlaveOnline(c);

                /* This option never produces a reply. */
                return;
            }

            if (strcasecmp(option,"getack") == 0) {
                /* REPLCONF GETACK requests an ACK as soon as possible; it is
                 * acted upon only when this instance has a master. */
                if (server.masterhost && server.master) replicationSendAck();
                return;
            }

            addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",option);
            return;
        }

        addReply(c,shared.ok);
    }
    

    addReply实现

    /* Append the string representation of 'obj' to the output buffer of
     * client 'c'. Nothing is written when prepareClientToWrite() refuses the
     * client. An object that is neither sds encoded nor integer encoded
     * triggers a server panic. */
    void addReply(client *c, robj *obj) {
        if (prepareClientToWrite(c) != C_OK) return;

        if (sdsEncodedObject(obj)) {
            size_t len = sdslen(obj->ptr);

            /* Try the client buffer first, fall back to the reply list. */
            if (_addReplyToBuffer(c,obj->ptr,len) != C_OK)
                _addReplyStringToList(c,obj->ptr,len);
            return;
        }

        if (obj->encoding == OBJ_ENCODING_INT) {
            /* Integer encoded strings keep the number in the pointer itself:
             * render it as decimal text and queue that text. */
            char digits[32];
            size_t ndigits = ll2string(digits,sizeof(digits),(long)obj->ptr);

            if (_addReplyToBuffer(c,digits,ndigits) != C_OK)
                _addReplyStringToList(c,digits,ndigits);
            return;
        }

        serverPanic("Wrong obj->encoding in addReply()");
    }
    
    /* Called every time data is about to be queued for a client.
     *
     * Returns C_OK when the caller may append data to the client's output
     * buffers, C_ERR when nothing must be appended. On the C_OK path for
     * ordinary clients, clientInstallWriteHandler() is invoked when the client
     * has no pending replies yet.
     *
     * C_OK without touching the write handler:
     *   - Lua and module clients.
     * C_ERR:
     *   - clients with CLIENT_REPLY_OFF or CLIENT_REPLY_SKIP set;
     *   - a master client, unless CLIENT_MASTER_FORCE_REPLY is set;
     *   - clients whose fd is <= 0 (per the original note, the fake client
     *     used while loading the AOF).
     *
     * Typically called right before a reply is written to the client output
     * buffers.
     */
    int prepareClientToWrite(client *c) {
        /* Lua and module clients must always get the data. Per the original
         * note they have no socket, so no write handler is set up for them. */
        if ((c->flags & (CLIENT_LUA|CLIENT_MODULE)) != 0) return C_OK;

        /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
        int replies_muted = (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) != 0;

        /* A master takes no replies unless the reply is explicitly forced. */
        int silent_master = (c->flags & CLIENT_MASTER) != 0 &&
                            (c->flags & CLIENT_MASTER_FORCE_REPLY) == 0;

        /* Fake client without a usable descriptor. */
        int no_socket = c->fd <= 0;

        if (replies_muted || silent_master || no_socket) return C_ERR;

        /* When replies are already pending there is nothing more to set up;
         * otherwise ask for the write handler to be installed. */
        if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);

        /* Authorize the caller to queue in the output buffer of this client. */
        return C_OK;
    }
    

    附:PSYNC命令格式

    PSYNC完整重同步

    PSYNC完整重同步命令响应格式:
    {{空行,可选,在RDB文件准备好前,用来作为心跳}}
    {{PSYNC命令响应}}
    {{空行,可选,在RDB文件准备好前,用来作为心跳}}
    ${{RDB文件长度}}
    {{RDB文件内容}}
    {{复制缓冲区里的增量数据,以RESP协议给出}}
     * example:
    PSYNC ? -1
    
    +FULLRESYNC 9efadab34a079389ac0b56d91b7ec43f461a0fb3 308
    $281
    REDIS0009� redis-ver6.0.6�
    :_used-mem�`' �repl-stream-db��repl-id(9efadab34a079389ac0b56d91b7ec43f461a0fb3�
                                                                                    repl-offset�4�
                                                                                                  aof-preamble���beahelloworldkvmy_msgs�Qx.
    �a�b�s�Qx.cehW%�$|�*1
    $4
    PING
    *1
    $4
    PING
    *2
    $6
    SELECT
    $1
    0
    *3
    $3
    set
    $3
    yhl
    $5
    dalao
    *1
    $4
    PING
    *1
    $4
    PING
    

    PSYNC部分重同步

    PSYNC部分重同步命令响应格式:
    PSYNC 353ca630322b75bf51ed679d28f7c8c391634782 24
    +CONTINUE
    ING
    *1
    $4
    PING
    *1
    $4
    PING
    *1
    $4
    PING
    *2
    $6
    SELECT
    $1
    0
    *3
    $3
    set
    $1
    a
    $1
    b
    

    相关文章

      网友评论

          本文标题:Redis主从复制

          本文链接:https://www.haomeiwen.com/subject/jzyrjktx.html