本文分析的Redis源码是基于Redis 6.0的,Redis 6.0之前的代码会稍微有些出入,但是整体流程大致类似。主要是为了后续分析Redis 6.0推出的多I/O线程实现做基础,而分析Redis命令的执行过程,需要先了解下Redis的事件驱动框架的实现,这篇文档在公司电脑上,后面疫情解封之后,到公司再贴出来吧。
1. 网络通信模型
Redis使用多路复用机制来实现对网络请求的处理,他会根据Redis运行的操作系统,选择相应的多路复用机制,比如在Linux上就会选择使用epoll、在windows上就会选择使用select。Redis的事件驱动框架对多路复用机制做了一层封装,这里先抛开事件驱动框架,分析Redis网络通信模型,下面以epoll为例。
首先介绍下epoll机制中,几个比较重要的函数:
- epoll_create:epoll_create函数会创建一个epoll实例,而epoll实例中记录了监听的文件描述符以及已经就绪的文件描述符。
- epoll_ctl:向epoll实例中注册一个需要被监听的socket(包括监听socket和已连接socket),监听socket用于处理客户端的连接事件进而建立已连接socket,而已连接socket用于处理客户端的读、写事件。
- epoll_wait:等待epoll实例监听的socket上的I/O事件发生。在调用epoll_wait函数时,可以传入一个timeout参数,如果传入-1,epoll_wait无限期阻塞,直到有I/O事件发生,否则在指定超时时间返回。Redis在调用epoll_wait时,会传入-1,就是说如果没有I/O事件发生时,Redis主线程会一直阻塞下去。
这里需要简单说一下就是,如果只监听了一个端口的话,那么我们的应用程序中只会有一个监听socket,每当有客户端连接进来的时候,就会创建一个已连接socket,所以我们的应用程序会存在多个已连接socket。
如果使用epoll来实现网络编程,通常会实现类似下面这样的模型:
int sock_fd,conn_fd; //监听套接字和已连接套接字的变量
sock_fd = socket() //创建套接字
bind(sock_fd) //绑定套接字
listen(sock_fd) //在套接字上进行监听,将套接字转为监听套接字
epfd = epoll_create(EPOLL_SIZE); //创建epoll实例,
//创建epoll_event结构体数组,保存套接字对应文件描述符和监听事件类型
ep_events = (epoll_event*)malloc(sizeof(epoll_event) * EPOLL_SIZE);
//创建epoll_event变量
struct epoll_event ee
//监听读事件
ee.events = EPOLLIN;
//监听的文件描述符是刚创建的监听套接字
ee.data.fd = sock_fd;
//将监听套接字加入到监听列表中
epoll_ctl(epfd, EPOLL_CTL_ADD, sock_fd, &ee);
while (1) {
//等待返回已经就绪的描述符
n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
//遍历所有就绪的描述符
for (int i = 0; i < n; i++) {
//如果是监听套接字描述符就绪,表明有一个新客户端连接到来
if (ep_events[i].data.fd == sock_fd) {
conn_fd = accept(sock_fd); //调用accept()建立连接
ee.events = EPOLLIN;
ee.data.fd = conn_fd;
//添加对新创建的已连接套接字描述符的监听,监听后续在已连接套接字上的读事件
epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ee);
} else { //如果是已连接套接字描述符就绪,则可以读数据
...//读取数据并处理
}
}
}
首先创建监听socket并调用epoll_create函数创建一个epoll实例,然后调用epoll_ctl把监听socket注册到epoll实例上,在一个死循环里面调用epoll_wait等待epoll上的socket有事件发生。
epoll_wait返回后,根据有事件发生的socket是监听socket还是已连接socket,来执行相应逻辑:
- 如果是监听socket,说明有新的客户端来建立连接,创建已连接socket,并把已连接socket注册到epoll实例上。
- 如果是已连接socket,说明可以读取客户端的请求,则执行相应的解析、处理请求并写回客户端操作。
这个过程如下图所示:
1.png
2. 一条命令的处理过程
源码地址:https://github.com/redis/redis/tree/6.0/src
Redis的启动main函数是在server.c文件中,在初始化的时候,会通过事件驱动框架相关函数调用epoll_create函数来创建epoll实例。然后通过事件驱动框架的aeCreateFileEvent函数,调用epoll_ctl函数向epoll实例注册监听socket,并传入一个回调函数acceptTcpHandler,当监听socket有客户端建立连接时,就会触发acceptTcpHandler函数建立连接。
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
而在acceptTcpHandler函数中,会创建已连接socket并注册回调函数readQueryFromClient,当已连接socket有事件发生之后,就会触发readQueryFromClient函数。其实这中间是有很多其他函数调用的,我这里就不展开说了。
if (conn) {
connNonBlock(conn);
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
命令的处理过程也就是从readQueryFromClient函数开始,分为四个阶段:
- 命令读取,对应 readQueryFromClient 函数;
- 命令解析,对应 processInputBufferAndReplicate 函数;
- 命令执行,对应 processCommand 函数;
- 结果返回,对应 addReply 函数;
2.1. 命令读取
readQueryFromClient 函数会从客户端连接的 socket 中,读取最大为 readlen 长度的数据,readlen 值大小是宏定义 PROTO_IOBUF_LEN。该宏定义是在server.h文件中定义的,默认值为 16KB。
紧接着,readQueryFromClient 函数会根据读取数据的情况,进行一些异常处理,比如数据读取失败或是客户端连接关闭等。此外,如果当前客户端是主从复制中的主节点,readQueryFromClient 函数还会把读取的数据,追加到用于主从节点命令同步的缓冲区中。
最后,readQueryFromClient 函数会调用 processInputBuffer 函数,这就进入到了命令处理的下一个阶段,也就是命令解析阶段。
void readQueryFromClient(connection *conn) {
......
//从客户端socket中读取的数据长度,默认为16KB
readlen = PROTO_IOBUF_LEN;
......
//给缓冲区分配空间
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
//调用read从描述符为fd的客户端socket中读取数据
nread = connRead(c->conn, c->querybuf+qblen, readlen);
//调用processInputBuffer进一步处理读取内容
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
processInputBuffer(c);
}
2.2. 命令解析
processInputBuffer(在 networking.c 文件中)函数,对客户端输入缓冲区中的命令和参数进行解析。下面我们接单分析下这个函数。
首先,processInputBuffer 函数会执行一个 while 循环,不断地从客户端的输入缓冲区中读取数据。然后,它会判断读取到的命令格式,是否以“”开头*。
- 如果命令是以“*”开头,那就表明这个命令是 PROTO_REQ_MULTIBULK 类型的命令请求,也就是符合 RESP 协议(Redis 客户端与服务器端的标准通信协议)的请求。那么,processInputBuffer 函数就会进一步调用 processMultibulkBuffer(在 networking.c 文件中)函数,来解析读取到的命令。
- 如果命令不是以“*”开头,那则表明这个命令是 PROTO_REQ_INLINE 类型的命令请求,并不是 RESP 协议请求。这类命令也被称为管道命令,命令和命令之间是使用换行符“\r\n”分隔开来的。比如,我们使用 Telnet 发送给 Redis 的命令,就是属于 PROTO_REQ_INLINE 类型的命令。在这种情况下,processInputBuffer 函数会调用 processInlineBuffer(在 networking.c 文件中)函数,来实际解析命令。
这样,等命令解析完成后,processInputBuffer 函数就会调用 processCommandAndResetClient函数,开始进入命令处理的第三个阶段,也就是命令执行阶段。下面的代码展示了 processInputBuffer 函数解析命令时的主要流程,你可以看下。
void processInputBuffer(client *c) {
while(c->qb_pos < sdslen(c->querybuf)) {
...
/* Determine request type when unknown. */
if (!c->reqtype) {
//根据客户端输入缓冲区的命令开头字符判断命令类型
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK; //符合RESP协议的命令
} else {
c->reqtype = PROTO_REQ_INLINE; //管道类型命令
}
}
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break; //对于管道类型命令,调用processInlineBuffer函数解析
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break; //对于RESP协议命令,调用processMultibulkBuffer函数解析
}
...
if (c->argc == 0) {
resetClient(c);
} else {
//这里还有一些多I/O线程的相关操作
......
//调用processCommandAndResetClient函数,开始执行命令
if (processCommandAndResetClient(c) == C_ERR) {
... }
... }
}
...
}
2.3. 命令执行
processCommandAndResetClient函数会调用processCommand(在server.c文件中)函数来执行命令
int processCommandAndResetClient(client *c) {
......
if (processCommand(c) == C_OK) {
commandProcessed(c);
}
......
return deadclient ? C_ERR : C_OK;
}
而processCommand函数在实际执行命令前的主要逻辑可以分成三步:
- 第一步,processCommand 函数会调用 moduleCallCommandFilters 函数(在module.c文件),将 Redis 命令替换成 module 中想要替换的命令。
- 第二步,processCommand 函数会判断当前命令是否为 quit 命令,并进行相应处理。
- 第三步,processCommand 函数会调用 lookupCommand 函数,在全局变量 server 的 commands 成员变量中查找相关的命令。
这里,全局变量 server 的 commands 成员变量是一个哈希表,它的定义是在server.h文件中的 redisServer 结构体里面,如下所示:
struct redisServer {
...
dict *commands;
...
}
另外,commands 成员变量的初始化是在Redis初始化时,完成哈希表创建,再Redis 提供的命令名称和对应的实现函数,插入到哈希表中。
插入哈希表时,会用把redisCommandTable数组中的内容,填充到哈希表中,redisCommandTable数组在server.c中,比如,以下代码展示了 GET 和 SET 这两条命令的信息,它们各自的实现函数分别是 getCommand 和 setCommand。当然,如果你想进一步了解 redisCommand 结构体,也可以去看下它的定义:
struct redisCommand redisCommandTable[] = {
...
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
...
}
lookupCommand 函数会根据解析的命令名称,在 commands 对应的哈希表中查找相应的命令。一旦查到对应命令后,processCommand 函数对命令做完各种检查后,它就开始执行命令了。它会判断当前客户端是否有 CLIENT_MULTI 标记,如果有的话,就表明要处理的是 Redis 事务的相关命令,所以它会按照事务的要求,调用 queueMultiCommand 函数将命令入队保存,等待后续一起处理。而如果没有,processCommand 函数就会调用 call 函数来实际执行命令了。以下代码展示了这部分的逻辑,你可以看下。
//如果客户端有CLIENT_MULTI标记,并且当前不是exec、discard、multi和watch命令
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c); //将命令入队保存,等待后续一起处理
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL); //调用call函数执行命令
...
}
call 函数是在 server.c 文件中实现的,它执行命令是通过调用命令本身,即 redisCommand 结构体中定义的函数指针来完成的。这里,以 SET 命令为例来介绍下它的实际执行过程。
SET 命令对应的实现函数是 setCommand,这是在t_string.c文件中定义的。setCommand 函数首先会对命令参数进行判断,比如参数是否带有 NX、EX、XX、PX 等这类命令选项,如果有的话,setCommand 函数就会记录下这些标记。
然后,setCommand 函数会调用 setGenericCommand 函数,这个函数也是在 t_string.c 文件中实现的。setGenericCommand 函数会根据刚才 setCommand 函数记录的命令参数的标记,来进行相应处理。
如果 SET 命令可以正常执行的话,也就是说命令带有 NX 选项但是 key 并不存在,或者带有 XX 选项但是 key 已经存在,这样 setGenericCommand 函数就会调用 setKey 函数(在 db.c 文件中)来完成键值对的实际插入,如下所示:
setKey(c->db,key,val);
然后,如果命令设置了过期时间,setGenericCommand 函数还会调用 setExpire 函数设置过期时间。最后,setGenericCommand 函数会调用 addReply 函数,将结果返回给客户端,如下所示:
addReply(c, ok_reply ? ok_reply : shared.ok);
无论是在命令执行的过程中,发现不符合命令的执行条件,或是命令能成功执行,addReply 函数都会被调用,用来返回结果。所以,这就进入到命令处理过程的最后一个阶段:结果返回阶段。
2.4. 结果返回
addReply 函数是在 networking.c 文件中定义的。它的执行逻辑比较简单,主要是调用 prepareClientToWrite 函数,并在 prepareClientToWrite 函数中调用 clientInstallWriteHandler 函数,将待写回客户端加入到全局变量 server 的 clients_pending_write 列表中。然后,addReply 函数会调用 _addReplyToBuffer 等函数(在 networking.c 中),将要返回的结果添加到客户端的输出缓冲区中。
/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyProtoToList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}
3. 总结
Redis通过多路复用机制来实现网络通信,同时也基于多路复用机制封装了事件驱动框架,这篇文章抛开了事件驱动框架的部分(后续会单独整理),主要是为了能够更加清晰的表述一条命令的处理过程,2.4中只是把命令的执行结果写到输出缓冲区中,并没有真正写回给客户端,而真正写回客户端的操作是在beforeSleep函数中实现的,后续分析Redis 6.0新推出的多I/O线程实现原理时候,再详细介绍这一部分吧。
然后我这里呢,结合前面两部分,用伪代码再描述下整个流程,有两个函数之前没有提过,beforeSleep和aftersleep,先暂时忽略好了:
//创建epoll实例
epfd = epoll_create(EPOLL_SIZE);
//将监听socket加入到监听列表中
epoll_ctl(epfd, EPOLL_CTL_ADD, socket_listen, &ee);
//循环监听发生的I/O事件
while (1) {
//这个函数的作用后面分析多I/O线程的时候再介绍吧
beforeSleep();
//等待返回已经就绪的描述符
n = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
//这个函数的作用后面分析多I/O线程的时候再介绍吧
aftersleep();
//遍历所有就绪的描述符
for (int i = 0; i < n; i++) {
//如果是监听socket描述符就绪,表明有一个新客户端连接到来
if (ep_events[i].data.fd == socket_listen) {
//调用accept()建立连接,实际上是通过acceptTcpHandler函数间接调用的
socket_connection = accept(sock_fd);
//注册已连接socket
epoll_ctl(epfd, EPOLL_CTL_ADD, socket_connection, &ee);
} else {
//如果是已连接socket,则开始一条命令的处理过程
//1. 读取命令
readQueryFromClient(socket_connection);
//2. 解析命令
processInputBuffer();
//3. 执行命令
processCommand();
//4. 返回结果
addReply();
}
}
}
参考资料:
- 极客时间专栏《Redis源码剖析与实战》.蒋德钧.2021
- 极客时间专栏《Redis核心技术与实战》.蒋德钧.2020
- Redis 6.0源码:https://github.com/redis/redis/tree/6.0/src
网友评论