美文网首页
how work for redis replication

how work for redis replication

作者: perryn | 来源:发表于2018-09-14 02:07 被阅读54次

How Redis does replication

redis-replciation.jpg
  • step 1: register the callback for the server cron job
  // Location: server.c
  // Purpose: register serverCron as a time event on the server event loop
  // (server.el). If aeCreateTimeEvent() returns AE_ERR the timer could not be
  // installed, so the server panics and exits.
  // NOTE(review): the second argument (1) is presumably the initial delay in
  // milliseconds — confirm against ae.h.
  if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
  • step 2: call replicationCron() once every 1000 milliseconds (i.e. once per second)
// Location: server.c
// Context: this statement lives inside serverCron, the periodic housekeeping
// callback of redis-server.
// It invokes replicationCron() under run_with_period(1000); presumably the
// argument is a period in milliseconds (once per second) — confirm against
// the run_with_period definition in server.h.
run_with_period(1000) replicationCron();
  • step 3:core replication cron job
//location:replication.c
//function: responsible for Redis replication
/* Replication cron function, called 1 time per second.
 *
 * In this excerpt its only job is to kick off the connection to the master:
 * when server.repl_state is REPL_STATE_CONNECT it calls connectWithMaster()
 * and logs a notice if that call returns C_OK. In every other case it does
 * nothing. */
void replicationCron(void) {
    /* Nothing to do unless we are waiting to connect to a MASTER. */
    if (server.repl_state != REPL_STATE_CONNECT) return;

    /* Attempt the connection; stay silent when it does not succeed. */
    if (connectWithMaster() != C_OK) return;

    serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}

int connectWithMaster(void) {
    int fd;
    fd = anetTcpNonBlockBestEffortBindConnect(NULL,
        server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
    // fd of master socket 
    aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);

}

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {

    /* Send a PING to check the master is able to reply without errors. */
    sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
    
    /* Receive the PONG command. */
    sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

    /* AUTH with the master if required. */
    sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
          

    /* Receive AUTH reply. */
    sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
    
    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
     sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "listening-port",port, NULL);
  

    /* Receive REPLCONF listening-port reply. */
    sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

    /* Set the slave ip, so that Master's INFO command can list the
     * slave IP address port correctly in case of port forwarding or NAT. */
     sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "ip-address",server.slave_announce_ip, NULL);

    /* Receive REPLCONF ip-address reply. */
    sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
      
    /* Inform the master of our (slave) capabilities.
     *
     * EOF: supports EOF-style RDB transfer for diskless replication.
     * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
     *
     * The master will ignore capabilities it does not understand. */
     sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "capa","eof","capa","psync2",NULL);
     
    
     
    /* Receive CAPA reply. */
    sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
      

    /* Try a partial resynchonization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    slaveTryPartialResynchronization(fd,0) {
     /* If we reached this point, we are able to perform a partial resync:
     
          /* Issue the PSYNC command */
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL)
        {
         /* The PSYNC command sent above is handled on the master node by syncCommand().
          * SYNC and PSYNC command implementation. */
          
          // PSYNC command will call syncCommand function
          void syncCommand(client *c) {
            masterTryPartialResynchronization(c) {
                // write master replid to slave
                
                if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
                    buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
                 } else {
                    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
                }
                if (write(c->fd,buf,buflen) != buflen) {
                freeClientAsync(c);
            }
          
           * 1) Set client state to make it a slave.
           * 2) Inform the client we can continue with +CONTINUE
          * 3) Send the backlog data (from the offset to the end) to the slave. */
          c->flags |= CLIENT_SLAVE;
          c->replstate = SLAVE_STATE_ONLINE;
          c->repl_ack_time = server.unixtime;
          c->repl_put_online_on_ack = 0;
          listAddNodeTail(server.slaves,c);
          startBgsaveForReplication(c->slave_capa) {
               // proto
               //int startBgsaveForReplication(int mincapa) 
                int retval;
                
                // socket_target is set when diskless sync is enabled and the slave supports the EOF capability
                int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                
                if (rsiptr) {
                  if (socket_target)
                      // mark rio with end flag,and send to slave
                      rdbSaveToSlavesSockets(rsiptr){
                         if(fork()==0){
                              CHILD_INFO_TYPE_RDB
                         }
                      }
                  else
                      // 1. save the current dataset to the RDB file in a background child process
                      rdbSaveBackground(server.rdb_filename,rsiptr){
                         if((childpid=fork())==0){
                            // child process: saves all databases to the RDB file on disk
                            rdbSave(rdb_filename);
                            // write CHILD_INFO_TYPE_RDB to parent process by pipe
                            sendChildInfo(CHILD_INFO_TYPE_RDB)
                         }else{
                            server.rdb_child_pid = childpid;
                            server.rdb_child_type = RDB_CHILD_TYPE_DISK;
                         }
                      }
                 }
                 copyClientOutputBuffer(c,slave);
                 if(!socket_target) {
                     // 2.write replid and offset to slave
                     replicationSetupSlaveForFullResync(c,slave->psync_initial_offset)
                 }
            }
          }
        }
        reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
         /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();

    }

    psync_result = slaveTryPartialResynchronization(fd,1);
    

    /* PSYNC failed or is not supported: we want our slaves to resync with us
     * as well, if we have any sub-slaves. The master may transfer us an
     * entirely different data set and we have no way to incrementally feed
     * our slaves after that. */
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.master_replid and master_initial_offset are
     * already populated.  sync state,waited*/
     syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1);

    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
    }
   

    /* Setup the non blocking download of the bulk file. */
    aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL);

    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    
    // this node (the slave) keeps the fd of the temporary RDB file opened above
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;
}

