redis 主从同步master端处理
redis 主从同步的过程始于一系列类似tcp三次握手的过程,归于"sync/psync"命令。分析redis主从同步master端的处理逻辑需要从syncCommand的函数开始进行分析。
redis 主从同步过程中master的执行内容包括:
- 接收slave的sync/psync命令
- 执行bgsave命令异步启动rdb生成
- crontab定时检查rdb是否生成完毕
- 发送rdb文件到slave
- 发送rdb文件生成过程中缓存的redis执行命令
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
{"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
redis syncCommand处理逻辑
整个处理逻辑主要分为处理部分同步命令psync和全量同步命令sync,整个交互过程如下
- 先尝试部分同步psync操作,成功则直接同步数据到slave
- 部分同步psync操作失败,尝试全量同步sync操作
- 全量同步操作区分是否已有在执行中的bgsave命令,有则共享没有则重新开启线程异步执行
- 定时任务负责检查异步任务是否完成,完成则发送rdb数据到slave
- 发送rdb生成过程中缓存的redis执行命令到slave
需要针对触发bgsave命令的部分作下详细说明,因为这个是核心的关键点:
- 首先判断master是否正在执行bgsave命令,通过是否有启动bgsave的线程(server.rdb_child_pid)进行判断。
- 如果正在执行bgsave命令,那么我们就等待前一个bgsave生成的rdb文件
- 需要重点指出复用的过程中需要把rdb过程累计的redis命令也复制一份通过copyClientOutputBuffer。因为rdb复用了,所以这些累计的命令也需要复用。
- 如果没有执行bgsave命令,那么就需要启动bgsave任务
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
// 已经是 SLAVE ,或者处于 MONITOR 模式,返回
if (c->flags & REDIS_SLAVE) return;
// 如果这是一个从服务器,但与主服务器的连接仍未就绪,那么拒绝 SYNC
if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
addReplyError(c,"Can't SYNC while not connected with my master");
return;
}
// 在客户端仍有输出数据等待输出,不能 SYNC
if (listLength(c->reply) != 0 || c->bufpos != 0) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}
redisLog(REDIS_NOTICE,"Slave asks for synchronization");
/*
* 如果这是一个 PSYNC 命令,那么尝试 partial resynchronization 。
* 如果失败,那么使用 full resynchronization ,
* 在这种情况下, masterTryPartialResynchronization() 返回以下内容:
* +FULLRESYNC <runid> <offset>
* 这样的话,之后如果主服务器断开,那么从服务器就可以尝试 PSYNC 了。
*/
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 尝试进行 PSYNC
if (masterTryPartialResynchronization(c) == REDIS_OK) {
// 可执行 PSYNC
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
// 不可执行 PSYNC
char *master_runid = c->argv[1]->ptr;
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
} else {
// 旧版实现,设置标识,避免接收 REPLCONF ACK
c->flags |= REDIS_PRE_PSYNC;
}
// 以下是完整重同步的情况。。。
// 执行 full resynchronization ,增加计数
server.stat_sync_full++;
// 检查是否有 BGSAVE 在执行
if (server.rdb_child_pid != -1) {
redisClient *slave;
listNode *ln;
listIter li;
// 如果有至少一个 slave 在等待这个 BGSAVE 完成
// 那么说明正在进行的 BGSAVE 所产生的 RDB 也可以为其他 slave 所用
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
}
if (ln) {
// 幸运的情况,可以使用目前 BGSAVE 所生成的 RDB
copyClientOutputBuffer(c,slave);
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
// 不好运的情况,必须等待下个 BGSAVE
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
}
} else {
// 没有 BGSAVE 在进行,开始一个新的 BGSAVE
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
}
// 设置状态
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
/* Flush the script cache for the new slave. */
// 因为新 slave 进入,刷新复制脚本缓存
replicationScriptCacheFlush();
}
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= REDIS_SLAVE;
server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
// 添加到 slave 列表中
listAddNodeTail(server.slaves,c);
// 如果是第一个 slave ,那么初始化 backlog
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
createReplicationBacklog();
return;
}
redis 部分同步处理逻辑
判断能够通过部分数据同步实现的逻辑很简单,主要从两个维度进行判断:
- 判断slave发送过来的master_runid是否等于master的runid
- 判断master是否存在backlog缓存部分同步命令并且偏移量符合要求
如果不满足上述两个条件那么就需要进行全量同步,否则进行部分同步
// 尝试进行部分 resync ,成功返回 REDIS_OK ,失败返回 REDIS_ERR 。
int masterTryPartialResynchronization(redisClient *c) {
long long psync_offset, psync_len;
char *master_runid = c->argv[1]->ptr;
char buf[128];
int buflen;
// 检查 master id 是否和 runid 一致,只有一致的情况下才有 PSYNC 的可能
if (strcasecmp(master_runid, server.runid)) {
// 从服务器提供的 run id 和服务器的 run id 不一致
if (master_runid[0] != '?') {
redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
master_runid, server.runid);
// 从服务器提供的 run id 为 '?' ,表示强制 FULL RESYNC
} else {
redisLog(REDIS_NOTICE,"Full resync requested by slave.");
}
// 需要 full resync
goto need_full_resync;
}
// 取出 psync_offset 参数
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
REDIS_OK) goto need_full_resync;
// 如果没有 backlog
if (!server.repl_backlog ||
// 或者 psync_offset 小于 server.repl_backlog_off
// (想要恢复的那部分数据已经被覆盖)
psync_offset < server.repl_backlog_off ||
// psync offset 大于 backlog 所保存的数据的偏移量
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
// 执行 FULL RESYNC
redisLog(REDIS_NOTICE,
"Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
if (psync_offset > server.master_repl_offset) {
redisLog(REDIS_WARNING,
"Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
}
goto need_full_resync;
}
/*
* 程序运行到这里,说明可以执行 partial resync
*
* 1) Set client state to make it a slave.
* 将客户端状态设为 salve
*
* 2) Inform the client we can continue with +CONTINUE
* 向 slave 发送 +CONTINUE ,表示 partial resync 的请求被接受
*
* 3) Send the backlog data (from the offset to the end) to the slave.
* 发送 backlog 中,客户端所需要的数据
*/
c->flags |= REDIS_SLAVE;
c->replstate = REDIS_REPL_ONLINE;
c->repl_ack_time = server.unixtime;
listAddNodeTail(server.slaves,c);
// 向从服务器发送一个同步 +CONTINUE ,表示 PSYNC 可以执行
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
// 发送 backlog 中的内容(也即是从服务器缺失的那些内容)到从服务器
psync_len = addReplyReplicationBacklog(c,psync_offset);
// 刷新低延迟从服务器的数量
refreshGoodSlavesCount();
return REDIS_OK;
need_full_resync:
// 刷新 psync_offset
psync_offset = server.master_repl_offset;
// 刷新 psync_offset
if (server.repl_backlog == NULL) psync_offset++;
// 发送 +FULLRESYNC ,表示需要完整重同步
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.runid,psync_offset);
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
return REDIS_ERR;
}
redis 全量同步的rdb生成过程
众所周知rdb文件生成是内部fork新的线程去执行rdb生成过程的,通过rdbSaveBackground的函数可以看出来内部通过fork()去实现rdb文件的生成过程。
在fork的线程当中执行rdbSave实现rdb文件的生成过程。
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
// 如果 BGSAVE 已经在执行,那么出错
if (server.rdb_child_pid != -1) return REDIS_ERR;
// 记录 BGSAVE 执行前的数据库被修改次数
server.dirty_before_bgsave = server.dirty;
// 最近一次尝试执行 BGSAVE 的时间
server.lastbgsave_try = time(NULL);
// fork() 开始前的时间,记录 fork() 返回耗时用
start = ustime();
if ((childpid = fork()) == 0) {
int retval;
/* Child */
// 关闭网络连接 fd
closeListeningSockets(0);
// 设置进程的标题,方便识别
redisSetProcTitle("redis-rdb-bgsave");
// 执行保存操作
retval = rdbSave(filename);
// 打印 copy-on-write 时使用的内存数
if (retval == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
redisLog(REDIS_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
}
// 向父进程发送信号
exitFromChild((retval == REDIS_OK) ? 0 : 1);
} else {
/* Parent */
// 计算 fork() 执行的时间
server.stat_fork_time = ustime()-start;
// 如果 fork() 出错,那么报告错误
if (childpid == -1) {
server.lastbgsave_status = REDIS_ERR;
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
// 打印 BGSAVE 开始的日志
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
// 记录数据库开始 BGSAVE 的时间
server.rdb_save_time_start = time(NULL);
// 记录负责执行 BGSAVE 的子进程 ID
server.rdb_child_pid = childpid;
// 关闭自动 rehash
updateDictResizePolicy();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
rdb文件的生成过程其实挺简单的,大概流程如下:
- 创建临时的rdb文件保存数据,fork的子线程拷贝了夫线程的所有db数据
- 写入各类前置数据类似版本号之类
- 遍历所有db写入内存数据
- 写入各类后者数据类似校验码等
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success
*
* 将数据库保存到磁盘上。
*
* 保存成功返回 REDIS_OK ,出错/失败返回 REDIS_ERR 。
*/
int rdbSave(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
char tmpfile[256];
char magic[10];
int j;
long long now = mstime();
FILE *fp;
rio rdb;
uint64_t cksum;
// 创建临时文件
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
strerror(errno));
return REDIS_ERR;
}
// 初始化 I/O
rioInitWithFile(&rdb,fp);
// 设置校验和函数
if (server.rdb_checksum)
rdb.update_cksum = rioGenericUpdateChecksum;
// 写入 RDB 版本号
snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
// 遍历所有数据库
for (j = 0; j < server.dbnum; j++) {
// 指向数据库
redisDb *db = server.db+j;
// 指向数据库键空间
dict *d = db->dict;
// 跳过空数据库
if (dictSize(d) == 0) continue;
// 创建键空间迭代器
di = dictGetSafeIterator(d);
if (!di) {
fclose(fp);
return REDIS_ERR;
}
/* Write the SELECT DB opcode
*
* 写入 DB 选择器
*/
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(&rdb,j) == -1) goto werr;
/* Iterate this DB writing every entry
*
* 遍历数据库,并写入每个键值对的数据
*/
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
// 根据 keystr ,在栈中创建一个 key 对象
initStaticStringObject(key,keystr);
// 获取键的过期时间
expire = getExpire(db,&key);
// 保存键值对数据
if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
}
dictReleaseIterator(di);
}
di = NULL; /* So that we don't release it again on error. */
/* EOF opcode
*
* 写入 EOF 代码
*/
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case.
*
* CRC64 校验和。
*
* 如果校验和功能已关闭,那么 rdb.cksum 将为 0 ,
* 在这种情况下, RDB 载入时会跳过校验和检查。
*/
cksum = rdb.cksum;
memrev64ifbe(&cksum);
rioWrite(&rdb,&cksum,8);
/* Make sure data will not remain on the OS's output buffers */
// 冲洗缓存,确保数据已写入磁盘
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok.
*
* 使用 RENAME ,原子性地对临时文件进行改名,覆盖原来的 RDB 文件。
*/
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
// 写入完成,打印日志
redisLog(REDIS_NOTICE,"DB saved on disk");
// 清零数据库脏状态
server.dirty = 0;
// 记录最后一次完成 SAVE 的时间
server.lastsave = time(NULL);
// 记录最后一次执行 SAVE 的状态
server.lastbgsave_status = REDIS_OK;
return REDIS_OK;
werr:
// 关闭文件
fclose(fp);
// 删除文件
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
redis 检查rdb是否完成
通过检查server.rdb_child_pid或者server.aof_child_pid确认是否执行rdb文件生成或者aof文件写入。
- rdb生成完成执行backgroundSaveDoneHandler函数
- aof生成完成执行backgroundRewriteDoneHandler函数
暂时我们只关心rdb文件生成也就是跟进backgroundSaveDoneHandler过程
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// 检查 BGSAVE 或者 BGREWRITEAOF 是否已经执行完毕
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
int statloc;
pid_t pid;
// 接收子进程发来的信号,非阻塞
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
// BGSAVE 执行完毕
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
// BGREWRITEAOF 执行完毕
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
} else {
redisLog(REDIS_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
updateDictResizePolicy();
}
} else {
// 既然没有 BGSAVE 或者 BGREWRITEAOF 在执行,那么检查是否需要执行它们
// 遍历所有保存条件,看是否需要执行 BGSAVE 命令
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
// 检查是否有某个保存条件已经满足了
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
REDIS_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == REDIS_OK))
{
redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
// 执行 BGSAVE
rdbSaveBackground(server.rdb_filename);
break;
}
}
/* Trigger an AOF rewrite if needed */
// 出发 BGREWRITEAOF
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
// AOF 文件的当前大小大于执行 BGREWRITEAOF 所需的最小大小
server.aof_current_size > server.aof_rewrite_min_size)
{
// 上一次完成 AOF 写入之后,AOF 文件的大小
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
// AOF 文件当前的体积相对于 base 的体积的百分比
long long growth = (server.aof_current_size*100/base) - 100;
// 如果增长体积的百分比超过了 growth ,那么执行 BGREWRITEAOF
if (growth >= server.aof_rewrite_perc) {
redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
// 执行 BGREWRITEAOF
rewriteAppendOnlyFileBackground();
}
}
}
}
bgsave完成后我们执行updateSlavesWaitingBgsave来实现rdb数据的同步。
/*
* 处理 BGSAVE 完成时发送的信号
*/
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
// BGSAVE 成功
if (!bysignal && exitcode == 0) {
redisLog(REDIS_NOTICE,
"Background saving terminated with success");
server.dirty = server.dirty - server.dirty_before_bgsave;
server.lastsave = time(NULL);
server.lastbgsave_status = REDIS_OK;
// BGSAVE 出错
} else if (!bysignal && exitcode != 0) {
redisLog(REDIS_WARNING, "Background saving error");
server.lastbgsave_status = REDIS_ERR;
// BGSAVE 被中断
} else {
redisLog(REDIS_WARNING,
"Background saving terminated by signal %d", bysignal);
// 移除临时文件
rdbRemoveTempFile(server.rdb_child_pid);
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* tirggering an error conditon. */
if (bysignal != SIGUSR1)
server.lastbgsave_status = REDIS_ERR;
}
// 更新服务器状态
server.rdb_child_pid = -1;
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
server.rdb_save_time_start = -1;
/* Possibly there are slaves waiting for a BGSAVE in order to be served
* (the first stage of SYNC is a bulk transfer of dump.rdb) */
// 处理正在等待 BGSAVE 完成的那些 slave
updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}
在updateSlavesWaitingBgsave过程中我们把和slave连接的socket注册写事件到eventLoop当中且回调函数为sendBulkToSlave,通过该回调函数实现rdb文件的传输。
可以看出来整个同步过程中我们会同步master到所有的slave节点,注意是所有的slave节点。
/*
* 在每次 BGSAVE 执行完毕之后使用
* bgsaveerr 可能是 REDIS_OK 或者 REDIS_ERR ,显示 BGSAVE 的执行结果
* 这个函数是在 BGSAVE 完成之后的异步回调函数,
* 它指导该怎么执行和 slave 相关的 RDB 下一步工作。
*/
void updateSlavesWaitingBgsave(int bgsaveerr) {
listNode *ln;
int startbgsave = 0;
listIter li;
// 遍历所有 slave
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
// 之前的 RDB 文件不能被 slave 使用,
// 开始新的 BGSAVE
startbgsave = 1;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
// 执行到这里,说明有 slave 在等待 BGSAVE 完成
struct redis_stat buf;
// 但是 BGSAVE 执行错误
if (bgsaveerr != REDIS_OK) {
// 释放 slave
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
// 打开 RDB 文件
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
// 设置偏移量,各种值
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
// 更新状态
slave->replstate = REDIS_REPL_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);
// 清空之前的写事件处理器
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
// 将 sendBulkToSlave 安装为 slave 的写事件处理器
// 它用于将 RDB 文件发送给 slave
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
// 需要执行新的 BGSAVE
if (startbgsave) {
// 开始行的 BGSAVE ,并清空脚本缓存
replicationScriptCacheFlush();
if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
listIter li;
listRewind(server.slaves,&li);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
freeClient(slave);
}
}
}
}
sendBulkToSlave内部主要实现两个事情,都是和数据传输有关:
- master传递rdb文件内容给slave
- master将slave的socket的写事件注册到eventLoop当中且回调函数为sendReplyToClient,在sendReplyToClient内部把缓存的redis操作命令同步到slave。
// master 将 RDB 文件发送给 slave 的写事件处理器
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *slave = privdata;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
char buf[REDIS_IOBUF_LEN];
ssize_t nwritten, buflen;
/* Before sending the RDB file, we send the preamble as configured by the
* replication process. Currently the preamble is just the bulk count of
* the file in the form "$<length>\r\n". */
if (slave->replpreamble) {
nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
strerror(errno));
freeClient(slave);
return;
}
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
/* fall through sending data. */
} else {
return;
}
}
/* If the preamble was already transfered, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
// 读取 RDB 数据
buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
if (buflen <= 0) {
redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}
// 写入数据到 slave
if ((nwritten = write(fd,buf,buflen)) == -1) {
if (errno != EAGAIN) {
redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
strerror(errno));
freeClient(slave);
}
return;
}
// 如果写入成功,那么更新写入字节数到 repldboff ,等待下次继续写入
slave->repldboff += nwritten;
// 如果写入已经完成
if (slave->repldboff == slave->repldbsize) {
// 关闭 RDB 文件描述符
close(slave->repldbfd);
slave->repldbfd = -1;
// 删除之前绑定的写事件处理器
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
// 将状态更新为 REDIS_REPL_ONLINE
slave->replstate = REDIS_REPL_ONLINE;
// 更新响应时间
slave->repl_ack_time = server.unixtime;
// 创建向从服务器发送命令的写事件处理器
// 将保存并发送 RDB 期间的回复全部发送给从服务器
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
freeClient(slave);
return;
}
// 刷新低延迟 slave 数量
refreshGoodSlavesCount();
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
}
}
网友评论