美文网首页
浅析 Redis 主从复制实现原理

浅析 Redis 主从复制实现原理

作者: JBryan | 来源:发表于2022-04-27 22:25 被阅读0次

    本篇主要分三部分讨论Redis主从复制的实现原理:主从复制过程、状态机、源码解析。Redis从节点使用了状态机机制,来实现从节点不同状态的切换,所以在解析源码之前,会先讨论下状态机的基本原理。

    1. 主从复制过程

    Redis 的 RDB 和 AOF 机制保证了服务的可靠性,而为了让服务实现高可用,Redis 使用了主从复制,而主从复制也是MySQL等数据库或其他存储系统实现高可用的方法。

    为了保证数据副本的一致性,主从库之间采用的是读写分离的方式:

    • 读操作:主库、从库都可以接收;
    • 写操作:首先到主库执行,然后,主库将写操作同步给从库。

    1.1. 主从数据同步

    当我们启动多个 Redis 实例的时候,它们相互之间就可以通过 replicaof(Redis 5.0 之前使用 slaveof)命令形成主库和从库的关系。
    例如,现在有实例 1(ip:172.16.19.3)和实例 2(ip:172.16.19.5),我们在实例 2 上执行以下这个命令后,实例 2 就变成了实例 1 的从库,并从实例 1 上复制数据:

    replicaof  172.16.19.3  6379
    

    主从实例会按照三个阶段完成数据同步:建立连接、全量复制、增量复制。下面对这三个阶段进行介绍:


    1.png
    1. 建立连接:第一阶段是主从库间建立连接、协商同步的过程,主要是为全量复制做准备。在这一步,从库和主库建立起连接,并告诉主库即将进行同步,主库确认回复后,主从库间就可以开始同步了。这一步主要包含从库给主库发送 psync 命令,以及主库响应 FULLRESYNC 命令。

      • 从库发送 psync 命令:携带主库runID 和 复制进度 offset 两个参数。因为从库第一次连接主库,并不知道主库的 runID,因此这时候 runID = ?;而 offset 值为 -1,表示这是第一次复制。

      • 主库响应 FULLRESYNC 命令:主库 runID 和 目前的复制进度 offset。FULLRESYNC 响应表示接下来将会进行全量复制。

    2. 全量复制:全量复制也会进行两个操作,主实例把所有的数据传输给从库,从库接收主库的所有数据,并在本地完成数据加载。具体来说就是:

      • 主库执行 bgsave 命令,生成 RDB 文件,接着将文件发给从库。主实例在执行 bgsave 命令时,会 fork 一个子进程来生成 RDB 文件,这块内容后面再单独讨论吧。

      • 从库接收到 RDB 文件后,会先清空当前数据库,然后加载 RDB 文件。这是因为从库在通过 replicaof 命令开始和主库同步前,可能保存了其他数据。为了避免之前数据的影响,从库需要先把当前数据库清空。

    3. 增量复制:第二阶段全量复制会是一个比较耗时的操作,而在进行全量复制时,主实例仍然在接收新的写命令,而这些命令是不会被写到 RDB 文件中的,具体为什么不会被写到 RDB 文件中,可以参考https://www.jianshu.com/p/f700dbd572a5 里面的 fork 和写时复制相关技术。因此就需要 replication buffer 这样一块缓冲区,来保存第二阶段执行期间,主实例接收的写操作。并在第二阶段执行结束之后,把 replication buffer 缓冲区中的数据发送给从节点。

    1.2. 网络故障之后的数据同步

    在 Redis 2.8 之前,如果主从库在命令传播时出现了网络闪断,那么,从库就会和主库重新进行一次全量复制,开销非常大。

    从 Redis 2.8 开始,网络断了之后,主从库会采用增量复制的方式继续同步。听名字大概就可以猜到它和全量复制的不同:全量复制是同步所有数据,而增量复制只会把主从库网络断连期间主库收到的命令,同步给从库。

    具体过程如下图所示:


    2.png
    1. 当主从库断连后,主库会把断连期间收到的写操作命令,写入 replication buffer,同时也会把这些操作命令也写入 repl_backlog_buffer 这个缓冲区。repl_backlog_buffer 是一个环形缓冲区,主库会记录自己写到的位置master_repl_offset,从库则会记录自己已经读到的位置 slave_repl_offset
    2. 主从库的连接恢复之后,从库首先会给主库发送 psync 命令,并把自己当前的 slave_repl_offset 发给主库,主库只用把 master_repl_offset 和 slave_repl_offset 之间的命令操作同步给从库就行。

    因为 repl_backlog_buffer 是一个环形缓冲区,所以在缓冲区写满后,主库会继续写入,此时,就会覆盖掉之前写入的操作。如果从库的读取速度比较慢,就有可能导致从库还未读取的操作被主库新写的操作覆盖了,这会导致主从库间的数据不一致

    2. 状态机

    在实际的软件开发中,状态模式并不是很常用,但是在能够用到的场景里,它可以发挥很大的作用。状态模式一般用来实现状态机,而状态机常用在游戏、工作流引擎等系统开发中。

    有限状态机,英文翻译是 Finite State Machine,缩写为 FSM,简称为状态机。状态机有 3 个组成部分:状态(State)、事件(Event)、动作(Action)。其中,事件也称为转移条件(Transition Condition)。事件触发状态的转移及动作的执行。不过,动作不是必须的,也可能只转移状态,不执行任何动作。

    拿《超级玛丽》举个例子哈,玛丽可以有多种状态,比如小玛丽,吃了蘑菇之后,就变成了大玛丽并且增加100积分;而如果碰到了野怪,小玛丽就直接over了,大玛丽就会变成小玛丽并且减少100积分。这个例子里面呢,小玛丽或者大玛丽都是状态机的状态,加减积分就是动作,吃蘑菇或者撞野怪就是事件

    3.png
    简化后的部分状态和事件如下图所示:
    4.png

    2.1. 分支实现

    如果将上面描述的简易版超级玛丽用代码实现,简单直接的实现方式是,参照状态转移图,将每一个状态转移,原模原样地直译成代码。这样编写的代码会包含大量的 if-else 或 switch-case 分支判断逻辑,甚至是嵌套的分支判断逻辑。

    public enum State {
      SMALL(0),
      SUPER(1),
      OVER(2);
    
      private int value;
    
      private State(int value) {
        this.value = value;
      }
    
      public int getValue() {
        return this.value;
      }
    }
    
    public class MarioStateMachine {
      private int score;
      private State currentState;
    
      public MarioStateMachine() {
        this.score = 0;
        this.currentState = State.SMALL;
      }
    
      public void obtainMushRoom() {
        if (currentState.equals(State.SMALL)) { 
            this.currentState = State.SUPER; 
            this.score += 100; 
        } else {
            this.score += 100; 
        }
      }
    
      public void meetMonster() {
        if (currentState.equals(State.SUPER)) { 
            this.currentState = State.SMALL; 
            this.score -= 100; 
        } else {
            this.currentState = State.OVEW; 
            this.score = 0;
        }
      }
    
      public int getScore() {
        return this.score;
      }
    
      public State getCurrentState() {
        return this.currentState;
      }
    }
    
    public class ApplicationDemo {
      public static void main(String[] args) {
        MarioStateMachine mario = new MarioStateMachine();
        mario.obtainMushRoom();
        int score = mario.getScore();
        State state = mario.getCurrentState();
        System.out.println("mario score: " + score + "; state: " + state);
      }
    }
    

    对于简单的状态机来说,分支逻辑这种实现方式是可以接受的。但是,对于复杂的状态机来说,这种实现方式极易漏写或者错写某个状态转移。除此之外,代码中充斥着大量的 if-else 或者 switch-case 分支判断逻辑,可读性和可维护性都很差。如果哪天修改了状态机中的某个状态转移,我们要在冗长的分支逻辑中找到对应的代码进行修改,很容易改错,引入 bug。

    2.2. 状态模式

    状态模式通过将事件触发的状态转移和动作执行,拆分到不同的状态类中,来避免分支判断逻辑。我们还是结合代码来理解这句话。

    其中,IMario 是状态的接口,定义了所有的事件。SmallMario、SuperMario是 IMario 接口的实现类,分别对应状态机中的不同的状态。原来在状态机MarioStateMachine中定义的事件处理逻辑,现在分散到了 IMario 的实现类里面。

    public interface IMario {
      State getName();
      void obtainMushRoom(MarioStateMachine stateMachine);
      void meetMonster(MarioStateMachine stateMachine);
    }
    
    public class SmallMario implements IMario {
      private static final SmallMario instance = new SmallMario();
      private SmallMario() {}
      public static SmallMario getInstance() {
        return instance;
      }
    
      @Override
      public State getName() {
        return State.SMALL;
      }
    
      @Override
      public void obtainMushRoom(MarioStateMachine stateMachine) {
        stateMachine.setCurrentState(SuperMario.getInstance());
        stateMachine.setScore(stateMachine.getScore() + 100);
      }
    
      @Override
      public void meetMonster(MarioStateMachine stateMachine) {
        stateMachine.setCurrentState(OverMario.getInstance());
      }
    }
    
    // 省略SuperMario类、OverMario类...
    public class MarioStateMachine {
      private int score;
      private IMario currentState;
    
      public MarioStateMachine() {
        this.score = 0;
        this.currentState = SmallMario.getInstance();
      }
    
      public void obtainMushRoom() {
        this.currentState.obtainMushRoom(this);
      }
    
      public void meetMonster() {
        this.currentState.meetMonster(this);
      }
    
      public int getScore() {
        return this.score;
      }
    
      public State getCurrentState() {
        return this.currentState.getName();
      }
    
      public void setScore(int score) {
        this.score = score;
      }
    
      public void setCurrentState(IMario currentState) {
        this.currentState = currentState;
      }
    }
    

    其实状态机还有一种实现方式为查表法,但是个人感觉这种查表法的应用场景非常有限,这里就不详细介绍了,有兴趣可以看下极客时间里面王争老师的专栏《设计模式之美》。

    3. 源码解析

    Redis 5.0 源码地址:https://github.com/redis/redis/tree/5.0

    Redis 主从复制过程中,从节点会处于初始化、建立连接、握手验证、增量复制、全量复制等多个不同的状态。Redis 就是使用了基于状态机的设计思想,来清晰的实现不同状态间的跳转。因为主从复制过程中的状态比较多,很难把每一个状态都说清楚,这里只讨论下关键的几个状态及状态间的跳转。

    3.1. 数据结构及初始化

    每一个 Redis 实例在代码中都对应一个 redisServer 结构体,这个结构体包含了和 Redis 实例相关的各种配置,比如实例的 RDB、AOF 配置、主从复制配置、切片集群配置等。然后,与主从复制状态机相关的变量是 repl_state,Redis 在进行主从复制时,从库就是根据这个变量值的变化,来实现不同阶段的执行和跳转。

    struct redisServer {
       ...
       /* 复制相关(slave) */
        char *masterauth;               /* 用于和主库进行验证的密码*/
        char *masterhost;               /* 主库主机名 */
        int masterport;                 /* 主库端口号r */
        …
        client *master;        /* 从库上用来和主库连接的客户端 */
        client *cached_master; /* 从库上缓存的主库信息 */
        int repl_state;          /* 从库的复制状态机 */
       ...
    }
    

    个人理解这里的 repl_state 就相当于 2.1 中的 State 枚举类,定义了从库的不同状态。这里有一点需要说明哈,就是主从复制的状态机都是在从节点上才有,主节点是没有状态机的,到后面会讨论主节点为什么没有状态机这个问题。

    接下来说下初始化,首先,当一个实例启动后,就会调用 server.c 中的 initServerConfig 函数,初始化 redisServer 结构体。此时,实例会把状态机的初始状态设置为 REPL_STATE_NONE,如下所示:

    void initServerConfig(void) {
       …
       server.repl_state = REPL_STATE_NONE;
       …
    }
    

    然后,一旦实例执行了 replicaof 172.16.19.3 6379 命令,就会调用 replication.c 中的 replicaofCommand 函数进行处理。replicaofCommand 函数会调用 replicationSetMaster 函数设置主库的信息。这部分的代码逻辑如下所示:

    void replicaofCommand(client *c) {
        /* The special host/port combination "NO" "ONE" turns the instance
         * into a master. Otherwise the new master address is set. */
        if (!strcasecmp(c->argv[1]->ptr,"no") &&
            !strcasecmp(c->argv[2]->ptr,"one")) {
            ......
        } else {
             /* 如果没有记录主库的IP和端口号,设置主库的信息 */
            replicationSetMaster(c->argv[1]->ptr, port);
            ......
        }
        addReply(c,shared.ok);
    }
    
    /* Set replication to the specified master address and port. */
    void replicationSetMaster(char *ip, int port) {
        ......
        server.masterhost = sdsnew(ip);
        server.masterport = port;
        ......
        server.repl_state = REPL_STATE_CONNECT;
    }
    

    这里就是设置刚才数据结构里面提到的 host 和 port 两个变量,并把状态机的状态设置为 REPL_STATE_CONNECT

    3.2. 状态的跳转

    Redis 的周期性任务,就是指 Redis 实例在运行时,按照一定时间周期重复执行的任务。Redis 的周期性任务很多,其中之一就是 replicationCron() 任务。这个任务的执行频率是每 1000ms 执行一次,如下面的代码所示:

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
       …
       run_with_period(1000) replicationCron();
       …
    }
    

    这个 serverCron 函数在 server.c 函数中,他会在 Redis 实例启动的 main 函数执行时候,注册一个时间事件,该时间事件会立即被触发,触发后的回调函数就是这个 serverCron 函数。这一块的详细内容,后面疫情过去到公司了,再把事件驱动框架的分析贴出来,这里只需要知道这个函数会在 Redis 启动之后执行,并按照一定周期来执行相应的任务就行
    接下来再来看下 replicationCron 函数,他是在 replication.c 文件中,在这个函数里面,判断从节点状态机的状态为 REPL_STATE_CONNECT 时,会和主节点建立连接,如下所示:

    /* Replication cron function, called 1 time per second. */
    void replicationCron(void) {
        ......
        /* Check if we should connect to a MASTER */
        /* 如果从库实例的状态是REPL_STATE_CONNECT,那么从库通过connectWithMaster和主库建立连接 */
        //3.1 小节中有分析过,执行了replicaof 之后,会把从库的状态机设置为 REPL_STATE_CONNECT,因此就会首先执行这个分支
        if (server.repl_state == REPL_STATE_CONNECT) {
            serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
                server.masterhost, server.masterport);
            if (connectWithMaster() == C_OK) {
                serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
            }
        }
        ......
    }
    

    connectWithMaster 函数中,首先和主节点建立连接,返回一个文件描述符 fd,当连接 fd 上有事件发生时,会触发 syncWithMaster 回调函数,方法返回前,会给状态机的状态设置为 REPL_STATE_CONNECTING

    int connectWithMaster(void) {
        int fd;
        //从库和主库建立连接
        fd = anetTcpNonBlockBestEffortBindConnect(NULL,
            server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
        if (fd == -1) {
            serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
                strerror(errno));
            return C_ERR;
        }
        //在建立的连接上注册读写事件,对应的回调函数是syncWithMaster
        if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
                AE_ERR)
        {
            close(fd);
            serverLog(LL_WARNING,"Can't create readable event for SYNC");
            return C_ERR;
        }
    
        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_s = fd;
        //完成连接后,将状态机设置为REPL_STATE_CONNECTING
        server.repl_state = REPL_STATE_CONNECTING;
        return C_OK;
    }
    

    syncWithMaster 函数前面会经过一系列的握手操作,然后会调用 slaveTryPartialResynchronization 函数发送 1.1 小节中提到的 psync 命令,并根据 slaveTryPartialResynchronization 函数的返回值,来执行全量复制,或者让 slaveTryPartialResynchronization 函数执行增量复制

    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
        //前面有一系列握手操作,这里就不详细介绍了
        ......
        if (server.repl_state == REPL_STATE_SEND_PSYNC) {
            //向主库发送PSYNC命令,进行数据同步
            if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
                err = sdsnew("Write error sending the PSYNC command.");
                goto write_error;
            }
            server.repl_state = REPL_STATE_RECEIVE_PSYNC;
            return;
        }
        
        //读取PSYNC命令的返回结果
        psync_result = slaveTryPartialResynchronization(fd,1);
        //PSYNC结果还没有返回,先从syncWithMaster函数返回处理其他操作
        if (psync_result == PSYNC_WAIT_REPLY) return;
        //如果PSYNC结果是PSYNC_CONTINUE,从syncWithMaster函数返回
        if (psync_result == PSYNC_CONTINUE) {
               …
               return;
        }
    
        //如果执行全量复制的话,针对连接上的读事件,创建readSyncBulkPayload回调函数
        if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
                    == AE_ERR)
            {
               …
            }
        //将从库状态机置为REPL_STATE_TRANSFER
        server.repl_state = REPL_STATE_TRANSFER;
    
    
    }
    

    下面简单介绍下 slaveTryPartialResynchronization 函数,如果从库是第一次和主库连接,则发送 psync 命令,然后读取主库的响应,并根据主库的响应结果,来执行增量复制:

    int slaveTryPartialResynchronization(int fd, int read_reply) {
        ......
    
        /* Writing half */
        //发送PSYNC命令,
        if (!read_reply) {
            ......
             //从库第一次和主库同步时,设置offset为-1
            server.master_initial_offset = -1;
    
            ......
            /* Issue the PSYNC command */
            //调用sendSynchronousCommand发送PSYNC命令
            reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
            ......
            return PSYNC_WAIT_REPLY;
        }
    
        /* Reading half */
        //读取主库响应
        reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        if (sdslen(reply) == 0) {
            /* The master may send empty newlines after it receives PSYNC
             * and before to reply, just to keep the connection alive. */
            sdsfree(reply);
            return PSYNC_WAIT_REPLY;
        }
    
        aeDeleteFileEvent(server.el,fd,AE_READABLE);
    
        //主库返回FULLRESYNC
        if (!strncmp(reply,"+FULLRESYNC",11)) {
            ......
            return PSYNC_FULLRESYNC;
        }
        //主库返回CONTINUE,执行增量复制
        if (!strncmp(reply,"+CONTINUE",9)) {
            ......
            return PSYNC_CONTINUE;
        }
    
        ......
        return PSYNC_NOT_SUPPORTED;
    }
    

    3.3. 主库的操作

    在 Redis 实现主从复制时,从库涉及到的状态变迁有很多,包括了发起连接、主从握手、复制类型判断、请求数据等。因此,使用状态机开发从库的复制流程,可以很好地帮助我们实现状态流转。

    主从复制的发起方是从库,而对于主库来说,它只是被动式地响应从库的各种请求,并根据从库的请求执行相应的操作,比如生成 RDB 文件或是传输数据等。

    而且,从另外一个角度来说,主库可能和多个从库进行主从复制,而不同从库的复制进度和状态很可能并不一样,如果主库要维护状态机的话,那么,它还需要为每个从库维护一个状态机,这个既会增加开发复杂度,也会增加运行时的开销。正是因为这些原因,所以主库并不需要使用状态机进行状态流转。

    主库本身是可能发生故障,并要进行故障切换的。如果主库在执行主从复制时,也维护状态机,那么一旦主库发生了故障,也还需要考虑状态机的冗余备份和故障切换,这会给故障切换的开发和执行带来复杂度和开销。而从库维护状态机本身就已经能完成主从复制,所以没有必要让主库再维护状态机了。

    参考资料:

    1. 极客时间专栏《Redis源码剖析与实战》.蒋德钧.2021
    2. 极客时间专栏《Redis核心技术与实战》.蒋德钧.2020
    3. 极客时间专栏《设计模式之美》.王争.2020
    4. Redis 5.0.14 源码:https://github.com/redis/redis/tree/5.0

    `

    相关文章

      网友评论

          本文标题:浅析 Redis 主从复制实现原理

          本文链接:https://www.haomeiwen.com/subject/tfxryrtx.html