美文网首页
Redis源码研究之主从复制

Redis源码研究之主从复制

作者: wenmingxing | 来源:发表于2018-05-05 15:19 被阅读123次

    本文主要说明Redis中主从服务器之间复制的实现。

    建议阅读:
    1、Redis主从服务器功能的理论部分见:Redis之主从服务器的复制

    I、上帝视角

    1、为完成主从复制操作,redisServer结构体中维护了若干与复制相关的字段(下面只摘录这部分字段):

    /*src/redis.h/redisServer*/
    struct redisServer {
        ......
        /* Replication (master) */
        // 最近一次使用(访问)的数据集
        int slaveseldb; /* Last SELECTed DB in replication output */
        // 全局复制偏移量(一个累积值)
        long long master_repl_offset; /* Global replication offset */
        // 主从连接心跳频率(发送ping的频率)
        int repl_ping_slave_period; /* Master pings the slave every N seconds */
        // 指向复制积压缓冲区backlog
        char *repl_backlog; /* Replication backlog for partial syncs */
        // backlog大小
        long long repl_backlog_size; /* Backlog circular buffer size */
        // backlog中写入的新数据的大小
        long long repl_backlog_histlen; /* Backlog actual data length */
        // backlog的当前索引
        long long repl_backlog_idx; /* Backlog circular buffer current offset */
        // backlog中可以被还原的第一个字节的偏移量
        long long repl_backlog_off; /* Replication offset of first byte in the
        backlog buffer. */
        // 积压空间有效时间,即过期时间
        time_t repl_backlog_time_limit; /* Time without slaves after the backlog
        gets released. */
        
        // 距离上一次有从服务器的时间
        time_t repl_no_slaves_since;    /* We have no slaves since that time.
                                           Only valid if server.slaves len is 0. */
    
        // 是否开启最小数量从服务器写入功能
        int repl_min_slaves_to_write;   /* Min number of slaves to write. */
        // 定义最小数量从服务器的最大延迟值
        int repl_min_slaves_max_lag;    /* Max lag of <count> slaves to write. */
        // 延迟良好的从服务器的数量
        int repl_good_slaves_count;     /* Number of slaves with lag <= max_lag. */
    
    
        /* Replication (slave) */
        // 主服务器的验证密码
        char *masterauth;               /* AUTH with this password with master */
        // 主服务器的地址
        char *masterhost;               /* Hostname of master */
        // 主服务器的端口
        int masterport;                 /* Port of master */
        // 超时时间
        int repl_timeout;               /* Timeout after N seconds of master idle */
        // 主服务器所对应的客户端
        redisClient *master;     /* Client that is master for this slave */
        // 被缓存的主服务器,PSYNC 时使用
        redisClient *cached_master; /* Cached master to be reused for PSYNC. */
        int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
        // 复制的状态(服务器是从服务器时使用)
        int repl_state;          /* Replication status if the instance is a slave */
        // RDB 文件的大小
        off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
        // 已读 RDB 文件内容的字节数
        off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
        // 最近一次执行 fsync 时的偏移量
        // 用于 sync_file_range 函数
        off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
        // 主服务器的套接字
        int repl_transfer_s;     /* Slave -> Master SYNC socket */
        // 保存 RDB 文件的临时文件的描述符
        int repl_transfer_fd;    /* Slave -> Master SYNC temp file descriptor */
        // 保存 RDB 文件的临时文件名字
        char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
        // 最近一次读入 RDB 内容的时间
        time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
        int repl_serve_stale_data; /* Serve stale data when link is down? */
        // 是否只读从服务器?
        int repl_slave_ro;          /* Slave is read only? */
        // 连接断开的时长
        time_t repl_down_since; /* Unix time at which link with master went down */
        // 是否要在 SYNC 之后关闭 NODELAY ?
        int repl_disable_tcp_nodelay;   /* Disable TCP_NODELAY after SYNC? */
        // 从服务器优先级
        int slave_priority;             /* Reported in INFO and used by Sentinel. */
        // 本服务器(从服务器)当前主服务器的 RUN ID
        char repl_master_runid[REDIS_RUN_ID_SIZE+1];  /* Master run id for PSYNC. */
        // 初始化偏移量
        long long repl_master_initial_offset;         /* Master PSYNC offset. */
    }  
    

    2、下面我们看关于上述代码中提到的复制积压缓冲区的内容,这是实现部分重同步的重要结构:

    复制积压缓冲区(backlog)可以看成是一个固定大小的环形队列:写满后,新数据会覆盖最旧的数据。每当执行的命令需要传播给从服务器(例如命令修改了数据)时,积压缓冲区中的内容就会更新。源码中的调用顺序为 call() --> propagate() --> replicationFeedSlaves()。

    /* Execute a command by invoking its implementation (excerpt). */
    /* src/redis.c/call */
    void call(redisClient *c, int flags) {
        /* ... code omitted (including the declarations of dirty, start
         * and duration) ... */

        /* Call the command. */
        c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
        redisOpArrayInit(&server.also_propagate);
        // Snapshot the dirty counter so we can tell afterwards whether the
        // command modified the dataset.
        dirty = server.dirty;
        // Run the function implementing the command.
        c->cmd->proc(c);
        dirty = server.dirty-dirty;
        duration = ustime()-start;

        /* ... code omitted ... */

        /* Propagate the command into the AOF and replication link */
        if (flags & REDIS_CALL_PROPAGATE) {
            /* Named differently from the 'flags' parameter to avoid
             * shadowing it. */
            int propagate_flags = REDIS_PROPAGATE_NONE;

            // The command explicitly asked to be replicated.
            if (c->flags & REDIS_FORCE_REPL)
                propagate_flags |= REDIS_PROPAGATE_REPL;
            // The command explicitly asked to be written to the AOF.
            if (c->flags & REDIS_FORCE_AOF)
                propagate_flags |= REDIS_PROPAGATE_AOF;
            // The dataset changed: propagate to both the AOF and the slaves.
            if (dirty)
                propagate_flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
            // Propagate only when at least one target was selected.
            if (propagate_flags != REDIS_PROPAGATE_NONE)
                propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
        }

        /* ... code omitted ... */
    }
    

    3、propagate()函数有两个传播方向,一个是AOF持久化,一个是主从复制:

    /* Command propagation. */
    /* src/redis.c/propagate */
    /* Send a command to the AOF and/or to the slaves.
     *
     * 'flags' is a bitwise OR of:
     *
     * + REDIS_PROPAGATE_NONE (no propagation of command at all)
     * + REDIS_PROPAGATE_AOF  (propagate into the AOF file if it is enabled)
     * + REDIS_PROPAGATE_REPL (propagate into the replication link)
     */
    void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
                   int flags)
    {
        /* The AOF is fed only when it was requested AND AOF is switched on. */
        int to_aof = (flags & REDIS_PROPAGATE_AOF) &&
                     server.aof_state != REDIS_AOF_OFF;
        /* Slaves (and the backlog) are fed whenever replication was requested. */
        int to_slaves = (flags & REDIS_PROPAGATE_REPL) != 0;

        if (to_aof)
            feedAppendOnlyFile(cmd,dbid,argv,argc);
        if (to_slaves)
            replicationFeedSlaves(server.slaves,dbid,argv,argc);
    }
    

    4、关于AOF的内容我们已经在前面讲过,我们下面看replicationFeedSlaves是如何实现的:

    // Send the command described by dictid/argv/argc to the slaves.
    /*src/replication.c/replicationFeedSlaves*/
    // The work is done in three steps (preceded by a SELECT when the target
    // DB differs from the last one emitted):
    // 1) build the protocol representation of the command
    // 2) append it to the replication backlog
    // 3) queue it on the output buffer of every eligible slave
    void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
        listNode *ln;
        listIter li;
        int j, len;
        char llstr[REDIS_LONGSTR_SIZE];
    
        /* If there aren't slaves, and there is no backlog buffer to populate,
         * we can return ASAP. */
        if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
    
        /* We can't have slaves attached and no backlog. */
        redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
    
        /* Send SELECT command to every slave if needed. */
        // Needed whenever the DB of this command differs from the DB last
        // selected in the replication stream (server.slaveseldb).
        if (server.slaveseldb != dictid) {
            robj *selectcmd;
    
            /* For a few DBs we have pre-computed SELECT command. */
            if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
                selectcmd = shared.select[dictid];
            } else {
                int dictid_len;
    
                // Build "SELECT <dictid>" in protocol format on the fly.
                dictid_len = ll2string(llstr,sizeof(llstr),dictid);
                selectcmd = createObject(REDIS_STRING,
                    sdscatprintf(sdsempty(),
                    "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                    dictid_len, llstr));
            }
    
            /* Add the SELECT command into the backlog. */
            if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
    
            /* Send it to slaves. */
            // NOTE(review): unlike the command loop at the end of this
            // function, this loop does not skip slaves in state
            // REDIS_REPL_WAIT_BGSAVE_START.
            listRewind(slaves,&li);
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;
                addReply(slave,selectcmd);
            }
    
            // Only the object created above is released; shared SELECT
            // objects are left alone.
            if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
                decrRefCount(selectcmd);
        }
    
        server.slaveseldb = dictid;
    
        /* Write the command to the replication backlog if any. */
        if (server.repl_backlog) {
            // Room for a one-char prefix ('*' or '$'), the number and CRLF.
            char aux[REDIS_LONGSTR_SIZE+3];
    
            /* Add the multi bulk reply length. */
            aux[0] = '*';
            len = ll2string(aux+1,sizeof(aux)-1,argc);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
    
            for (j = 0; j < argc; j++) {
                long objlen = stringObjectLen(argv[j]);
    
                /* We need to feed the buffer with the object as a bulk reply
                 * not just as a plain string, so create the $..CRLF payload len 
                 * and add the final CRLF */
                aux[0] = '$';
                len = ll2string(aux+1,sizeof(aux)-1,objlen);
                aux[len+1] = '\r';
                aux[len+2] = '\n';
                feedReplicationBacklog(aux,len+3);
                feedReplicationBacklogWithObject(argv[j]);
                // Reuse the "\r\n" already stored in aux as the trailing CRLF.
                feedReplicationBacklog(aux+len+1,2);
            }
        }
    
        /* Write the command to every slave. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
    
            redisClient *slave = ln->value;
    
            /* Don't feed slaves that are still waiting for BGSAVE to start */
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
    
            /* Feed slaves that are waiting for the initial SYNC (so these commands
             * are queued in the output buffer until the initial SYNC completes),
             * or are already in sync with the master. */
    
            /* Add the multi bulk length. */
            addReplyMultiBulkLen(slave,argc);
    
            /* Then append every argument (argv[0..argc-1]) as a bulk string. */
            for (j = 0; j < argc; j++)
                addReplyBulk(slave,argv[j]);
        }
    }
    

    II、主从关系的建立

    1、主从关系是通过向从服务器发送 SLAVEOF 命令建立的。之后从服务器会自动连接主服务器,并注册相应的读写事件(事件回调即下文的 syncWithMaster):

    /* SLAVEOF <host> <port> | SLAVEOF NO ONE
     *
     * With a host/port, make this instance replicate from that master.
     * With "NO ONE", stop replicating and turn the instance into a master.
     */
    void slaveofCommand(redisClient *c) {
        /* SLAVEOF is not allowed in cluster mode as replication is automatically
         * configured using the current address of the master node. */
        if (server.cluster_enabled) {
            addReplyError(c,"SLAVEOF not allowed in cluster mode.");
            return;
        }

        /* The special host/port combination "NO" "ONE" turns the instance
         * into a master. Otherwise the new master address is set. */
        if (!strcasecmp(c->argv[1]->ptr,"no") &&
            !strcasecmp(c->argv[2]->ptr,"one")) {
            // Only an instance that currently has a master has anything to undo.
            if (server.masterhost) {
                replicationUnsetMaster();
                redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
            }
        } else {
            long port;

            // Parse the port argument (an error reply is sent on failure).
            if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
                return;

            /* Reject ports outside the TCP range. The value ends up in the
             * int server.masterport; without this check a bad port would only
             * show up later as failed connection attempts (and values beyond
             * INT_MAX would undergo an implementation-defined conversion). */
            if (port <= 0 || port > 65535) {
                addReplyError(c,"Invalid master port");
                return;
            }

            /* Check if we are already attached to the specified master */
            // If so, reply +OK without doing anything else.
            if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
                && server.masterport == port) {
                redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
                addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
                return;
            }

            /* There was no previous master or the user specified a different one,
             * we can continue. */
            replicationSetMaster(c->argv[1]->ptr, (int)port);
            redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
                server.masterhost, server.masterport);
        }
        addReply(c,shared.ok);
    }
    

    2、同步的选取
    从服务器发起同步时,首先会尝试执行部分重同步(PSYNC);若不可行,则退而执行完整重同步(全同步)。
    其判断及调用过程如下:

    /* Event handler a slave uses to synchronize with its master.
     * src/replication.c/syncWithMaster
     *
     * The handler is entered several times as the handshake advances through
     * server.repl_state:
     *   REDIS_REPL_CONNECTING   -> send a PING, switch to REDIS_REPL_RECEIVE_PONG
     *                              and return to wait for the reply;
     *   REDIS_REPL_RECEIVE_PONG -> read the PING reply, AUTH if configured,
     *                              send REPLCONF listening-port, then try PSYNC
     *                              (falling back to SYNC). For a full resync a
     *                              temp file is created, readSyncBulkPayload is
     *                              installed and the state becomes
     *                              REDIS_REPL_TRANSFER.
     * On failure the socket is closed and the state is reset to
     * REDIS_REPL_CONNECT.
     */
    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
        char tmpfile[256], *err;
        /* dfd starts at -1 so the error path can tell whether the temp file
         * was already created and needs cleaning up. */
        int dfd = -1, maxtries = 5;
        int sockerr = 0, psync_result;
        socklen_t errlen = sizeof(sockerr);
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(privdata);
        REDIS_NOTUSED(mask);

        /* If this event fired after the user turned the instance into a master
         * with SLAVEOF NO ONE we must just return ASAP. */
        if (server.repl_state == REDIS_REPL_NONE) {
            close(fd);
            return;
        }

        /* Check for errors in the socket. */
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
            sockerr = errno;
        if (sockerr) {
            aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
            redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
                strerror(sockerr));
            goto error;
        }

        /* If we were connecting, it's time to send a non blocking PING, we want to
         * make sure the master is able to reply before going into the actual
         * replication process where we have long timeouts in the order of
         * seconds (in the meantime the slave would block). */
        if (server.repl_state == REDIS_REPL_CONNECTING) {
            redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
            /* Delete the writable event so that the readable event remains
             * registered and we can wait for the PONG reply. */
            aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
            server.repl_state = REDIS_REPL_RECEIVE_PONG;
            /* Send the PING, don't check for errors at all, we have the timeout
             * that will take care about this. */
            syncWrite(fd,"PING\r\n",6,100);

            /* We are called again once the reply is readable. */
            return;
        }

        /* Receive the PONG command. */
        if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
            char buf[1024];

            /* Delete the readable event, we no longer need it now that there is
             * the PING reply to read. */
            aeDeleteFileEvent(server.el,fd,AE_READABLE);

            /* Read the reply with explicit timeout. */
            buf[0] = '\0';
            if (syncReadLine(fd,buf,sizeof(buf),
                server.repl_syncio_timeout*1000) == -1)
            {
                redisLog(REDIS_WARNING,
                    "I/O error reading PING reply from master: %s",
                    strerror(errno));
                goto error;
            }

            /* We accept only two replies as valid, a positive +PONG reply
             * (we just check for "+") or an authentication error.
             * Note that older versions of Redis replied with "operation not
             * permitted" instead of using a proper error code, so we test
             * both. */
            if (buf[0] != '+' &&
                strncmp(buf,"-NOAUTH",7) != 0 &&
                strncmp(buf,"-ERR operation not permitted",28) != 0)
            {
                redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
                goto error;
            } else {
                redisLog(REDIS_NOTICE,
                    "Master replied to PING, replication can continue...");
            }
        }

        /* AUTH with the master if required. */
        if(server.masterauth) {
            err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
            if (err[0] == '-') {
                redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
                sdsfree(err);
                goto error;
            }
            sdsfree(err);
        }

        /* Set the slave port, so that Master's INFO command can list the
         * slave listening port correctly. */
        {
            sds port = sdsfromlonglong(server.port);
            err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                             NULL);
            sdsfree(port);
            /* Ignore the error if any, not all the Redis versions support
             * REPLCONF listening-port. */
            if (err[0] == '-') {
                redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
            }
            sdsfree(err);
        }

        /* Try a partial resynchronization. If we don't have a cached master
         * slaveTryPartialResynchronization() will at least try to use PSYNC
         * to start a full resynchronization so that we get the master run id
         * and the global offset, to try a partial resync at the next
         * reconnection attempt. */
        psync_result = slaveTryPartialResynchronization(fd);

        // Partial resync accepted: fd now belongs to server.master.
        if (psync_result == PSYNC_CONTINUE) {
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
            return;
        }

        /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
         * and the server.repl_master_runid and repl_master_initial_offset are
         * already populated. */
        if (psync_result == PSYNC_NOT_SUPPORTED) {
            redisLog(REDIS_NOTICE,"Retrying with SYNC...");
            if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
                redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                    strerror(errno));
                goto error;
            }
        }

        /* From here on psync_result is PSYNC_FULLRESYNC or PSYNC_NOT_SUPPORTED:
         * the master is going to send a whole RDB file. */

        /* Prepare a suitable temp file for bulk transfer */
        // NOTE(review): sleep() blocks the event loop for up to maxtries seconds.
        while(maxtries--) {
            snprintf(tmpfile,256,
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
            if (dfd != -1) break;
            sleep(1);
        }
        if (dfd == -1) {
            redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
            goto error;
        }

        /* Setup the non blocking download of the bulk file. */
        if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
                == AE_ERR)
        {
            redisLog(REDIS_WARNING,
                "Can't create readable event for SYNC: %s (fd=%d)",
                strerror(errno),fd);
            goto error;
        }

        // Enter the transfer state and reset the transfer bookkeeping.
        server.repl_state = REDIS_REPL_TRANSFER;
        server.repl_transfer_size = -1;
        server.repl_transfer_read = 0;
        server.repl_transfer_last_fsync_off = 0;
        server.repl_transfer_fd = dfd;
        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_tmpfile = zstrdup(tmpfile);

        return;

    error:
        /* Never close a socket that still has events registered on it. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        if (dfd != -1) {
            /* The temp file was created here (O_EXCL) but will never receive
             * any data: release the descriptor and remove the file. */
            close(dfd);
            unlink(tmpfile);
        }
        close(fd);
        server.repl_transfer_s = -1;
        server.repl_state = REDIS_REPL_CONNECT;
        return;
    }
    

    3、syncWithMaster主要通过调用slaveTryPartialResynchronization来判断是否能进行部分重同步,否则进行全同步:

    /*src/replication.c/slaveTryPartialResynchronization*/
    /* Try a partial resynchronization with the master if we are about to reconnect.
     *
     * If there is no cached master structure, at least try to issue a
     * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
     * command in order to obtain the master run id and the master replication
     * global offset.
     *
     * This function is designed to be called from syncWithMaster(), so the
     * following assumptions are made:
     *
     * 1) We pass the function an already connected socket "fd".
     * 2) This function does not close the file descriptor "fd". However in case
     *    of successful partial resynchronization, the function will reuse
     *    'fd' as file descriptor of the server.master client structure.
     *
     * The function returns:
     *
     * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
     * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
     *                   In this case the master run_id and global replication
     *                   offset is saved.
     * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
     *                      the caller should fall back to SYNC.
     *
     * In every case except PSYNC_CONTINUE the cached master (if any) is
     * discarded.
     */
    
    #define PSYNC_CONTINUE 0
    #define PSYNC_FULLRESYNC 1
    #define PSYNC_NOT_SUPPORTED 2
    int slaveTryPartialResynchronization(int fd) {
        char *psync_runid;
        char psync_offset[32];
        sds reply;
    
        /* Initially set repl_master_initial_offset to -1 to mark the current
         * master run_id and offset as not valid. Later if we'll be able to do
         * a FULL resync using the PSYNC command we'll set the offset at the
         * right value, so that this information will be propagated to the
         * client structure representing the master into server.master. */
        server.repl_master_initial_offset = -1;
    
        if (server.cached_master) {
            // A cached master exists: ask for a partial resync with
            // "PSYNC <master_run_id> <offset>", where offset is the byte right
            // after the last one already processed (reploff+1).
            psync_runid = server.cached_master->replrunid;
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
        } else {
            // No cached master: send "PSYNC ? -1" to request a full resync.
            redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_runid = "?";
            // Copies "-1" plus its terminating NUL.
            memcpy(psync_offset,"-1",3);
        }
    
        /* Issue the PSYNC command */
        reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
    
        // "+FULLRESYNC <runid> <offset>": a full resync is required.
        if (!strncmp(reply,"+FULLRESYNC",11)) {
            char *runid = NULL, *offset = NULL;
    
            /* FULL RESYNC, parse the reply in order to extract the run id
             * and the replication offset. */
            runid = strchr(reply,' ');
            if (runid) {
                runid++;
                offset = strchr(runid,' ');
                if (offset) offset++;
            }
            // Both fields must be present and the run id must have exactly
            // REDIS_RUN_ID_SIZE characters.
            if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
                redisLog(REDIS_WARNING,
                    "Master replied with wrong +FULLRESYNC syntax.");
                /* This is an unexpected condition, actually the +FULLRESYNC
                 * reply means that the master supports PSYNC, but the reply
                 * format seems wrong. To stay safe we blank the master
                 * runid to make sure next PSYNCs will fail. */
                memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
            } else {
                // Save the master run id...
                memcpy(server.repl_master_runid, runid, offset-runid-1);
                server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
                // ...and the initial replication offset.
                server.repl_master_initial_offset = strtoll(offset,NULL,10);
                redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                    server.repl_master_runid,
                    server.repl_master_initial_offset);
            }
            /* We are going to full resync, discard the cached master structure. */
            replicationDiscardCachedMaster();
            sdsfree(reply);
            
            return PSYNC_FULLRESYNC;
        }
    
        // "+CONTINUE": the partial resync was accepted.
        if (!strncmp(reply,"+CONTINUE",9)) {
            /* Partial resync was accepted, set the replication state accordingly */
            redisLog(REDIS_NOTICE,
                "Successful partial resynchronization with master.");
            sdsfree(reply);
            // Turn the cached master back into the current master, reusing fd.
            replicationResurrectCachedMaster(fd);
    
            return PSYNC_CONTINUE;
        }
    
        /* If we reach this point we received either an error since the master does
         * not understand PSYNC, or an unexpected reply from the master.
         * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
    
        // strncmp() != 0 here means the reply does NOT start with "-ERR".
        if (strncmp(reply,"-ERR",4)) {
            /* If it's not an error, log the unexpected event. */
            redisLog(REDIS_WARNING,
                "Unexpected reply to PSYNC from master: %s", reply);
        } else {
            redisLog(REDIS_NOTICE,
                "Master does not support PSYNC or is in "
                "error state (reply: %s)", reply);
        }
        sdsfree(reply);
        replicationDiscardCachedMaster();
    
        // The caller should fall back to SYNC.
        return PSYNC_NOT_SUPPORTED;
    }  
    

    III、全同步

    全同步发生在两种情况下:
    1、从服务器第一次连接到主服务器;
    2、从服务器断线重连后,它所缺失的数据(即其复制偏移量 offset 之后的内容)已经不在主服务器的复制积压缓冲区中(已被新写入的数据覆盖),无法通过部分重同步补齐;

    syncCommand() 是主服务器处理同步请求的函数,同时负责 SYNC 和 PSYNC 两个命令,下面我们只看其中执行全同步的部分:

    // Master-side handler for the SYNC and PSYNC commands: it tries a partial
    // resync first and falls back to a full resync (excerpt).
    /* src/replication.c/syncCommand */
    /* SYNC and PSYNC command implementation. */
    void syncCommand(redisClient *c) {
        /* ... code omitted: the master first tries a partial resync; if that
         * fails it replies "+FULLRESYNC <master_runid> <offset>" to the slave
         * and continues with the full resynchronization below, which needs
         * a BGSAVE. ... */

        /* Full resynchronization. */
        server.stat_sync_full++;

        /* Here we need to check if there is a background saving operation
         * in progress, or if it is required to start one */
        if (server.rdb_child_pid != -1) {
            /* A BGSAVE child already exists:
             * 1. if some slave is already in REDIS_REPL_WAIT_BGSAVE_END, put
             *    slave c in REDIS_REPL_WAIT_BGSAVE_END as well;
             * 2. otherwise put it in REDIS_REPL_WAIT_BGSAVE_START. */
            /* Ok a background save is in progress. Let's check if it is a good
             * one for replication, i.e. if there is another slave that is
             * registering differences since the server forked to save */
            redisClient *slave;
            listNode *ln;
            listIter li;

            // Look for a slave that is already waiting for this BGSAVE to end.
            listRewind(server.slaves,&li);
            while((ln = listNext(&li))) {
                slave = ln->value;
                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
            }
            if (ln) {
                /* Perfect, the server is already registering differences for
                 * another slave. Set the right state, and copy the buffer. */
                // Copying that slave's pending output lets c receive the same
                // RDB file when the BGSAVE ends, followed by the same updates.
                copyClientOutputBuffer(c,slave);
                // c now waits for the running BGSAVE to end.
                c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
                redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
            } else {
                /* No way, we need to wait for the next BGSAVE in order to
                 * register differences */
                // c waits for a new BGSAVE to start.
                c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
                redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
            }
        } else {
            /* Ok we don't have a BGSAVE in progress, let's start one */
            redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
            if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
                redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
                addReplyError(c,"Unable to perform background save");
                return;
            }
            // c will receive the RDB file once this BGSAVE ends.
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            /* Flush the script cache for the new slave. */
            replicationScriptCacheFlush();
        }

        if (server.repl_disable_tcp_nodelay)
            anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
        c->repldbfd = -1;
        c->flags |= REDIS_SLAVE;
        server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
        listAddNodeTail(server.slaves,c);
        // The first slave triggers the creation of the replication backlog.
        if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
            createReplicationBacklog();
        return;
    }
    

    BGSAVE 子进程结束后,主服务器会调用 backgroundSaveDoneHandler 处理其结果,并由它进一步安排把 RDB 文件发送给正在等待的从服务器:

    /*
     * Handle the termination of the BGSAVE child process: update the
     * persistence bookkeeping, then let the slaves waiting for the RDB go on.
     *
     * exitcode: exit status reported for the child.
     * bysignal: signal that terminated the child, 0 if it was not killed by
     *           a signal.
     */
    /*src/rdb.c/backgroundSaveDoneHandler*/
    void backgroundSaveDoneHandler(int exitcode, int bysignal) {
        /* The RDB is usable only if the child exited on its own with status 0.
         * exitcode alone is not enough: it carries no meaningful status when
         * the child was killed by a signal. */
        int saved_ok = !bysignal && exitcode == 0;

        if (saved_ok) {
            // BGSAVE succeeded.
            redisLog(REDIS_NOTICE,
                "Background saving terminated with success");
            server.dirty = server.dirty - server.dirty_before_bgsave;
            server.lastsave = time(NULL);
            server.lastbgsave_status = REDIS_OK;
        } else if (!bysignal && exitcode != 0) {
            // The child exited with an error status.
            redisLog(REDIS_WARNING, "Background saving error");
            server.lastbgsave_status = REDIS_ERR;
        } else {
            // The child was terminated by a signal.
            redisLog(REDIS_WARNING,
                "Background saving terminated by signal %d", bysignal);
            // The save did not complete: remove the child's temp file.
            rdbRemoveTempFile(server.rdb_child_pid);
            /* SIGUSR1 is whitelisted, so we have a way to kill a child without
             * triggering an error condition. */
            if (bysignal != SIGUSR1)
                server.lastbgsave_status = REDIS_ERR;
        }

        // No child is running any more.
        server.rdb_child_pid = -1;
        server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
        server.rdb_save_time_start = -1;

        /* Possibly there are slaves waiting for a BGSAVE in order to be served
         * (the first stage of SYNC is a bulk transfer of dump.rdb).
         * Use the same success test as above, so a child killed by a signal
         * is reported as a failure even if exitcode happens to be 0. */
        updateSlavesWaitingBgsave(saved_ok ? REDIS_OK : REDIS_ERR);
    }
    
    

    其中与主从复制相关的操作,是调用updateSlavesWaitingBgsave:

    /* This function is called at the end of every background save (BGSAVE).
     *
     * bgsaveerr is REDIS_OK if the background save succeeded, REDIS_ERR
     * otherwise.
     *
     * It walks server.slaves and advances each slave's replication state:
     *  - REDIS_REPL_WAIT_BGSAVE_START: the RDB just produced cannot be used by
     *    this slave, so a new BGSAVE is requested and the slave is moved to
     *    REDIS_REPL_WAIT_BGSAVE_END to wait for it.
     *  - REDIS_REPL_WAIT_BGSAVE_END: the slave was waiting for the BGSAVE that
     *    just finished. On success the RDB file is opened, the slave moves to
     *    REDIS_REPL_SEND_BULK and sendBulkToSlave is installed as its writable
     *    handler; on failure the slave is freed.
     * If a new BGSAVE is needed but cannot be started, every slave queued for
     * it is freed, since nothing else would ever serve it.
     */
    /*src/replication.c/updateSlavesWaitingBgsave*/
    void updateSlavesWaitingBgsave(int bgsaveerr) {
        listNode *ln;
        int startbgsave = 0;
        listIter li;

        /* Visit every slave. */
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
                /* The RDB from the BGSAVE that just ended can't be used by this
                 * slave: queue it for a new BGSAVE. */
                startbgsave = 1;
                slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
                /* This slave was waiting for the BGSAVE that just finished. */
                struct redis_stat buf;

                if (bgsaveerr != REDIS_OK) {
                    redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                    freeClient(slave);
                    continue;
                }

                /* Open the RDB file and get its size. */
                if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                    redis_fstat(slave->repldbfd,&buf) == -1) {
                    /* Capture errno first: close() and freeClient() below may
                     * issue system calls that overwrite it. */
                    int saved_errno = errno;

                    /* open() succeeded but fstat() failed: release the fd here.
                     * NOTE(review): it is not visible here whether freeClient()
                     * closes repldbfd for a slave in this state; resetting it to
                     * -1 prevents a leak without risking a double close. */
                    if (slave->repldbfd != -1) {
                        close(slave->repldbfd);
                        slave->repldbfd = -1;
                    }
                    redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(saved_errno));
                    freeClient(slave);
                    continue;
                }

                /* Start the bulk transfer from the beginning of the file. */
                slave->repldboff = 0;
                slave->repldbsize = buf.st_size;
                slave->replstate = REDIS_REPL_SEND_BULK;

                /* "$<size>\r\n" header sent before the RDB payload. */
                slave->replpreamble = sdscatprintf(sdsempty(),"$%llu\r\n",
                    (unsigned long long) slave->repldbsize);

                /* Replace any previous writable handler with sendBulkToSlave,
                 * which sends the RDB file to the slave. */
                aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
                if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                    freeClient(slave);
                    continue;
                }
            }
        }

        /* Some slaves need a fresh BGSAVE. */
        if (startbgsave) {
            /* Since we are starting a new background save for one or more slaves,
             * we flush the Replication Script Cache to use EVAL to propagate every
             * new EVALSHA for the first time, since all the new slaves don't know
             * about previous scripts. */
            replicationScriptCacheFlush();
            if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
                listRewind(server.slaves,&li);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
                while((ln = listNext(&li))) {
                    redisClient *slave = ln->value;

                    /* The loop above moved every slave queued for this BGSAVE
                     * from WAIT_BGSAVE_START to WAIT_BGSAVE_END.
                     * Every slave that was already in WAIT_BGSAVE_END has left
                     * that state (now SEND_BULK, or freed).
                     * So the slaves still in WAIT_BGSAVE_END are exactly those
                     * the failed BGSAVE would have served. Testing
                     * WAIT_BGSAVE_START here would match nobody. */
                    if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END)
                        freeClient(slave);
                }
            }
        }
    }
    
    

    IV、部分重同步

    之前已经说明,部分重同步功能是通过复制偏移量、复制积压缓冲区等结构实现的。接下来我们看主从服务器之间是如何交互以完成部分重同步的:

    1、从机在connectWithMaster连接主机时会将syncWithMaster设置为回调函数:

    // Registered as the event callback when the slave connects to its master
    // in connectWithMaster(). (Excerpt: "......" marks code elided by the article.)
    /*src/replication.c/syncWithMaster*/
    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
        char tmpfile[256], *err;
        int dfd, maxtries = 5;
        int sockerr = 0, psync_result;
        socklen_t errlen = sizeof(sockerr);
        ......
        // Try a partial resync first. If the master allows it, it replies
        // "+CONTINUE"; the slave then registers the corresponding event handlers.
        /* Try a partial resynchronization. If we don't have a cached master
         * slaveTryPartialResynchronization() will at least try to use PSYNC
         * to start a full resynchronization so that we get the master run id
         * and the global offset, to try a partial resync at the next
         * reconnection attempt. */
        // slaveTryPartialResynchronization() returns one of three results:
        //   PSYNC_CONTINUE:      a partial resync will be performed. The read
        //                        handler readQueryFromClient() has already been
        //                        installed inside slaveTryPartialResynchronization().
        //   PSYNC_FULLRESYNC:    a full resync is needed; the RDB file will be
        //                        downloaded.
        //   PSYNC_NOT_SUPPORTED: NOTE(review): presumably means the master does
        //                        not understand PSYNC, so plain SYNC is used
        //                        instead. Confirm against the elided code.
        psync_result = slaveTryPartialResynchronization(fd);
        if (psync_result == PSYNC_CONTINUE) {
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
            "Partial Resynchronization.");
            return;
        }
        // Otherwise perform a full resynchronization (elided).
        ......
    }
    

    2、其中的slaveTryPartialResynchronization函数用于判断是进行部分重同步还是全同步,这在上面已经说明。

    下面看syncCommand中关于部分重同步的部分:

    /*src/replication.c/syncCommand*/
    /* Handles SYNC / PSYNC on the master.
     * (Excerpt: "......" marks code elided by the article.) */
    void syncCommand(redisClient *c) {
        ......
        // Try a partial resync first. If it is allowed, the master replies
        // "+CONTINUE" and then sends the contents of the replication backlog.
        /* Try a partial resynchronization if this is a PSYNC command.
         * If it fails, we continue with usual full resynchronization, however
         * when this happens masterTryPartialResynchronization() already
         * replied with:
         *
         * +FULLRESYNC <runid> <offset>
         *
         * So the slave knows the new runid and offset to try a PSYNC later
         * if the connection with the master is lost. */
        if (!strcasecmp(c->argv[0]->ptr,"psync")) {
            // PSYNC: attempt a partial resynchronization.
            if (masterTryPartialResynchronization(c) == REDIS_OK) {
                server.stat_sync_partial_ok++;
                return; /* No full resync needed, return. */
            } else {
                // Partial resync refused; a full resync follows. argv[1] is the
                // master run id sent by the slave.
                char *master_runid = c->argv[1]->ptr;
                /* Increment stats for failed PSYNCs, but only if the
                 * runid is not "?", as this is used by slaves to force a full
                 * resync on purpose when they are not able to partially
                 * resync. */
                if (master_runid[0] != '?') server.stat_sync_partial_err++;
            }
        } else {
            /* If a slave uses SYNC, we are dealing with an old implementation
             * of the replication protocol (like redis-cli --slave). Flag the client
             * so that we don't expect to receive REPLCONF ACK feedbacks. */
            c->flags |= REDIS_PRE_PSYNC_SLAVE;
        }
        // Full resynchronization (elided):
        ......
    }
    

    3、主机收到部分重同步请求之后也会判断是否允许:

    /* This function handles the PSYNC command from the point of view of a
     * master receiving a request for partial resynchronization.
     *
     * c->argv[1] is the master run id the slave remembers ("?" forces a full
     * resync). c->argv[2] is the replication offset the slave asks to
     * continue from.
     *
     * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
     * with the usual full resync.
     * (Excerpt: "......" marks code elided by the article.) */
    /*src/replication.c/masterTryPartialResynchronization*/
    int masterTryPartialResynchronization(redisClient *c) {
        long long psync_offset, psync_len;
        char *master_runid = c->argv[1]->ptr;
        char buf[128];
        int buflen;

        /* Is the runid of this master the same advertised by the wannabe slave
         * via PSYNC? If runid changed this master is a different instance and
         * there is no way to continue. */
        if (strcasecmp(master_runid, server.runid)) {
            /* When a slave is disconnected from its master it keeps the
             * master's state cached so it can try a partial resync later.
             * master_runid is the run id of that cached master and
             * server.runid is ours. A mismatch means we are not the master the
             * slave remembers, so only a full resync is possible. */
            /* Run id "?" is used by slaves that want to force a full resync. */
            if (master_runid[0] != '?') {
                redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                    "Runid mismatch (Client asked for '%s', I'm '%s')",
                    master_runid, server.runid);
            } else {
                redisLog(REDIS_NOTICE,"Full resync requested by slave.");
            }
            goto need_full_resync;
        }

        /* We still have the data our slave is asking for?
         * First parse the offset requested by the slave. */
        if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
            REDIS_OK) goto need_full_resync;

        /* A partial resync is impossible when:
         * 1) there is no backlog at all;
         * 2) psync_offset precedes the first byte still held in the backlog:
         *    the backlog has a limited size, so a slave that missed too many
         *    updates can't find them there anymore;
         * 3) psync_offset lies beyond the end of the backlog data.
         * In all these cases fall back to a full resync. */
        if (!server.repl_backlog ||
            psync_offset < server.repl_backlog_off ||
            psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
        {
            redisLog(REDIS_NOTICE,
                "Unable to partial resync with the slave for lack of backlog "
                "(Slave request was: %lld).", psync_offset);
            if (psync_offset > server.master_repl_offset) {
                redisLog(REDIS_WARNING,
                    "Warning: slave tried to PSYNC with an offset that is "
                    "greater than the master replication offset.");
            }
            goto need_full_resync;
        }

        /* If we reached this point, we are able to perform a partial resync:
         * 1) Set client state to make it a slave.
         * 2) Inform the client we can continue with +CONTINUE
         * 3) Send the backlog data (from the offset to the end) to the slave. */
        c->flags |= REDIS_SLAVE;
        /* REDIS_REPL_ONLINE: no RDB transfer is needed; from now on only
         * updates are sent. */
        c->replstate = REDIS_REPL_ONLINE;
        c->repl_ack_time = server.unixtime;
        listAddNodeTail(server.slaves,c);

        /* Tell the slave it may continue; on "+CONTINUE" the slave prepares
         * itself (registers its handlers).
         * We can't use the connection buffers since they are used to accumulate
         * new commands at this stage. But we are sure the socket send buffer is
         * empty so this write will never fail actually. */
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
        if (write(c->fd,buf,buflen) != buflen) {
            /* The client is being freed: returning REDIS_OK keeps the caller
             * from starting a full resync for it. */
            freeClientAsync(c);
            return REDIS_OK;
        }

        /* Send the backlog contents starting at the requested offset. */
        psync_len = addReplyReplicationBacklog(c,psync_offset);
        redisLog(REDIS_NOTICE,
            "Partial resynchronization request accepted. Sending %lld bytes of "
            "backlog starting from offset %lld.", psync_len, psync_offset);
        /* Note that we don't need to set the selected DB at server.slaveseldb
         * to -1 to force the master to emit SELECT, since the slave already
         * has this state from the previous connection with the master. */
        refreshGoodSlavesCount();
        return REDIS_OK; /* The caller can return, no full resync needed. */

    need_full_resync:
        /* Reply "+FULLRESYNC <runid> <offset>" to the slave and return
         * REDIS_ERR so the caller performs a full resync (elided). */
        ......
    }
    

    【参考】
    [1] 《Redis设计与实现》
    [2] 《Redis源码日志》

    相关文章

      网友评论

          本文标题:Redis源码研究之主从复制

          本文链接:https://www.haomeiwen.com/subject/zlchrftx.html