How Redis does replication

redis-replciation.jpg
- step 1: register the serverCron timer callback
// location: server.c
// function: registers serverCron as a time event on the event loop
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
- step 2: call replicationCron every 1000 milliseconds (once per second)
// location: server.c
// function: serverCron, the periodic (cron-like) job of redis-server
run_with_period(1000) replicationCron();
- step 3: the core replication cron job
// location: replication.c
// function: replicationCron, responsible for driving Redis replication
/* Replication cron function, invoked once per second.
 *
 * When the server is in REPL_STATE_CONNECT it attempts to connect to the
 * master and logs a notice if the connection attempt was started. */
void replicationCron(void) {
    /* Nothing to do unless we are waiting to connect to a MASTER. */
    if (server.repl_state != REPL_STATE_CONNECT) {
        return;
    }

    if (connectWithMaster() == C_OK) {
        serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
    }
}
int connectWithMaster(void) {
int fd;
fd = anetTcpNonBlockBestEffortBindConnect(NULL,
server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
// fd of master socket
aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);
}
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Send a PING to check the master is able to reply without errors. */
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
/* Receive the PONG command. */
sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* AUTH with the master if required. */
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
/* Receive AUTH reply. */
sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"listening-port",port, NULL);
/* Receive REPLCONF listening-port reply. */
sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Set the slave ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT. */
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
/* Receive REPLCONF ip-address reply. */
sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Inform the master of our (slave) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"capa","eof","capa","psync2",NULL);
/* Receive CAPA reply. */
sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Try a partial resynchronization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master run id
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
slaveTryPartialResynchronization(fd,0) {
/* If we reached this point, we are able to attempt a partial resync. */
/* Issue the PSYNC command */
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL)
{
/* The command above is handled on the master node by syncCommand(). */
/* SYNC and PSYNC command implementation. */
// The PSYNC command invokes the syncCommand function
void syncCommand(client *c) {
masterTryPartialResynchronization(c) {
// write master replid to slave
if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
} else {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
}
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
}
/* To continue with a partial resync the master will:
* 1) Set client state to make it a slave.
* 2) Inform the client we can continue with +CONTINUE
* 3) Send the backlog data (from the offset to the end) to the slave. */
c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(server.slaves,c);
startBgsaveForReplication(c->slave_capa) {
// proto
//int startBgsaveForReplication(int mincapa)
int retval;
// socket_target is set when diskless sync is enabled and the slave advertises the EOF capability
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rsiptr) {
if (socket_target)
// mark rio with end flag,and send to slave
rdbSaveToSlavesSockets(rsiptr){
if(fork()==0){
CHILD_INFO_TYPE_RDB
}
}
else
// 1. bgsave current rdb.dump with background
rdbSaveBackground(server.rdb_filename,rsiptr){
if((childpid=fork())==0){
// child process: save all databases to the RDB file on disk before reporting back
rdbSave(rdb_filename);
// write CHILD_INFO_TYPE_RDB to parent process by pipe
sendChildInfo(CHILD_INFO_TYPE_RDB)
}else{
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
}
}
}
copyClientOutputBuffer(c,slave);
if(!socket_target) {
// 2.write replid and offset to slave
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset)
}
}
}
}
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
}
psync_result = slaveTryPartialResynchronization(fd,1);
/* PSYNC failed or is not supported: we want our slaves to resync with us
* as well, if we have any sub-slaves. The master may transfer us an
* entirely different data set and we have no way to incrementally feed
* our slaves after that. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the server.master_replid and master_initial_offset are
* already populated. The SYNC command is written synchronously, bounded by
* server.repl_syncio_timeout. */
syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1);
/* Prepare a suitable temp file for bulk transfer */
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
}
/* Setup the non blocking download of the bulk file. */
aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL);
server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
// current node as slave,open temp-rdb.dump file and get fd for this file
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
}
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen, nwritten;
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
/* Read bulk data */
if (usemark) {
readlen = sizeof(buf);
} else {
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}
// read rdb.dump from master
nread = read(fd,buf,readlen);
if (nread <= 0) {
cancelReplicationHandshake();
}
// write to current temp-rdb.dump file
nwritten = write(server.repl_transfer_fd,buf,nread))
rename(server.repl_transfer_tmpfile,server.rdb_filename) ;
/* We need to stop any AOFRW fork before flushing and parsing
* RDB, otherwise we'll create a copy-on-write disaster. */
if(aof_is_enabled) stopAppendOnly();
//mark client as dirty,that will be remove
signalFlushedDb(-1);
// clean all data for current node
emptyDb(-1,server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,replicationEmptyDbCallback)
{
// proto long long emptyDb(int dbnum, int flags, void(callback)(void*))
int async = (flags & EMPTYDB_ASYNC);
long long removed = 0;
int startdb = 0;
int enddb = server.dbnum-1;
for (int j = startdb; j <= enddb; j++) {
removed += dictSize(server.db[j].dict);
// async means the database is flushed by a background job
if (async) {
// that will create new dict after async,old dict mark to clean
emptyDbAsync(&server.db[j]);
} else {
//blocking for flush all data in database
dictEmpty(server.db[j].dict,callback);
dictEmpty(server.db[j].expires,callback);
}
}
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
//init slave info
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
//load data from rdb_file
rdbLoad(server.rdb_filename,&rsi);
// create the master client for this slave; it holds the master socket and other info,
// including the replid of the master (server.replid)
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
// clean old replid,that included in server.replid2
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (aof_is_enabled) restartAOF();
}
return;
}
* On success the fuction returns the number of keys removed from the
* database(s). Otherwise -1 is returned in the specific case the
* DB number is out of range, and errno is set to EINVAL. */
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
int async = (flags & EMPTYDB_ASYNC);
long long removed = 0;
if (dbnum < -1 || dbnum >= server.dbnum) {
errno = EINVAL;
return -1;
}
int startdb, enddb;
if (dbnum == -1) {
startdb = 0;
enddb = server.dbnum-1;
} else {
startdb = enddb = dbnum;
}
for (int j = startdb; j <= enddb; j++) {
removed += dictSize(server.db[j].dict);
if (async) {
emptyDbAsync(&server.db[j]);
} else {
dictEmpty(server.db[j].dict,callback);
dictEmpty(server.db[j].expires,callback);
}
}
if (server.cluster_enabled) {
if (async) {
slotToKeyFlushAsync();
} else {
slotToKeyFlush();
}
}
if (dbnum == -1) flushSlaveKeysWithExpireList();
return removed;
}
- step 4: wait for the child process (reaped in serverCron)
// location: server.c
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
{
int statloc
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == -1) {
// error output
} else if (pid == server.rdb_child_pid) {
// every bgsave stores the child pid in server.rdb_child_pid
backgroundSaveDoneHandler(exitcode,bysignal)
{
switch(server.rdb_child_type) {
case RDB_CHILD_TYPE_DISK:
backgroundSaveDoneHandlerDisk(exitcode,bysignal)
{
updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_DISK){
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
//
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
if (type == RDB_CHILD_TYPE_SOCKET) {
// update slave info
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 1;
slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
}else{
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
// sendBulkToSlave: send the master's RDB dump to every slave
aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave);
}
}
break;
case RDB_CHILD_TYPE_SOCKET:
backgroundSaveDoneHandlerSocket(exitcode,bysignal)
{
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_SOCKET);
}
break;
}
}
// the parent reads the info sent by the child and checks it
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
// the parent reads the info sent by the child and checks it
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
}
网友评论