redis的复制实现包括了以下几个过程
- 主从关系建立
- 主从网络连接建立
- 发送PING命令
- 认证权限
- 发送端口号
- 发送IP地址
- 发送能力capability
- 发送PSYNC命令
- 发送输出缓冲区数据
- 命令传播
还有部分重实现
- 心跳机制
- 复制积压缓冲区backlog
主从关系建立
执行slaveof
不管使用redis的哪一种复制方式,都是在slave端进行操作,在执行slaveof
命令时调用了以下函数
void replicaofCommand(client *c) {
// 如果当前处于集群模式,则不能进行复制操作
if (server.cluster_enabled) {
addReplyError(c,"REPLICAOF not allowed in cluster mode.");
return;
}
// SLAVEOF NO ONE命令使得这个从节点关闭复制功能,并从从节点转变回主节点,原来同步所得的数据集不会被丢弃。
if (!strcasecmp(c->argv[1]->ptr,"no") &&
!strcasecmp(c->argv[2]->ptr,"one")) {
// 如果保存了主节点IP
if (server.masterhost) {
// 取消复制操作,设置服务器为主服务器
replicationUnsetMaster();
// 获取client的每种信息,并以sds形式返回,并打印到日志中
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
}
} else {
long port;
if (c->flags & CLIENT_SLAVE)
{
addReplyError(c, "Command is not valid when client is a replica.");
return;
}
if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
return;
// 如果已存在从属于masterhost主节点且命令参数指定的主节点和masterhost相等,端口也相等,直接返回
if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
&& server.masterport == port) {
serverLog(LL_NOTICE,"REPLICAOF would result into synchronization with the master we are already connected with. No operation performed.");
addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
return;
}
// 第一次执行设置端口和ip,或者是重新设置端口和IP
// 设置服务器复制操作的主节点IP和端口
replicationSetMaster(c->argv[1]->ptr, port);
// 获取client的每种信息,并以sds形式返回,并打印到日志中
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
server.masterhost, server.masterport, client);
sdsfree(client);
}
// 回复OK
addReply(c,shared.ok);
}
当客户端执行slaveof
命令后,该命令会被构造成redis协议格式,发送到slave节点服务器上,节点服务器会调用replicaofCommand
函数执行这个命令,函数主要执行了几个操作
- 验证当前是否处于集群模式,集群模式下则不执行复制操作
- 执行的是否是
slaveof no one
,是的话则将masterhost置为自己,即断开主从关系 - 调用
replicationSetMaster
设置master机器的IP和端口
replicationSetMaster函数代码:
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;
sdsfree(server.masterhost);
server.masterhost = sdsnew(ip);
server.masterport = port;
if (server.master) {
freeClient(server.master);
}
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
/* Force our slaves to resync with us as well. They may hopefully be able
* to partially resync with us, but we can notify the replid change. */
disconnectSlaves();
cancelReplicationHandshake();
/* Before destroying our master state, create a cached master using
* our own parameters, to later PSYNC with the new master. */
if (was_master) replicationCacheMasterUsingMyself();
server.repl_state = REPL_STATE_CONNECT;
}
可以看到在replicationSetMaster
函数中主要是重置了masterhost,masterport,并清除了之前的连接
因为当前slave节点之前可能是属于另一个master节点的,所以要清理关于之前master节点的所有缓存,关闭旧的连接等,最后设置了服务器的状态
server.repl_state = REPL_STATE_CONNECT // 复制必须连接主节点
主从网络连接建立
slaveof是一个异步的命令,运行后设置好主节点的信息后就会立即返回。之后的复制流程是异步执行的,而异步触发的时机,是来自redis的定时任务。
在redis服务器初始化时,会启动serverCron()
函数,作为时间事件处理函数,在该函数中调用了replicationCron()
函数并设置定时为1秒
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
run_with_period(1000) replicationCron();
在replicationCron()
函数中存在判断,根据不同的复制状态进行相应的操作
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
static long long replication_cron_loops = 0;
/* Non blocking connection timeout? */
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
cancelReplicationHandshake();
}
/* Bulk transfer I/O timeout? */
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
cancelReplicationHandshake();
}
/* Timed out master when we are an already connected slave? */
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
freeClient(server.master);
}
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
}
/* Send ACK to master from time to time.
* Note that we do not send periodic acks to masters that don't
* support PSYNC and replication offsets. */
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();
/* If we have attached slaves, PING them from time to time.
* So slaves can implement an explicit timeout to masters, and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down. */
listIter li;
listNode *ln;
robj *ping_argv[1];
/* First, send PING according to ping_slave_period. */
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
listLength(server.slaves))
{
/* Note that we don't send the PING if the clients are paused during
* a Redis Cluster manual failover: the PING we send will otherwise
* alter the replication offsets of master and slave, and will no longer
* match the one stored into 'mf_master_offset' state. */
int manual_failover_in_progress =
server.cluster_enabled &&
server.cluster->mf_end &&
clientsArePaused();
if (!manual_failover_in_progress) {
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb,
ping_argv, 1);
decrRefCount(ping_argv[0]);
}
}
/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
*
* Also send the a newline to all the chained slaves we have, if we lost
* connection from our master, to keep the slaves aware that their
* master is online. This is needed since sub-slaves only receive proxied
* data from top-level masters, so there is no explicit pinging in order
* to avoid altering the replication offsets. This special out of band
* pings (newlines) can be sent, they will have no effect in the offset.
*
* The newline will be ignored by the slave but will refresh the
* last interaction timer preventing a timeout. In this case we ignore the
* ping period and refresh the connection once per second since certain
* timeouts are set at a few seconds (example: PSYNC response). */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
int is_presync =
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
if (is_presync) {
if (write(slave->fd, "\n", 1) == -1) {
/* Don't worry about socket errors, it's just a ping. */
}
}
}
/* Disconnect timedout slaves. */
if (listLength(server.slaves)) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate != SLAVE_STATE_ONLINE) continue;
if (slave->flags & CLIENT_PRE_PSYNC) continue;
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
{
serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
replicationGetSlaveName(slave));
freeClient(slave);
}
}
}
/* If this is a master without attached slaves and there is a replication
* backlog active, in order to reclaim memory we can free it after some
* (configured) time. Note that this cannot be done for slaves: slaves
* without sub-slaves attached should still accumulate data into the
* backlog, in order to reply to PSYNC queries if they are turned into
* masters after a failover. */
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
server.repl_backlog && server.masterhost == NULL)
{
time_t idle = server.unixtime - server.repl_no_slaves_since;
if (idle > server.repl_backlog_time_limit) {
/* When we free the backlog, we always use a new
* replication ID and clear the ID2. This is needed
* because when there is no backlog, the master_repl_offset
* is not updated, but we would still retain our replication
* ID, leading to the following problem:
*
* 1. We are a master instance.
* 2. Our slave is promoted to master. It's repl-id-2 will
* be the same as our repl-id.
* 3. We, yet as master, receive some updates, that will not
* increment the master_repl_offset.
* 4. Later we are turned into a slave, connect to the new
* master that will accept our PSYNC request by second
* replication ID, but there will be data inconsistency
* because we received writes. */
changeReplicationId();
clearReplicationId2();
freeReplicationBacklog();
serverLog(LL_NOTICE,
"Replication backlog freed after %d seconds "
"without connected replicas.",
(int) server.repl_backlog_time_limit);
}
}
/* If AOF is disabled and we no longer have attached slaves, we can
* free our Replication Script Cache as there is no need to propagate
* EVALSHA at all. */
if (listLength(server.slaves) == 0 &&
server.aof_state == AOF_OFF &&
listLength(server.repl_scriptcache_fifo) != 0)
{
replicationScriptCacheFlush();
}
/* Start a BGSAVE good for replication if we have slaves in
* WAIT_BGSAVE_START state.
*
* In case of diskless replication, we make sure to wait the specified
* number of seconds (according to configuration) so that other slaves
* have the time to arrive before we start streaming. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
time_t idle, max_idle = 0;
int slaves_waiting = 0;
int mincapa = -1;
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
idle = server.unixtime - slave->lastinteraction;
if (idle > max_idle) max_idle = idle;
slaves_waiting++;
mincapa = (mincapa == -1) ? slave->slave_capa :
(mincapa & slave->slave_capa);
}
}
if (slaves_waiting &&
(!server.repl_diskless_sync ||
max_idle > server.repl_diskless_sync_delay))
{
/* Start the BGSAVE. The called function may start a
* BGSAVE with socket target or disk target depending on the
* configuration and slaves capabilities. */
startBgsaveForReplication(mincapa);
}
}
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount();
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
在建立主从连接时,已经把状态设置为了REPL_STATE_CONNECT
,所以在这个函数中会调用connectWithMaster()
函数,去连接master服务器
int connectWithMaster(void) {
int fd;
// 非阻塞连接主节点
fd = anetTcpNonBlockBestEffortBindConnect(NULL,
server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
if (fd == -1) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return C_ERR;
}
// 监听主节点fd的可读和可写事件的发生,并设置其处理程序为syncWithMaster
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
serverLog(LL_WARNING,"Can't create readable event for SYNC");
return C_ERR;
}
// 最近一次读到RDB文件内容的时间
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
server.repl_state = REPL_STATE_CONNECTING;
return C_OK;
}
在connectWithMaster()
函数中,使用非阻塞的方式连接master节点,连接成功后在套接字fd
上绑定AE_READABLE
和AE_WRITABLE
两个事件,监听发来的读写请求,并设置处理函数syncWithMaste()
。最后将复制状态置为连接中
server.repl_state = REPL_STATE_CONNECTING
到这里,从节点和主节点的网络连接就建立完成了
发送PING命令
在建立主从网络连接时,在fd上绑定了读写事件,因为是从节点向主节点发起连接请求,所以会触发fd上的一个写事件,从而调用syncWithMaster()
函数进行处理,这个函数的处理过程太长,这里就不全贴出了,只贴出与本次事件相关的处理逻辑
/* Send a PING to check the master is able to reply without errors. */
if (server.repl_state == REPL_STATE_CONNECTING) {
serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
server.repl_state = REPL_STATE_RECEIVE_PONG;
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
if (err) goto write_error;
return;
}
这个函数中的操作很显而易见了
- 清除套接字上
AE_WRITEABLE
标记,即只保留对读事件的监听,因为这里已经发送了PING,所以接下来就只需要等待主节点返回PONG - 更改复制状态为
REPL_STATE_RECEIVE_PONG
- 发送一个同步的PING命令
对于PING命令的处理函数位于server.c文件中
/* The PING command. It works in a different way if the client is in
* in Pub/Sub mode. */
void pingCommand(client *c) {
/* The command takes zero or one arguments. */
if (c->argc > 2) {
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return;
}
if (c->flags & CLIENT_PUBSUB) {
addReply(c,shared.mbulkhdr[2]);
addReplyBulkCBuffer(c,"pong",4);
if (c->argc == 1)
addReplyBulkCBuffer(c,"",0);
else
addReplyBulk(c,c->argv[1]);
} else {
if (c->argc == 1)
addReply(c,shared.pong);
else
addReplyBulk(c,c->argv[1]);
}
}
上面函数中验证只要不是处于发布订阅模式,参数个数为1,就可以返回一个pong回复
在从节点套接字就会发生一个读事件,在syncWithMaster()
中进行处理
/* Receive the PONG command. */
// 如果复制的状态为REPL_STATE_RECEIVE_PONG,等待接受PONG命令
if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
// 从主节点读一个PONG命令sendSynchronousCommand
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
// 只接受两种有效的回复。一种是 "+PONG",一种是认证错误"-NOAUTH"。
// 旧版本的返回有"-ERR operation not permitted"
if (err[0] != '+' &&
strncmp(err,"-NOAUTH",7) != 0 &&
strncmp(err,"-ERR operation not permitted",28) != 0)
{
serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
sdsfree(err);
goto error;
} else {
serverLog(LL_NOTICE,
"Master replied to PING, replication can continue...");
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_AUTH;
}
从套接字上读到返回的PONG信息,并将复制状态设置为REPL_STATE_SEND_AUTH
,等待进行权限验证
权限认证
还是在syncWithMaster()
函数中,在上面的if逻辑中设置完状态后,自然走到下一个if语句块进行权限验证操作
/* AUTH with the master if required. */
if (server.repl_state == REPL_STATE_SEND_AUTH) {
// 如果服务器设置了认证密码
if (server.masterauth) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
if (err) goto write_error;
// 设置状态为等待接受认证回复
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else {
// 如果没有设置认证密码,直接设置复制状态为发送端口号给主节点
server.repl_state = REPL_STATE_SEND_PORT;
}
}
首先会检查是否有设置认证密码server.masterauth
如果有,则将认证密码一起发送给主节点,如果没有则不需要认证了,直接将状态改为发送端口号给主节点REPL_STATE_SEND_PORT
进行下一项操作
主节点上收到AUTH命令,调用authCommand()
函数处理
/* Receive AUTH reply. */
// 接受AUTH认证的回复
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
// 从主节点读回复
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
// 回复错误,认证失败
if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
// 设置复制状态为发送端口号给主节点
server.repl_state = REPL_STATE_SEND_PORT;
}
主节点会比较发送来的server.masterauth
和主节点上保存的server.requirepass
是否一致,如果一致则返回OK
从节点接收到响应后,调用syncWithMaster()
处理
/* Receive AUTH reply. */
// 接受AUTH认证的回复
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
// 从主节点读回复
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
// 回复错误,认证失败
if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
// 设置复制状态为发送端口号给主节点
server.repl_state = REPL_STATE_SEND_PORT;
}
在接收到响应后如果认证成功了,就将状态改为REPL_STATE_SEND_PORT
,跟没有设置验证时一样
发送端口号
继续执行下一个if语句块
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
// 如果复制状态是,发送从节点端口号给主节点,主节点的INFO命令就能够列出从节点正在监听的端口号
if (server.repl_state == REPL_STATE_SEND_PORT) {
// 获取端口号
sds port = sdsfromlonglong(server.slave_announce_port ?
server.slave_announce_port : server.port);
// 将端口号写给主节点
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF","listening-port",port, NULL);
sdsfree(port);
if (err) goto write_error;
sdsfree(err);
// 设置复制状态为接受端口号
server.repl_state = REPL_STATE_RECEIVE_PORT;
return;
}
在主节点上调用replconfCommand()
函数进行处理,代码太长就不贴了,总之主节点会将slave的端口号保存到client数据结构中c->slave_listening_port = port
,并且回复OK
从节点接收到OK以后,调用syncWithMaster()
,验证是否主节点接收成功,接收成功则继续改变状态
/* Receive REPLCONF listening-port reply. */
if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF listening-port: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_IP;
}
确认主节点接收端口号成功后,将状态改为发送IP
发送IP
这里的操作跟之前类似,不加赘述,只贴出相关代码
/* Set the slave ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT. */
if (server.repl_state == REPL_STATE_SEND_IP) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_IP;
return;
}
/* Receive REPLCONF ip-address reply. */
if (server.repl_state == REPL_STATE_RECEIVE_IP) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF ip-address: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_CAPA;
}
状态REPL_STATE_SEND_CAPA
表示发送从节点的能力,即能否解析RDB文件的EOF流格式
发送CAPA
/* Inform the master of our (slave) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"capa","eof","capa","psync2",NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA;
return;
}
/* Receive CAPA reply. */
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF capa. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PSYNC;
}
这里只要注意主节点是将收到的REPLCONF capa eof命令按位与到主节点上client的c->slave_capa
中
发送PSYNC
从节点发送PSYNC命令给主节点,即向主节点要求同步数据集。同步方式分为
- 全量同步:第一次执行复制时
- 部分同步:复制过程由于网络等原因造成数据丢失的场景下
在建立连接之后执行的是第一次同步,所以是全量同步的场景,同样是在syncWithMaster()
函数中
// 复制状态为发送PSYNC命令。尝试进行部分重同步。
// 如果没有缓冲主节点的结构,slaveTryPartialResynchronization()函数将会至少尝试使用PSYNC去进行一个全同步,这样就能得到主节点的运行runid和全局复制偏移量。并且在下次重连接时可以尝试进行部分重同步。
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
// 向主节点发送一个部分重同步命令PSYNC,参数0表示不读主节点的回复,只获取主节点的运行runid和全局复制偏移量
if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
// 发送PSYNC出错
err = sdsnew("Write error sending the PSYNC command.");
goto write_error;
}
// 设置复制状态为等待接受一个PSYNC回复
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}
函数中通过调用slaveTryPartialResynchronization(fd,0)
函数向master发送同步请求,函数太长,这里仅展示相关逻辑,调用参数上可以看到第二参数是0,
/* Writing half */
if (!read_reply) {
// 将repl_master_initial_offset设置为-1表示主节点的run_id和全局复制偏移量是无效的。
// 如果能使用PSYNC命令执行一个全量同步,会正确设置全复制偏移量,以便这个信息被正确传播主节点的所有从节点中
server.master_initial_offset = -1;
// 主节点的缓存不为空,可以尝试进行部分重同步。PSYNC <master_run_id> <repl_offset>
if (server.cached_master) {
// 保存缓存runid
psync_replid = server.cached_master->replid;
// 获取已经复制的偏移量
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
// 主节点的缓存为空,发送PSYNC ? -1。请求全量同步
} else {
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}
/* Issue the PSYNC command */
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
aeDeleteFileEvent(server.el,fd,AE_READABLE);
return PSYNC_WRITE_ERROR;
}
// 返回等待回复的标识PSYNC_WAIT_REPLY,调用者会将read_reply设置为1,然后再次调用该函数,执行下面的读部分
return PSYNC_WAIT_REPLY;
}
当read_reply传0的时候,函数执行写操作,即向master写入一个PSYNC命令,由于本次是第一次向主节点发送同步请求,所以从节点上缓存的主节点状态server.cached_master
为空,所以会发送一个PSYNC ? -1
,表示进行全量同步
主节点调用syncCommand()
方法处理
void syncCommand(client *c) {
..........//为了简洁,删除一些判断条件的代码
// 尝试执行一个部分同步PSYNC的命令,则masterTryPartialResynchronization()会回复一个 "+FULLRESYNC <runid> <offset>",如果失败则执行全量同步
// 所以,从节点会如果和主节点连接断开,从节点会知道runid和offset,随后会尝试执行PSYNC
// 如果是执行PSYNC命令
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 主节点尝试执行部分重同步,执行成功返回C_OK
if (masterTryPartialResynchronization(c) == C_OK) {
// 可以执行PSYNC命令,则将接受PSYNC命令的个数加1
server.stat_sync_partial_ok++;
// 不需要执行后面的全量同步,直接返回
return; /* No full resync needed, return. */
// 不能执行PSYNC部分重同步,需要进行全量同步
} else {
char *master_runid = c->argv[1]->ptr;
// 从节点以强制全量同步为目的,所以不能执行部分重同步,因此增加PSYNC命令失败的次数
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
// 执行SYNC命令
} else {
// 设置标识,执行SYNC命令,不接受REPLCONF ACK
c->flags |= CLIENT_PRE_PSYNC;
}
// 全量重同步次数加1
server.stat_sync_full++;
// 设置client状态为:从服务器节点等待BGSAVE节点的开始
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
// 执行SYNC命令后是否关闭TCP_NODELAY
if (server.repl_disable_tcp_nodelay)
// 是的话,则启用nagle算法
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
// 保存主服务器传来的RDB文件的fd,设置为-1
c->repldbfd = -1;
// 设置client状态为从节点,标识client是一个从服务器
c->flags |= CLIENT_SLAVE;
// 添加到服务器从节点链表中
listAddNodeTail(server.slaves,c);
/* CASE 1: BGSAVE is in progress, with disk target. */
// 情况1. 正在执行 BGSAVE ,且是同步到磁盘上
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
client *slave;
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
slave = ln->value;
// 如果有从节点已经创建子进程执行写RDB操作,等待完成,那么退出循环
// 从节点的状态为 SLAVE_STATE_WAIT_BGSAVE_END 在情况三中被设置
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}
// 对于这个从节点,我们检查它是否具有触发当前BGSAVE操作的能力
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
// 将slave的输出缓冲区所有内容拷贝给c的所有输出缓冲区中
copyClientOutputBuffer(c,slave);
// 设置全量重同步从节点的状态,设置部分重同步的偏移量
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
}
/* CASE 2: BGSAVE is in progress, with socket target. */
// 情况2. 正在执行BGSAVE,且是无盘同步,直接写到socket中
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
// 虽然有子进程在执行写RDB,但是它直接写到socket中,所以等待下次执行BGSAVE
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
/* CASE 3: There is no BGSAVE is progress. */
// 情况3:没有执行BGSAVE的进程
} else {
// 服务器支持无盘同步
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
// 无盘同步复制的子进程被创建在replicationCron()中,因为想等待更多的从节点可以到来而延迟
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
// 服务器不支持无盘复制
} else {
// 如果没有正在执行BGSAVE,且没有进行写AOF文件,则开始为复制执行BGSAVE,并且是将RDB文件写到磁盘上
if (server.aof_child_pid == -1) {
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but an AOF rewrite is active. BGSAVE for replication delayed");
}
}
}
// 只有一个从节点,且backlog为空,则创建一个新的backlog
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
createReplicationBacklog();
return;
}
函数中根据PSYNC命令,执行了部分复制的函数,但是由于这是第一次复制,所以执行是失败的,开始执行全量复制。将从节点的复制状态设置为SLAVE_STATE_WAIT_BGSAVE_START
后续处理分为三个case
- master服务器正在执行BGSAVE命令,且将RDB文件写到磁盘上
这种情况下如果有已经设置过的全局重同步偏移量的从节点,可以公用输出缓冲区的数据 - master服务器正在执行BGSAVE命令,且将RDB文件写到网络套接字上,无盘同步
这种情况下由于RDB文件直接写到socket中,所以只能等待下一次BGSAVE - master服务器没有在执行BGSAVE
如果也没有在进行AOF持久化操作,那么会为了复制操作开始执行BGSAVE,并将RDB写到磁盘
这里与主动复制相关的是第三个case,调用了startBgsaveForReplication()
函数
// 开始为复制执行BGSAVE,根据配置选择磁盘或套接字作为RDB发送的目标,在开始之前确保冲洗脚本缓存
// mincapa参数是SLAVE_CAPA_*按位与的结果
int startBgsaveForReplication(int mincapa) {
int retval;
// 是否直接写到socket
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;
if (socket_target)
// 直接写到socket中
// fork一个子进程将rdb写到 状态为等待BGSAVE开始 的从节点的socket中
retval = rdbSaveToSlavesSockets();
else
// 否则后台进行RDB持久化BGSAVE操作,保存到磁盘上
retval = rdbSaveBackground(server.rdb_filename);
......
// 如果是直接写到socket中,rdbSaveToSlavesSockets()已经会设置从节点为全量复制
// 否则直接写到磁盘上,执行以下代码
if (!socket_target) {
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
client *slave = ln->value;
// 设置等待全量同步的从节点的状态
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
// 设置要执行全量重同步从节点的状态
replicationSetupSlaveForFullResync(slave,
getPsyncInitialOffset());
}
}
}
}
函数中向将每个从节点的状态更改为SLAVE_STATE_WAIT_BGSAVE_START
,等待BGSAVE的执行,并且发送了FULLRESYNC命令和master节点的runid
,全局复制偏移量server.master_repl_offset
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
char buf[128];
int buflen;
// 设置全量重同步的偏移量
slave->psync_initial_offset = offset;
// 设置从节点复制状态,开始累计差异数据
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
// 将slaveseldb设置为-1,是为了强制发送一个select命令在复制流中
server.slaveseldb = -1;
// 如果从节点的状态是CLIENT_PRE_PSYNC,则表示是Redis是2.8之前的版本,则不将这些信息发送给从节点。
// 因为在2.8之前只支持SYNC的全量复制同步,而在之后的版本提供了部分的重同步
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.runid,offset);
// 否则会将全量复制的信息写给从节点
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
}
}
return C_OK;
}
从节点上再次回调syncWithMaster()
函数,执行
psync_result = slaveTryPartialResynchronization(fd,1);
这一次第二个参数是1,因此会执行函数的读部分
/* Reading half */
// 从主节点读一个命令保存在reply中
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (sdslen(reply) == 0) {
// 主节点为了保持连接的状态,可能会在接收到PSYNC命令后发送一个空行
sdsfree(reply);
// 所以就返回PSYNC_WAIT_REPLY,调用者会将read_reply设置为1,然后再次调用该函数。
return PSYNC_WAIT_REPLY;
}
// 如果读到了一个命令,删除fd的可读事件
aeDeleteFileEvent(server.el,fd,AE_READABLE);
// 接受到的是"+FULLRESYNC",表示进行一次全量同步
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *runid = NULL, *offset = NULL;
// 解析回复中的内容,将runid和复制偏移量提取出来
runid = strchr(reply,' ');
if (runid) {
runid++; //定位到runid的地址
offset = strchr(runid,' ');
if (offset) offset++; //定位offset
}
// 如果runid和offset任意为空,那么发生不期望错误
if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,"Master replied with wrong +FULLRESYNC syntax.");
// 将主节点的运行ID重置为0
memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
// runid和offset获取成功
} else {
// 设置服务器保存的主节点的运行ID
memcpy(server.repl_master_runid, runid, offset-runid-1);
server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
// 主节点的偏移量
server.repl_master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",server.repl_master_runid, server.repl_master_initial_offset);
}
// 执行全量同步,所以缓存的主节点结构没用了,将其清空
replicationDiscardCachedMaster();
sdsfree(reply);
// 返回执行的状态
return PSYNC_FULLRESYNC;
}
// 接受到的是"+CONTINUE",表示进行一次部分重同步
if (!strncmp(reply,"+CONTINUE",9)) {
serverLog(LL_NOTICE,"Successful partial resynchronization with master.");
sdsfree(reply);
// 因为执行部分重同步,因此要使用缓存的主节点结构,所以将其设置为当前的主节点,被同步的主节点
replicationResurrectCachedMaster(fd);
// 返回执行的状态
return PSYNC_CONTINUE;
}
// 接收到了错误,两种情况。
// 1. 主节点不支持PSYNC命令,Redis版本低于2.8
// 2. 从主节点读取了一个不期望的回复
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
serverLog(LL_WARNING,"Unexpected reply to PSYNC from master: %s", reply);
} else {
serverLog(LL_NOTICE,"Master does not support PSYNC or is in error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
// 发送不支持PSYNC命令的状态
return PSYNC_NOT_SUPPORTED;
函数中处理了master节点发来的三种响应
- “+FULLRESYNC”:代表要进行一次全量复制。
- “+CONTINUE”:代表要进行一次部分重同步。
- “-ERR”:发生了错误。有两种可能:Redis版本过低不支持PSYNC命令和从节点读到一个错误回复。
这里看第一种处理,开始继续进行全量同步
// 执行到这里,psync_result == PSYNC_FULLRESYNC或PSYNC_NOT_SUPPORTED
// 准备一个合适临时文件用来写入和保存主节点传来的RDB文件数据
while(maxtries--) {
// 设置文件的名字
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
// 以读写,可执行权限打开临时文件
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
// 打开成功,跳出循环
if (dfd != -1) break;
sleep(1);
}
/* Setup the non blocking download of the bulk file. */
// 监听一个fd的读事件,并设置该事件的处理程序为readSyncBulkPayload
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
// 复制状态为正从主节点接受RDB文件
server.repl_state = REPL_STATE_TRANSFER;
// 初始化RDB文件的大小
server.repl_transfer_size = -1;
// 已读的大小
server.repl_transfer_read = 0;
// 最近一个执行fsync的偏移量为0
server.repl_transfer_last_fsync_off = 0;
// 传输RDB文件的临时fd
server.repl_transfer_fd = dfd;
// 最近一次读到RDB文件内容的时间
server.repl_transfer_lastio = server.unixtime;
// 保存RDB文件的临时文件名
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
函数中打开了一个临时文件用于保存主节点发来的RDB文件数据,监听fd上的读事件,使用readSyncBulkPayload()
函数处理
这个函数处理比较复杂,这里就不贴代码了,大概操作是将主节点发来的EOF流数据读到一个缓冲区,然后将缓冲区数据写到打开的临时文件中,接着将文件载入从节点的数据库中,同步到磁盘上。
最后设置复制的状态为REPL_STATE_TRANSFER
,并初始化复制的相关信息,RDB文件大小,偏移量等
主节点在回复完FULLRESYNC之后就没有再做操作,RDB文件的发送也是依赖时间事件触发的,调用过程如下
serverCron()->backgroundSaveDoneHandler()->backgroundSaveDoneHandlerDisk()->updateSlavesWaitingBgsave()
/* This function is called at the end of every background saving,
* or when the replication RDB transfer strategy is modified from
* disk to socket or the other way around.
*
* The goal of this function is to handle slaves waiting for a successful
* background saving in order to perform non-blocking synchronization, and
* to schedule a new BGSAVE if there are slaves that attached while a
* BGSAVE was in progress, but it was not a good one for replication (no
* other slave was accumulating differences).
*
* The argument bgsaveerr is C_OK if the background saving succeeded
* otherwise C_ERR is passed to the function.
* The 'type' argument is the type of the child that terminated
* (if it had a disk or socket target). */
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
listNode *ln;
int startbgsave = 0;
int mincapa = -1;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
startbgsave = 1;
mincapa = (mincapa == -1) ? slave->slave_capa :
(mincapa & slave->slave_capa);
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
struct redis_stat buf;
/* If this was an RDB on disk save, we have to prepare to send
* the RDB from disk to the slave socket. Otherwise if this was
* already an RDB -> Slaves socket transfer, used in the case of
* diskless replication, our work is trivial, we can just put
* the slave online. */
if (type == RDB_CHILD_TYPE_SOCKET) {
serverLog(LL_NOTICE,
"Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
replicationGetSlaveName(slave));
/* Note: we wait for a REPLCONF ACK message from slave in
* order to really put it online (install the write handler
* so that the accumulated data can be transferred). However
* we change the replication state ASAP, since our slave
* is technically online now. */
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 1;
slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
} else {
if (bgsaveerr != C_OK) {
freeClient(slave);
serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
}
if (startbgsave) startBgsaveForReplication(mincapa);
}
函数中只读的打开临时RDB文件,设置从节点client的复制状态为SLAVE_STATE_SEND_BULK
创建写事件,并设置sendBulkToSlave()
为事件处理程序
当主节点周期执行时,会先清除之前监听的写事件,然后立即绑定新的写事件,触发处理函数将RDB文件写到fd上,从节点调用readSyncBulkPayload()
函数来处理接收到的RDB文件
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
client *slave = privdata;
UNUSED(el);
UNUSED(mask);
char buf[PROTO_IOBUF_LEN];
ssize_t nwritten, buflen;
/* Before sending the RDB file, we send the preamble as configured by the
* replication process. Currently the preamble is just the bulk count of
* the file in the form "$<length>\r\n". */
if (slave->replpreamble) {
nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s",
strerror(errno));
freeClient(slave);
return;
}
server.stat_net_output_bytes += nwritten;
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
/* fall through sending data. */
} else {
return;
}
}
/* If the preamble was already transferred, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
if (buflen <= 0) {
serverLog(LL_WARNING,"Read error sending DB to replica: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}
if ((nwritten = write(fd,buf,buflen)) == -1) {
if (errno != EAGAIN) {
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
strerror(errno));
freeClient(slave);
}
return;
}
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
putSlaveOnline(slave);
}
}
这个函数将RDB文件的大小写给从节点,从RDB文件的repldbfd
中读出文件数据,写到fd上,写入完成后,清除可写事件标记,等待下一次发送缓冲区数据时再监听触发,并调用putSlaveOnline()
函数将从节点的复制状态置为SLAVE_STATE_ONLINE
,至此RDB文件发送完成,准备发送缓存更新
发送输出缓冲区数据
在发送完RDB文件之后,调用putSlaveOnline()
,会创建一个新事件,监听写事件发生,设置sendReplyToClient()
函数为处理程序,并且会将从节点的client对象当作私有数据传入函数,当作发送缓冲区对象,最后调用writeToClient()
函数将client中的缓冲区数据发送到从节点上,保证主从服务器的数据库状态一致
/* Write data in output buffers to client. Return C_OK if the client
* is still valid after the call, C_ERR if it was freed. */
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
clientReplyBlock *o;
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = listNodeValue(listFirst(c->reply));
objlen = o->used;
if (objlen == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
}
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == objlen) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
}
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world
* scenario think about 'KEYS *' against the loopback interface).
*
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a slave (otherwise, on high-speed traffic, the replication
* buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
server.stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (errno == EAGAIN) {
nwritten = 0;
} else {
serverLog(LL_VERBOSE,
"Error writing to client: %s", strerror(errno));
freeClient(c);
return C_ERR;
}
}
if (totwritten > 0) {
/* For clients representing masters we don't count sending data
* as an interaction, since we always send REPLCONF ACK commands
* that take some time to just fill the socket output buffer.
* We just rely on data / pings received for timeout detection. */
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
}
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClient(c);
return C_ERR;
}
}
return C_OK;
}
命令传播
在将RDB文件发送到从节点后,就开始正式的同步复制了,所有发送到主节点上执行的命令都会发送到从节点上执行。调用的函数是replicationFeedSlaves()
// 将参数列表中的参数发送给从服务器
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];
// 如果没有backlog且没有从节点服务器,直接返回
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
// 如果当前从节点使用的数据库不是目标的数据库,则要生成一个select命令
if (server.slaveseldb != dictid) {
robj *selectcmd;
// 0 <= id < 10 ,可以使用共享的select命令对象
if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
// 否则自行按照协议格式构建select命令对象
} else {
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(OBJ_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
// 将select 命令添加到backlog中
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
// 发送给从服务器
listRewind(slaves,&li);
// 遍历所有的从服务器节点
while((ln = listNext(&li))) {
client *slave = ln->value;
// 从节点服务器状态为等待BGSAVE的开始,因此跳过回复,遍历下一个节点
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
// 添加select命令到当前从节点的回复中
addReply(slave,selectcmd);
}
// 释放临时对象
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
// 设置当前从节点使用的数据库ID
server.slaveseldb = dictid;
// 将命令写到backlog中
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];
// 将参数个数构建成协议标准的字符串
// *<argc>\r\n
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
// 添加到backlog中
feedReplicationBacklog(aux,len+3);
// 遍历所有的参数
for (j = 0; j < argc; j++) {
// 返回参数对象的长度
long objlen = stringObjectLen(argv[j]);
// 构建成协议标准的字符串,并添加到backlog中
// $<len>\r\n<argv>\r\n
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
// 添加$<len>\r\n
feedReplicationBacklog(aux,len+3);
// 添加参数对象<argv>
feedReplicationBacklogWithObject(argv[j]);
// 添加\r\n
feedReplicationBacklog(aux+len+1,2);
}
}
// 将命令写到每一个从节点中
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
client *slave = ln->value;
// 从节点服务器状态为等待BGSAVE的开始,因此跳过回复,遍历下一个节点
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
// 将命令写给正在等待初次SYNC的从节点(所以这些命令在输出缓冲区中排队,直到初始SYNC完成),或已经与主节点同步
/* Add the multi bulk length. */
// 添加回复的长度
addReplyMultiBulkLen(slave,argc);
// 将所有的参数列表添加到从节点的输出缓冲区
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}
注意:
- 这个函数会将执行的命令以协议的传输格式写到从节点的缓冲区中,所以在复制时候需要将从节点的缓冲区也写到从节点服务器上。
- 在给从节点client写命令时,会强制写入select命令指定数据库,保证数据写到正确的数据库中。
- 命令不止写入了缓冲区,同时还会写入主节点服务器的复制积压缓冲区
server.repl_backlog
中,这是为了实现断点续传使用的
部分重同步实现
上述讨论的是全量同步的处理流程,但是在实际使用中可能会出现网络问题,导致连接中断,这时就需要部分重同步的机制来保证数据的可靠性
心跳机制
主从节点建立连接之后,会维护一个长连接,发送心跳命令,主节点默认每隔10秒发送PING命令确认从节点连接状态,配置项repl-ping-salve-period
// 首先,根据当前节点发送PING命令给从节点的频率发送PING命令
// 如果当前节点是某以节点的 主节点 ,那么发送PING给从节点
if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
// 创建PING命令对象
ping_argv[0] = createStringObject("PING",4);
// 将PING发送给从服务器
replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
}
从节点在主线程中默认每隔1秒发送一次REPLCONF ACK <offset>命令,给主节点报告自己当前的复制偏移量
// 定期发送ack给主节点,旧版本的Redis除外
if (server.masterhost && server.master && !(server.master->flags & CLIENT_PRE_PSYNC))
// 发送一个REPLCONF ACK命令给主节点去报告关于当前处理的offset。
replicationSendAck();
在定时任务replicationCron()
中每一次都会检查从节点和主节点的交互是否超时,如果超时则断开连接,等到下一个周期再和主节点重新建立连接,进行复制
复制积压缓冲区(backlog)
复制积压缓冲区是一个默认1M大小的循环队列,在主节点命令传播时,命令也会写到这个缓冲队列当中,但是最大只有1M,如果断开连接的时间较长,积压的命令数据超过1M,则会将旧数据丢失掉,所以在从节点再次连上时,就无法执行部分重同步,必须执行全量同步
在连接时调用的是上面提到过的syncCommand()
函数,其中调用masterTryPartialResynchronization()
函数来尝试执行部分重同步
// 该函数从主节点接收到部分重新同步请求的角度处理PSYNC命令
// 成功返回C_OK,否则返回C_ERR
int masterTryPartialResynchronization(client *c) {
long long psync_offset, psync_len;
char *master_runid = c->argv[1]->ptr; //主节点的运行ID
char buf[128];
int buflen;
// 主节点的运行ID是否和从节点执行PSYNC的参数提供的运行ID相同。
// 如果运行ID发生了改变,则主节点是一个不同的实例,那么就不能进行继续执行原有的复制进程
if (strcasecmp(master_runid, server.runid)) {
/* Run id "?" is used by slaves that want to force a full resync. */
// 如果从节点的运行ID是"?",表示想要强制进行一个全量同步
if (master_runid[0] != '?') {
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
master_runid, server.runid);
} else {
serverLog(LL_NOTICE,"Full resync requested by slave %s",
replicationGetSlaveName(c));
}
goto need_full_resync;
}
// 从参数对象中获取psync_offset
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
C_OK) goto need_full_resync;
// 如果psync_offset小于repl_backlog_off,说明backlog所备份的数据的已经太新了,有一些数据被覆盖,则需要进行全量复制
// 如果psync_offset大于(server.repl_backlog_off + server.repl_backlog_histlen),表示当前backlog的数据不够全,则需要进行全量复制
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
if (psync_offset > server.master_repl_offset) {
serverLog(LL_WARNING,
"Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
}
goto need_full_resync;
}
// 执行到这里,则可以进行部分重同步
// 1. 设置client状态为从节点
// 2. 向从节点发送 +CONTINUE 表示接受 partial resync 被接受
// 3. 发送backlog的数据给从节点
// 设置client状态为从节点
c->flags |= CLIENT_SLAVE;
// 设置复制状态为在线,此时RDB文件传输完成,发送差异数据
c->replstate = SLAVE_STATE_ONLINE;
// 设置从节点收到ack的时间
c->repl_ack_time = server.unixtime;
// slave向master发送ack标志设置为0
c->repl_put_online_on_ack = 0;
// 将当前client加入到从节点链表中
listAddNodeTail(server.slaves,c);
// 向从节点发送 +CONTINUE
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return C_OK;
}
// 将backlog的数据发送从节点
psync_len = addReplyReplicationBacklog(c,psync_offset);
serverLog(LL_NOTICE,
"Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", replicationGetSlaveName(c), psync_len, psync_offset);
// 计算延迟值小于min-slaves-max-lag的从节点的个数
refreshGoodSlavesCount();
return C_OK; /* The caller can return, no full resync needed. */
need_full_resync:
return C_ERR;
}
如果可以进行部分重同步,则主节点会返回"+CONTINUE\r\n"
给从节点,然后调用addReplyReplicationBacklog()
函数, 这个函数就是将backlog中的数据发送到从节点
注意:
在部分重同步过程中全量同步前面的连接过程也全都会再走一遍,只是在从节点发送PSYNC
时,主节点返回的不再是+FULLRESYNC
而是+CONTINUE
网友评论