美文网首页
一条Redis命令的执行过程

一条Redis命令的执行过程

作者: JBryan | 来源:发表于2022-04-07 21:40 被阅读0次

    本文分析的Redis源码是基于Redis 6.0的,Redis 6.0之前的代码会稍微有些出入,但是整体流程大致类似。主要是为了后续分析Redis 6.0推出的多I/O线程实现做基础,而分析Redis命令的执行过程,需要先了解下Redis的事件驱动框架的实现,这篇文档在公司电脑上,后面疫情解封之后,到公司再贴出来吧。

    1. 网络通信模型

    Redis使用多路复用机制来实现对网络请求的处理,他会根据Redis运行的操作系统,选择相应的多路复用机制,比如在Linux上就会选择使用epoll、在windows上就会选择使用select。Redis的事件驱动框架对多路复用机制做了一层封装,这里先抛开事件驱动框架,分析Redis网络通信模型,下面以epoll为例。

    首先介绍下epoll机制中,几个比较重要的函数:

    1. epoll_create:epoll_create函数会创建一个epoll实例,而epoll实例中记录了监听的文件描述符以及已经就绪的文件描述符。
    2. epoll_ctl:向epoll实例中注册一个需要被监听的socket(包括监听socket和已连接socket),监听socket用于处理客户端的连接事件进而建立已连接socket,而已连接socket用于处理客户端的读、写事件。
    3. epoll_wait:等待epoll实例监听的socket上的I/O事件发生。在调用epoll_wait函数时,可以传入一个timeout参数,如果传入-1,epoll_wait无限期阻塞,直到有I/O事件发生,否则在指定超时时间返回。Redis在调用epoll_wait时,会传入-1,就是说如果没有I/O事件发生时,Redis主线程会一直阻塞下去。

    这里需要简单说一下就是,如果只监听了一个端口的话,那么我们的应用程序中只会有一个监听socket,每当有客户端连接进来的时候,就会创建一个已连接socket,所以我们的应用程序会存在多个已连接socket。

    如果使用epoll来实现网络编程,通常会实现类似下面这样的模型:

    int sock_fd,conn_fd; //监听套接字和已连接套接字的变量
    sock_fd = socket() //创建套接字
    bind(sock_fd)   //绑定套接字
    listen(sock_fd) //在套接字上进行监听,将套接字转为监听套接字
        
    epfd = epoll_create(EPOLL_SIZE); //创建epoll实例,
    //创建epoll_event结构体数组,保存套接字对应文件描述符和监听事件类型    
    ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE);
    
    //创建epoll_event变量
    struct epoll_event ee
    //监听读事件
    ee.events = EPOLLIN;
    //监听的文件描述符是刚创建的监听套接字
    ee.data.fd = sock_fd;
    
    //将监听套接字加入到监听列表中    
    epoll_ctl(epfd, EPOLL_CTL_ADD, sock_fd, &ee); 
        
    while (1) {
       //等待返回已经就绪的描述符 
       n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); 
       //遍历所有就绪的描述符     
       for (int i = 0; i < n; i++) {
           //如果是监听套接字描述符就绪,表明有一个新客户端连接到来 
           if (ep_events[i].data.fd == sock_fd) { 
              conn_fd = accept(sock_fd); //调用accept()建立连接
              ee.events = EPOLLIN;  
              ee.data.fd = conn_fd;
              //添加对新创建的已连接套接字描述符的监听,监听后续在已连接套接字上的读事件      
              epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ee); 
                    
           } else { //如果是已连接套接字描述符就绪,则可以读数据
               ...//读取数据并处理
           }
       }
    }
    

    首先创建监听socket并调用epoll_create函数创建一个epoll实例,然后调用epoll_ctl把监听socket注册到epoll实例上,在一个死循环里面调用epoll_wait等待epoll上的socket有事件发生。

    epoll_wait返回后,根据有事件发生的socket是监听socket还是已连接socket,来执行相应逻辑:

    • 如果是监听socket,说明有新的客户端来建立连接,创建已连接socket,并把已连接socket注册到epoll实例上。
    • 如果是已连接socket,说明可以读取客户端的请求,则执行相应的解析、处理请求并写回客户端操作。

    这个过程如下图所示:


    1.png

    2. 一条命令的处理过程

    源码地址:https://github.com/redis/redis/tree/6.0/src

    Redis的启动main函数是在server.c文件中,在初始化的时候,会通过事件驱动框架相关函数调用epoll_create函数来创建epoll实例。然后通过事件驱动框架的aeCreateFileEvent函数,调用epoll_ctl函数向epoll实例注册监听socket,并传入一个回调函数acceptTcpHandler,当监听socket有客户端建立连接时,就会触发acceptTcpHandler函数建立连接。

        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    serverPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
    

    而在acceptTcpHandler函数中,会创建已连接socket并注册回调函数readQueryFromClient,当已连接socket有事件发生之后,就会触发readQueryFromClient函数。其实这中间是有很多其他函数调用的,我这里就不展开说了。

        if (conn) {
            connNonBlock(conn);
            connEnableTcpNoDelay(conn);
            if (server.tcpkeepalive)
                connKeepAlive(conn,server.tcpkeepalive);
            connSetReadHandler(conn, readQueryFromClient);
            connSetPrivateData(conn, c);
        }
    

    命令的处理过程也就是从readQueryFromClient函数开始,分为四个阶段:

    • 命令读取,对应 readQueryFromClient 函数;
    • 命令解析,对应 processInputBufferAndReplicate 函数;
    • 命令执行,对应 processCommand 函数;
    • 结果返回,对应 addReply 函数;

    2.1. 命令读取

    readQueryFromClient 函数会从客户端连接的 socket 中,读取最大为 readlen 长度的数据,readlen 值大小是宏定义 PROTO_IOBUF_LEN。该宏定义是在server.h文件中定义的,默认值为 16KB。

    紧接着,readQueryFromClient 函数会根据读取数据的情况,进行一些异常处理,比如数据读取失败或是客户端连接关闭等。此外,如果当前客户端是主从复制中的主节点,readQueryFromClient 函数还会把读取的数据,追加到用于主从节点命令同步的缓冲区中。

    最后,readQueryFromClient 函数会调用 processInputBuffer 函数,这就进入到了命令处理的下一个阶段,也就是命令解析阶段。

    void readQueryFromClient(connection *conn) {
        ......
    
        //从客户端socket中读取的数据长度,默认为16KB
        readlen = PROTO_IOBUF_LEN;
        ......
        
        //给缓冲区分配空间
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
        //调用read从描述符为fd的客户端socket中读取数据
        nread = connRead(c->conn, c->querybuf+qblen, readlen);
        
        //调用processInputBuffer进一步处理读取内容
        /* There is more data in the client input buffer, continue parsing it
         * in case to check if there is a full command to execute. */
         processInputBuffer(c);
    }
    

    2.2. 命令解析

    processInputBuffer(在 networking.c 文件中)函数,对客户端输入缓冲区中的命令和参数进行解析。下面我们接单分析下这个函数。

    首先,processInputBuffer 函数会执行一个 while 循环,不断地从客户端的输入缓冲区中读取数据。然后,它会判断读取到的命令格式,是否以“”开头*。

    • 如果命令是以“*”开头,那就表明这个命令是 PROTO_REQ_MULTIBULK 类型的命令请求,也就是符合 RESP 协议(Redis 客户端与服务器端的标准通信协议)的请求。那么,processInputBuffer 函数就会进一步调用 processMultibulkBuffer(在 networking.c 文件中)函数,来解析读取到的命令。
    • 如果命令不是以“*”开头,那则表明这个命令是 PROTO_REQ_INLINE 类型的命令请求,并不是 RESP 协议请求。这类命令也被称为管道命令,命令和命令之间是使用换行符“\r\n”分隔开来的。比如,我们使用 Telnet 发送给 Redis 的命令,就是属于 PROTO_REQ_INLINE 类型的命令。在这种情况下,processInputBuffer 函数会调用 processInlineBuffer(在 networking.c 文件中)函数,来实际解析命令。
      这样,等命令解析完成后,processInputBuffer 函数就会调用 processCommandAndResetClient函数,开始进入命令处理的第三个阶段,也就是命令执行阶段。下面的代码展示了 processInputBuffer 函数解析命令时的主要流程,你可以看下。
    void processInputBuffer(client *c) {
       while(c->qb_pos < sdslen(c->querybuf)) {
          ...
          /* Determine request type when unknown. */
           if (!c->reqtype) {
                //根据客户端输入缓冲区的命令开头字符判断命令类型
                if (c->querybuf[c->qb_pos] == '*') {
                    c->reqtype = PROTO_REQ_MULTIBULK; //符合RESP协议的命令
                } else {
                    c->reqtype = PROTO_REQ_INLINE; //管道类型命令
                }
            }
            if (c->reqtype == PROTO_REQ_INLINE) {
                if (processInlineBuffer(c) != C_OK) break;  //对于管道类型命令,调用processInlineBuffer函数解析
            } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
                if (processMultibulkBuffer(c) != C_OK) break; //对于RESP协议命令,调用processMultibulkBuffer函数解析
            }
            ... 
           if (c->argc == 0) {
                resetClient(c);
            } else {
                //这里还有一些多I/O线程的相关操作
                ......
                //调用processCommandAndResetClient函数,开始执行命令
                if (processCommandAndResetClient(c) == C_ERR) {
                   ...   } 
                ... }
            }
            ...
    }
    

    2.3. 命令执行

    processCommandAndResetClient函数会调用processCommand(在server.c文件中)函数来执行命令

    int processCommandAndResetClient(client *c) {
        ......
        if (processCommand(c) == C_OK) {
            commandProcessed(c);
        }
        ......
        return deadclient ? C_ERR : C_OK;
    }
    

    而processCommand函数在实际执行命令前的主要逻辑可以分成三步:

    • 第一步,processCommand 函数会调用 moduleCallCommandFilters 函数(在module.c文件),将 Redis 命令替换成 module 中想要替换的命令。
    • 第二步,processCommand 函数会判断当前命令是否为 quit 命令,并进行相应处理。
    • 第三步,processCommand 函数会调用 lookupCommand 函数,在全局变量 server 的 commands 成员变量中查找相关的命令。
      这里,全局变量 server 的 commands 成员变量是一个哈希表,它的定义是在server.h文件中的 redisServer 结构体里面,如下所示:
    struct redisServer {
       ...
       dict *commands; 
       ...
    }
    

    另外,commands 成员变量的初始化是在Redis初始化时,完成哈希表创建,再Redis 提供的命令名称和对应的实现函数,插入到哈希表中。

    插入哈希表时,会用把redisCommandTable数组中的内容,填充到哈希表中,redisCommandTable数组在server.c中,比如,以下代码展示了 GET 和 SET 这两条命令的信息,它们各自的实现函数分别是 getCommand 和 setCommand。当然,如果你想进一步了解 redisCommand 结构体,也可以去看下它的定义:

    struct redisCommand redisCommandTable[] = {
        ...
        {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
        {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
        ...
    }
    

    lookupCommand 函数会根据解析的命令名称,在 commands 对应的哈希表中查找相应的命令。一旦查到对应命令后,processCommand 函数对命令做完各种检查后,它就开始执行命令了。它会判断当前客户端是否有 CLIENT_MULTI 标记,如果有的话,就表明要处理的是 Redis 事务的相关命令,所以它会按照事务的要求,调用 queueMultiCommand 函数将命令入队保存,等待后续一起处理。而如果没有,processCommand 函数就会调用 call 函数来实际执行命令了。以下代码展示了这部分的逻辑,你可以看下。

        //如果客户端有CLIENT_MULTI标记,并且当前不是exec、discard、multi和watch命令
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
            queueMultiCommand(c);  //将命令入队保存,等待后续一起处理
            addReply(c,shared.queued);
        } else {
            call(c,CMD_CALL_FULL);  //调用call函数执行命令
            ...
        }
    

    call 函数是在 server.c 文件中实现的,它执行命令是通过调用命令本身,即 redisCommand 结构体中定义的函数指针来完成的。这里,以 SET 命令为例来介绍下它的实际执行过程。

    SET 命令对应的实现函数是 setCommand,这是在t_string.c文件中定义的。setCommand 函数首先会对命令参数进行判断,比如参数是否带有 NX、EX、XX、PX 等这类命令选项,如果有的话,setCommand 函数就会记录下这些标记。

    然后,setCommand 函数会调用 setGenericCommand 函数,这个函数也是在 t_string.c 文件中实现的。setGenericCommand 函数会根据刚才 setCommand 函数记录的命令参数的标记,来进行相应处理。

    如果 SET 命令可以正常执行的话,也就是说命令带有 NX 选项但是 key 并不存在,或者带有 XX 选项但是 key 已经存在,这样 setGenericCommand 函数就会调用 setKey 函数(在 db.c 文件中)来完成键值对的实际插入,如下所示:

    setKey(c->db,key,val);
    

    然后,如果命令设置了过期时间,setGenericCommand 函数还会调用 setExpire 函数设置过期时间。最后,setGenericCommand 函数会调用 addReply 函数,将结果返回给客户端,如下所示:

    addReply(c, ok_reply ? ok_reply : shared.ok);
    

    无论是在命令执行的过程中,发现不符合命令的执行条件,或是命令能成功执行,addReply 函数都会被调用,用来返回结果。所以,这就进入到命令处理过程的最后一个阶段:结果返回阶段。

    2.4. 结果返回

    addReply 函数是在 networking.c 文件中定义的。它的执行逻辑比较简单,主要是调用 prepareClientToWrite 函数,并在 prepareClientToWrite 函数中调用 clientInstallWriteHandler 函数,将待写回客户端加入到全局变量 server 的 clients_pending_write 列表中。然后,addReply 函数会调用 _addReplyToBuffer 等函数(在 networking.c 中),将要返回的结果添加到客户端的输出缓冲区中。

    /* Add the object 'obj' string representation to the client output buffer. */
    void addReply(client *c, robj *obj) {
        if (prepareClientToWrite(c) != C_OK) return;
    
        if (sdsEncodedObject(obj)) {
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
                _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
        } else if (obj->encoding == OBJ_ENCODING_INT) {
            /* For integer encoded strings we just convert it into a string
             * using our optimized function, and attach the resulting string
             * to the output buffer. */
            char buf[32];
            size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
            if (_addReplyToBuffer(c,buf,len) != C_OK)
                _addReplyProtoToList(c,buf,len);
        } else {
            serverPanic("Wrong obj->encoding in addReply()");
        }
    }
    

    3. 总结

    Redis通过多路复用机制来实现网络通信,同时也基于多路复用机制封装了事件驱动框架,这篇文章抛开了事件驱动框架的部分(后续会单独整理),主要是为了能够更加清晰的表述一条命令的处理过程,2.4中只是把命令的执行结果写到输出缓冲区中,并没有真正写回给客户端,而真正写回客户端的操作是在beforeSleep函数中实现的,后续分析Redis 6.0新推出的多I/O线程实现原理时候,再详细介绍这一部分吧。

    然后我这里呢,结合前面两部分,用伪代码再描述下整个流程,有两个函数之前没有提过,beforeSleep和aftersleep,先暂时忽略好了:

    //创建epoll实例
    epfd = epoll_create(EPOLL_SIZE); 
    
    //将监听socket加入到监听列表中
    epoll_ctl(epfd, EPOLL_CTL_ADD, socket_listen, &ee); 
    
    //循环监听发生的I/O事件
    while (1) {
       //这个函数的作用后面分析多I/O线程的时候再介绍吧
       beforeSleep();
       //等待返回已经就绪的描述符 
       n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); 
       //这个函数的作用后面分析多I/O线程的时候再介绍吧
       aftersleep();
       
       //遍历所有就绪的描述符     
       for (int i = 0; i < n; i++) {
      
           //如果是监听socket描述符就绪,表明有一个新客户端连接到来 
           if (ep_events[i].data.fd == socket_listen) { 
              //调用accept()建立连接,实际上是通过acceptTcpHandler函数间接调用的
              socket_connection = accept(sock_fd);
              
              //注册已连接socket
              epoll_ctl(epfd, EPOLL_CTL_ADD, socket_connection, &ee); 
           } else { 
              //如果是已连接socket,则开始一条命令的处理过程
              //1. 读取命令
              readQueryFromClient(socket_connection);
              //2. 解析命令
              processInputBuffer();
              //3. 执行命令
              processCommand();
              //4. 返回结果
              addReply();
           }
       }
       
    }
    

    参考资料:

    1. 极客时间专栏《Redis源码剖析与实战》.蒋德钧.2021
    2. 极客时间专栏《Redis核心技术与实战》.蒋德钧.2020
    3. Redis 6.0源码:https://github.com/redis/redis/tree/6.0/src

    相关文章

      网友评论

          本文标题:一条Redis命令的执行过程

          本文链接:https://www.haomeiwen.com/subject/ddwssrtx.html