美文网首页
一条Redis命令的执行过程

一条Redis命令的执行过程

作者: JBryan | 来源:发表于2022-04-07 21:40 被阅读0次

本文分析的Redis源码是基于Redis 6.0的,Redis 6.0之前的代码会稍微有些出入,但是整体流程大致类似。主要是为了后续分析Redis 6.0推出的多I/O线程实现做基础,而分析Redis命令的执行过程,需要先了解下Redis的事件驱动框架的实现,这篇文档在公司电脑上,后面疫情解封之后,到公司再贴出来吧。

1. 网络通信模型

Redis使用多路复用机制来实现对网络请求的处理,他会根据Redis运行的操作系统,选择相应的多路复用机制,比如在Linux上就会选择使用epoll、在windows上就会选择使用select。Redis的事件驱动框架对多路复用机制做了一层封装,这里先抛开事件驱动框架,分析Redis网络通信模型,下面以epoll为例。

首先介绍下epoll机制中,几个比较重要的函数:

  1. epoll_create:epoll_create函数会创建一个epoll实例,而epoll实例中记录了监听的文件描述符以及已经就绪的文件描述符。
  2. epoll_ctl:向epoll实例中注册一个需要被监听的socket(包括监听socket和已连接socket),监听socket用于处理客户端的连接事件进而建立已连接socket,而已连接socket用于处理客户端的读、写事件。
  3. epoll_wait:等待epoll实例监听的socket上的I/O事件发生。在调用epoll_wait函数时,可以传入一个timeout参数,如果传入-1,epoll_wait无限期阻塞,直到有I/O事件发生,否则在指定超时时间返回。Redis在调用epoll_wait时,会传入-1,就是说如果没有I/O事件发生时,Redis主线程会一直阻塞下去。

这里需要简单说一下就是,如果只监听了一个端口的话,那么我们的应用程序中只会有一个监听socket,每当有客户端连接进来的时候,就会创建一个已连接socket,所以我们的应用程序会存在多个已连接socket。

如果使用epoll来实现网络编程,通常会实现类似下面这样的模型:

int sock_fd,conn_fd; //监听套接字和已连接套接字的变量
sock_fd = socket() //创建套接字
bind(sock_fd)   //绑定套接字
listen(sock_fd) //在套接字上进行监听,将套接字转为监听套接字
    
epfd = epoll_create(EPOLL_SIZE); //创建epoll实例,
//创建epoll_event结构体数组,保存套接字对应文件描述符和监听事件类型    
ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE);

//创建epoll_event变量
struct epoll_event ee
//监听读事件
ee.events = EPOLLIN;
//监听的文件描述符是刚创建的监听套接字
ee.data.fd = sock_fd;

//将监听套接字加入到监听列表中    
epoll_ctl(epfd, EPOLL_CTL_ADD, sock_fd, &ee); 
    
while (1) {
   //等待返回已经就绪的描述符 
   n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); 
   //遍历所有就绪的描述符     
   for (int i = 0; i < n; i++) {
       //如果是监听套接字描述符就绪,表明有一个新客户端连接到来 
       if (ep_events[i].data.fd == sock_fd) { 
          conn_fd = accept(sock_fd); //调用accept()建立连接
          ee.events = EPOLLIN;  
          ee.data.fd = conn_fd;
          //添加对新创建的已连接套接字描述符的监听,监听后续在已连接套接字上的读事件      
          epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ee); 
                
       } else { //如果是已连接套接字描述符就绪,则可以读数据
           ...//读取数据并处理
       }
   }
}

首先创建监听socket并调用epoll_create函数创建一个epoll实例,然后调用epoll_ctl把监听socket注册到epoll实例上,在一个死循环里面调用epoll_wait等待epoll上的socket有事件发生。

epoll_wait返回后,根据有事件发生的socket是监听socket还是已连接socket,来执行相应逻辑:

  • 如果是监听socket,说明有新的客户端来建立连接,创建已连接socket,并把已连接socket注册到epoll实例上。
  • 如果是已连接socket,说明可以读取客户端的请求,则执行相应的解析、处理请求并写回客户端操作。

这个过程如下图所示:


1.png

2. 一条命令的处理过程

源码地址:https://github.com/redis/redis/tree/6.0/src

Redis的启动main函数是在server.c文件中,在初始化的时候,会通过事件驱动框架相关函数调用epoll_create函数来创建epoll实例。然后通过事件驱动框架的aeCreateFileEvent函数,调用epoll_ctl函数向epoll实例注册监听socket,并传入一个回调函数acceptTcpHandler,当监听socket有客户端建立连接时,就会触发acceptTcpHandler函数建立连接。

    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }

而在acceptTcpHandler函数中,会创建已连接socket并注册回调函数readQueryFromClient,当已连接socket有事件发生之后,就会触发readQueryFromClient函数。其实这中间是有很多其他函数调用的,我这里就不展开说了。

    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }

命令的处理过程也就是从readQueryFromClient函数开始,分为四个阶段:

  • 命令读取,对应 readQueryFromClient 函数;
  • 命令解析,对应 processInputBufferAndReplicate 函数;
  • 命令执行,对应 processCommand 函数;
  • 结果返回,对应 addReply 函数;