void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[4096];
    ssize_t nread, readlen, nwritten;

    /* If repl_transfer_size == -1 we still have to read the bulk length
     * from the master reply. */
      syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
   

    /* Read bulk data */
    if (usemark) {
        readlen = sizeof(buf);
    } else {
        left = server.repl_transfer_size - server.repl_transfer_read;
        readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
    }

   // read the next chunk of the RDB payload from the master
    nread = read(fd,buf,readlen);
    if (nread <= 0) {
        cancelReplicationHandshake();
    }
   
       // write to current temp-rdb.dump file
       nwritten = write(server.repl_transfer_fd,buf,nread)) 

  
        rename(server.repl_transfer_tmpfile,server.rdb_filename) ;
     
        /* We need to stop any AOFRW fork before flusing and parsing
         * RDB, otherwise we'll create a copy-on-write disaster. */
        if(aof_is_enabled) stopAppendOnly();
        
        //mark client as dirty,that will be remove
        signalFlushedDb(-1);
        
        // clean all  data for current node
        emptyDb(-1,server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,replicationEmptyDbCallback)
        {
            // proto long long emptyDb(int dbnum, int flags, void(callback)(void*)) 
            int async = (flags & EMPTYDB_ASYNC);
            long long removed = 0;
            int startdb = 0;
            int enddb = server.dbnum-1;

           for (int j = startdb; j <= enddb; j++) {
              removed += dictSize(server.db[j].dict);
              // async means the database is flushed by a background job instead of blocking
              if (async) {
                // that will create new dict after async,old dict mark to clean
                emptyDbAsync(&server.db[j]);
              } else {
                //blocking for flush all data in database
                dictEmpty(server.db[j].dict,callback);
                dictEmpty(server.db[j].expires,callback);
             }
        }
    
      
        /* Before loading the DB into memory we need to delete the readable
         * handler, otherwise it will get called recursively since
         * rdbLoad() will call the event loop to process events from time to
         * time for non blocking loading. */
        aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
        
        //init slave info
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        
        //load data from rdb_file 
        rdbLoad(server.rdb_filename,&rsi);
        
     
       // create the master client on the slave; it holds the master socket and other info,
       //   including the master's replication ID (server.replid)
        replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
        server.repl_state = REPL_STATE_CONNECTED;
       
       // clear the old replication ID stored in server.replid2
        clearReplicationId2();
        
        /* Let's create the replication backlog if needed. Slaves need to
         * accumulate the backlog regardless of the fact they have sub-slaves
         * or not, in order to behave correctly if they are promoted to
         * masters after a failover. */
        if (server.repl_backlog == NULL) createReplicationBacklog();

     
        /* Restart the AOF subsystem now that we finished the sync. This
         * will trigger an AOF rewrite, and when done will start appending
         * to the new file. */
        if (aof_is_enabled) restartAOF();
    }
    return;

}