2.1. 命令读取

readQueryFromClient 函数会从客户端连接的 socket 中,读取最大为 readlen 长度的数据,readlen 值大小是宏定义 PROTO_IOBUF_LEN。该宏定义是在server.h文件中定义的,默认值为 16KB。

紧接着,readQueryFromClient 函数会根据读取数据的情况,进行一些异常处理,比如数据读取失败或是客户端连接关闭等。此外,如果当前客户端是主从复制中的主节点,readQueryFromClient 函数还会把读取的数据,追加到用于主从节点命令同步的缓冲区中。

最后,readQueryFromClient 函数会调用 processInputBuffer 函数,这就进入到了命令处理的下一个阶段,也就是命令解析阶段。

void readQueryFromClient(connection *conn) {
    ......

    //从客户端socket中读取的数据长度,默认为16KB
    readlen = PROTO_IOBUF_LEN;
    ......
    
    //给缓冲区分配空间
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    //调用read从描述符为fd的客户端socket中读取数据
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    
    //调用processInputBuffer进一步处理读取内容
    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
     processInputBuffer(c);
}

2.2. 命令解析

processInputBuffer(在 networking.c 文件中)函数,对客户端输入缓冲区中的命令和参数进行解析。下面我们接单分析下这个函数。

首先,processInputBuffer 函数会执行一个 while 循环,不断地从客户端的输入缓冲区中读取数据。然后,它会判断读取到的命令格式,是否以“”开头*。

  • 如果命令是以“*”开头,那就表明这个命令是 PROTO_REQ_MULTIBULK 类型的命令请求,也就是符合 RESP 协议(Redis 客户端与服务器端的标准通信协议)的请求。那么,processInputBuffer 函数就会进一步调用 processMultibulkBuffer(在 networking.c 文件中)函数,来解析读取到的命令。
  • 如果命令不是以“*”开头,那则表明这个命令是 PROTO_REQ_INLINE 类型的命令请求,并不是 RESP 协议请求。这类命令也被称为管道命令,命令和命令之间是使用换行符“\r\n”分隔开来的。比如,我们使用 Telnet 发送给 Redis 的命令,就是属于 PROTO_REQ_INLINE 类型的命令。在这种情况下,processInputBuffer 函数会调用 processInlineBuffer(在 networking.c 文件中)函数,来实际解析命令。
    这样,等命令解析完成后,processInputBuffer 函数就会调用 processCommandAndResetClient函数,开始进入命令处理的第三个阶段,也就是命令执行阶段。下面的代码展示了 processInputBuffer 函数解析命令时的主要流程,你可以看下。
void processInputBuffer(client *c) {
   while(c->qb_pos < sdslen(c->querybuf)) {
      ...
      /* Determine request type when unknown. */
       if (!c->reqtype) {
            //根据客户端输入缓冲区的命令开头字符判断命令类型
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK; //符合RESP协议的命令
            } else {
                c->reqtype = PROTO_REQ_INLINE; //管道类型命令
            }
        }
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;  //对于管道类型命令,调用processInlineBuffer函数解析
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break; //对于RESP协议命令,调用processMultibulkBuffer函数解析
        }
        ... 
       if (c->argc == 0) {
            resetClient(c);
        } else {
            //这里还有一些多I/O线程的相关操作
            ......
            //调用processCommandAndResetClient函数,开始执行命令
            if (processCommandAndResetClient(c) == C_ERR) {
               ...   } 
            ... }
        }
        ...
}

2.3. 命令执行

processCommandAndResetClient函数会调用processCommand(在server.c文件中)函数来执行命令

int processCommandAndResetClient(client *c) {
    ......
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
    }
    ......
    return deadclient ? C_ERR : C_OK;
}

而processCommand函数在实际执行命令前的主要逻辑可以分成三步:

  • 第一步,processCommand 函数会调用 moduleCallCommandFilters 函数(在module.c文件),将 Redis 命令替换成 module 中想要替换的命令。
  • 第二步,processCommand 函数会判断当前命令是否为 quit 命令,并进行相应处理。
  • 第三步,processCommand 函数会调用 lookupCommand 函数,在全局变量 server 的 commands 成员变量中查找相关的命令。
    这里,全局变量 server 的 commands 成员变量是一个哈希表,它的定义是在server.h文件中的 redisServer 结构体里面,如下所示:
struct redisServer {
   ...
   dict *commands; 
   ...
}

另外,commands 成员变量的初始化是在Redis初始化时,完成哈希表创建,再Redis 提供的命令名称和对应的实现函数,插入到哈希表中。

插入哈希表时,会用把redisCommandTable数组中的内容,填充到哈希表中,redisCommandTable数组在server.c中,比如,以下代码展示了 GET 和 SET 这两条命令的信息,它们各自的实现函数分别是 getCommand 和 setCommand。当然,如果你想进一步了解 redisCommand 结构体,也可以去看下它的定义:

struct redisCommand redisCommandTable[] = {
    ...
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    ...
}

lookupCommand 函数会根据解析的命令名称,在 commands 对应的哈希表中查找相应的命令。一旦查到对应命令后,processCommand 函数对命令做完各种检查后,它就开始执行命令了。它会判断当前客户端是否有 CLIENT_MULTI 标记,如果有的话,就表明要处理的是 Redis 事务的相关命令,所以它会按照事务的要求,调用 queueMultiCommand 函数将命令入队保存,等待后续一起处理。而如果没有,processCommand 函数就会调用 call 函数来实际执行命令了。以下代码展示了这部分的逻辑,你可以看下。

    //如果客户端有CLIENT_MULTI标记,并且当前不是exec、discard、multi和watch命令
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);  //将命令入队保存,等待后续一起处理
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);  //调用call函数执行命令
        ...
    }

call 函数是在 server.c 文件中实现的,它执行命令是通过调用命令本身,即 redisCommand 结构体中定义的函数指针来完成的。这里,以 SET 命令为例来介绍下它的实际执行过程。

SET 命令对应的实现函数是 setCommand,这是在t_string.c文件中定义的。setCommand 函数首先会对命令参数进行判断,比如参数是否带有 NX、EX、XX、PX 等这类命令选项,如果有的话,setCommand 函数就会记录下这些标记。

然后,setCommand 函数会调用 setGenericCommand 函数,这个函数也是在 t_string.c 文件中实现的。setGenericCommand 函数会根据刚才 setCommand 函数记录的命令参数的标记,来进行相应处理。

如果 SET 命令可以正常执行的话,也就是说命令带有 NX 选项但是 key 并不存在,或者带有 XX 选项但是 key 已经存在,这样 setGenericCommand 函数就会调用 setKey 函数(在 db.c 文件中)来完成键值对的实际插入,如下所示:

setKey(c->db,key,val);

然后,如果命令设置了过期时间,setGenericCommand 函数还会调用 setExpire 函数设置过期时间。最后,setGenericCommand 函数会调用 addReply 函数,将结果返回给客户端,如下所示:

addReply(c, ok_reply ? ok_reply : shared.ok);

无论是在命令执行的过程中,发现不符合命令的执行条件,或是命令能成功执行,addReply 函数都会被调用,用来返回结果。所以,这就进入到命令处理过程的最后一个阶段:结果返回阶段。

2.4. 结果返回

addReply 函数是在 networking.c 文件中定义的。它的执行逻辑比较简单,主要是调用 prepareClientToWrite 函数,并在 prepareClientToWrite 函数中调用 clientInstallWriteHandler 函数,将待写回客户端加入到全局变量 server 的 clients_pending_write 列表中。然后,addReply 函数会调用 _addReplyToBuffer 等函数(在 networking.c 中),将要返回的结果添加到客户端的输出缓冲区中。

/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyProtoToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

3. 总结

Redis通过多路复用机制来实现网络通信,同时也基于多路复用机制封装了事件驱动框架,这篇文章抛开了事件驱动框架的部分(后续会单独整理),主要是为了能够更加清晰的表述一条命令的处理过程,2.4中只是把命令的执行结果写到输出缓冲区中,并没有真正写回给客户端,而真正写回客户端的操作是在beforeSleep函数中实现的,后续分析Redis 6.0新推出的多I/O线程实现原理时候,再详细介绍这一部分吧。

然后我这里呢,结合前面两部分,用伪代码再描述下整个流程,有两个函数之前没有提过,beforeSleep和aftersleep,先暂时忽略好了:

//创建epoll实例
epfd = epoll_create(EPOLL_SIZE); 

//将监听socket加入到监听列表中
epoll_ctl(epfd, EPOLL_CTL_ADD, socket_listen, &ee); 

//循环监听发生的I/O事件
while (1) {
   //这个函数的作用后面分析多I/O线程的时候再介绍吧
   beforeSleep();
   //等待返回已经就绪的描述符 
   n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1); 
   //这个函数的作用后面分析多I/O线程的时候再介绍吧
   aftersleep();
   
   //遍历所有就绪的描述符     
   for (int i = 0; i < n; i++) {
  
       //如果是监听socket描述符就绪,表明有一个新客户端连接到来 
       if (ep_events[i].data.fd == socket_listen) { 
          //调用accept()建立连接,实际上是通过acceptTcpHandler函数间接调用的
          socket_connection = accept(sock_fd);
          
          //注册已连接socket
          epoll_ctl(epfd, EPOLL_CTL_ADD, socket_connection, &ee); 
       } else { 
          //如果是已连接socket,则开始一条命令的处理过程
          //1. 读取命令
          readQueryFromClient(socket_connection);
          //2. 解析命令
          processInputBuffer();
          //3. 执行命令
          processCommand();
          //4. 返回结果
          addReply();
       }
   }
   
}

参考资料:

  1. 极客时间专栏《Redis源码剖析与实战》.蒋德钧.2021
  2. 极客时间专栏《Redis核心技术与实战》.蒋德钧.2020
  3. Redis 6.0源码:https://github.com/redis/redis/tree/6.0/src

相关文章

网友评论

      本文标题:一条Redis命令的执行过程

      本文链接:https://www.haomeiwen.com/subject/ddwssrtx.html