/* On success the function returns the number of keys removed from the
 * database(s). Otherwise -1 is returned in the specific case the
 * DB number is out of range, and errno is set to EINVAL. */
/* Empty one database or all of them.
 *
 * dbnum    - index of the database to empty, or -1 for every database.
 * flags    - when EMPTYDB_ASYNC is set each database is handed to
 *            emptyDbAsync(); otherwise its dict and expires tables are
 *            emptied with dictEmpty().
 * callback - forwarded to dictEmpty() on the synchronous path only.
 *
 * Returns the sum of dictSize() over the main dicts of the affected
 * databases, sampled right before each one is emptied. If dbnum is outside
 * [-1, server.dbnum) nothing is touched, errno is set to EINVAL and -1 is
 * returned. */
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
    const int lazy = (flags & EMPTYDB_ASYNC);
    const int all_dbs = (dbnum == -1);
    long long total = 0;

    /* Reject indexes that are neither -1 nor an existing database. */
    if (dbnum < -1 || dbnum >= server.dbnum) {
        errno = EINVAL;
        return -1;
    }

    /* A single database is a one-element range; -1 spans all of them. */
    int first = all_dbs ? 0 : dbnum;
    int last = all_dbs ? server.dbnum-1 : dbnum;

    int idx = first;
    while (idx <= last) {
        /* Count the keys before they disappear. */
        total += dictSize(server.db[idx].dict);
        if (!lazy) {
            dictEmpty(server.db[idx].dict,callback);
            dictEmpty(server.db[idx].expires,callback);
        } else {
            emptyDbAsync(&server.db[idx]);
        }
        idx++;
    }

    /* In cluster mode the slot-to-key mapping is flushed as well, using the
     * same sync/async flavour as the databases. */
    if (server.cluster_enabled) {
        if (lazy)
            slotToKeyFlushAsync();
        else
            slotToKeyFlush();
    }

    /* Only a flush of every database resets the slave keys-with-expire list. */
    if (all_dbs) flushSlaveKeysWithExpireList();

    return total;
}

  • step 4: reap the finished child process
//
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) 
{
    int statloc
  if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == -1) {
                // error output
            } else if (pid == server.rdb_child_pid) {
                // each time a BGSAVE starts, the server stores the child pid (server.rdb_child_pid)
                backgroundSaveDoneHandler(exitcode,bysignal)
                {
                     switch(server.rdb_child_type) {
                        case RDB_CHILD_TYPE_DISK:
                            backgroundSaveDoneHandlerDisk(exitcode,bysignal)
                            {
                                updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_DISK){
                                   if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                                      //
                                   } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
                                      if (type == RDB_CHILD_TYPE_SOCKET) {
                                      // update slave info
                                         slave->replstate = SLAVE_STATE_ONLINE;
                                          slave->repl_put_online_on_ack = 1;
                                          slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
                                      }else{
                                           listRewind(server.slaves,&li);
                                           while((ln = listNext(&li))) {
                                              client *slave = ln->value;
                                             // sendBulkToSlave: sends the master's RDB dump to each slave
                                             aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave);
                                      }
                                  }
                            break;
                        case RDB_CHILD_TYPE_SOCKET:
                            backgroundSaveDoneHandlerSocket(exitcode,bysignal)
                            {
                               server.rdb_child_type = RDB_CHILD_TYPE_NONE;
                               updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_SOCKET);
                            }
                            break;
                      }
                  }
                // the parent reads the info sent by the child process and checks it
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
                
                // the parent reads the info sent by the child process and checks it
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else {
                if (!ldbRemoveChild(pid)) {
                    serverLog(LL_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
            }
}

相关文章

网友评论

      本文标题:how work for redis replication

      本文链接:https://www.haomeiwen.com/subject/oldngftx.html