在 2020 年 5 月推出的 Redis 6.0 版本中,Redis 在执行模型中还进一步使用了多线程来处理 IO 任务。之前在:https://www.jianshu.com/p/0323fc06a36f 简单讨论过Redis执行命令的过程大致分为:读取命令、解析命令、执行命令、返回结果四个阶段。而多线程处理 IO 任务的目的,就是为了充分利用当前服务器的多核特性,使用多核运行多线程,让多线程帮助加速命令读取、命令解析以及数据写回的速度,提升 Redis 整体性能。
源码地址:https://github.com/redis/redis/tree/6.0/src
1. 基本步骤
1.1. 输入、输出缓冲区
为了避免客户端和服务器端的请求发送和处理速度不匹配,服务器端给每个连接的客户端都设置了一个输入缓冲区和输出缓冲区,我们称之为客户端输入缓冲区和输出缓冲区。
输入缓冲区会先把客户端发送过来的命令暂存起来,Redis 主线程再从输入缓冲区中读取命令,进行处理。当 Redis 主线程处理完数据后,会把结果写入到输出缓冲区,再通过输出缓冲区返回给客户端,如下图所示:
1.png
1.2. 多线程处理网络请求
引入多I/O线程优化之后,为了避免多线程访问共享资源造成的线程安全问题,执行命令阶段仍然是在主线程中执行的,而I/O线程只是在读取客户端请求、解析命令、将命令执行结果返回给客户端的时候起作用。而且,在同一时刻,读写客户端命令操作和执行命令操作只有一种在运行。下面用一张表格来简单描述下这个过程:
主线程 | I/O线程 | |
---|---|---|
T1 | 接收客户端连接,建立连接socket | |
T2 | 把连接socket分配给I/O线程 | |
T3 | 等待I/O线程读取、解析命令 | |
T4 | 读取命令 | |
T5 | 解析命令 | |
T6 | 执行命令 | |
T7 | 将结果写到输出缓冲区 | |
T8 | 等待I/O线程写回客户端 | |
T9 | 将缓冲区数据写回客户端 | |
T10 | I/O线程写回客户端完成,等待后续请求 |
2. 源码解析
2.1. 数据结构
在Redis中,全局变量都会保存在redisServer结构体类型的变量server中(在server.h文件中),我们的RDB、AOF、主从等配置都是在这个server变量中保存的,而多I/O线程中有一个全局变量io_threads_active,来表示Redis是否开启了多I/O线程:
- server.io_threads_active = 0,未启动多I/O线程。
- server.io_threads_active = 1,启动多I/O线程。
server中还有一个变量io_threads_num保存I/O线程的数量。
server中有两个 List 类型的成员变量:clients_pending_write 和 clients_pending_read,它们分别记录了待写回数据的客户端和待读取数据的客户端,如下所示:
struct redisServer {
...
list *clients_pending_write; //待写回数据的客户端
list *clients_pending_read; //待读取数据的客户端
...
}
在networking.c文件中,定义了四个数组,用来保存多I/O线程的相关数据:
- io_threads_list 数组:每个元素是一个List类型的列表,列表保存了每个线程待处理的客户端,比如io_threads_list[0]保存了0号线程要处理的客户端列表;
- io_threads_pending 数组:保存等待每个 IO 线程处理的客户端个数;
- io_threads_mutex 数组:保存线程互斥锁,可以对标Java的ReentrantLock,到后面会介绍这个数组的作用;
- io_threads 数组:保存每个 IO 线程。
pthread_t io_threads[IO_THREADS_MAX_NUM]; //记录线程描述符的数组
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; //记录线程互斥锁的数组
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; //记录线程待处理的客户端个数
list *io_threads_list[IO_THREADS_MAX_NUM]; //记录线程对应处理的客户端
networking.c还定义了一个int型的变量io_threads_op,表示I/O线程当前要执行的操作是读操作还是写操作,这个变量也表明所有的I/O线程要么都在读,要么都在写,不可能一部分I/O线程在读,一部分I/O线程在写,该变量有两个值:
- IO_THREADS_OP_WRITE:这表明该 IO 线程要做的是写操作。
- IO_THREADS_OP_READ:这表明该 IO 线程要做的是读操作。
每一次客户端连接请求进来的时候,redis都会为这个客户端创建一个client变量,而client有一个属性flags,flags标记了当前客户端的状态,这次仅分析和多I/O线程有关的三个状态:
- CLIENT_PENDING_READ:有命令等待被读取
- CLIENT_PENDING_WRITE:等待被写回
- CLIENT_PENDING_COMMAND:命令已经被解析,等待被执行
2.2. 初始化
redis启动的main函数(在server.c文件中)会执行server的初始化过程,server 在初始化过程的最后,调用 InitSeverLast 函数,而 InitServerLast 函数再进一步调用 initThreadedIO 函数(在networking.c文件中)来完成多I/O线程的初始化操作。具体如下所示:
void InitServerLast() {
bioInit();
initThreadedIO(); //调用initThreadedIO函数初始化IO线程
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
下面来看下initThreadedIO函数的主要执行流程,他主要分为3步:
- 设置激活I/O线程的标志为未启动,即server.io_threads_active = 0。
- 用两个if来判断是否推出方法:设置的线程数是否为1;设置的线程数是否超过最大阈值。
- 初始化I/O线程,即上面提到的networking.c文件中定义的有关I.O线程的数据结构。
具体如下:
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
//1. 设置激活I/O线程的标志为未启动
server.io_threads_active = 0; /* We start with threads not active. */
//2. 设置的线程数是否为1
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
//2. 设置的线程数是否超过最大阈值
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
//3.初始化I/O线程
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
在初始化I/O线程的时候,会调用pthread_create函数来创建I/O线程,pthread_create可以对标Java的 new Thread(runnable),然后IOThreadMain函数就是I/O线程执行的主函数,I/O线程获取客户端列表,然后根据当前I/O线程的操作类型,来执行读取命令或写回客户端操作。IOThreadMain还有一些线程同步的操作等到后面再讨论。
void *IOThreadMain(void *myid) {
......
while(1) {
......
//获取IO线程要处理的客户端列表
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
//如果线程操作是写操作,则将数据写回客户端
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
//如果线程操作是读操作,则从客户端读取数据
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
2.3. 读取命令
在https://www.jianshu.com/p/0323fc06a36f分析命令执行过程中有提到,redis是在readQueryFromClient函数(networking.c文件)中执行命令读取操作的,而在readQueryFromClient函数一开始,会判断是否开启了I/O线程,
void readQueryFromClient(connection *conn) {
......
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
}
而在postponeClientRead函数中,有五个判断条件,分别是:
- I/O线程被激活,即server.io_threads_active = 1,在2.2分析初始化的时候,io_threads_active 是被设置为0的,而具体什么时候会把他置为1,会在2.5分析线程间同步的时候再讨论。
- I/O线程可以用于读取命令,这个变量值是在 Redis 配置文件 redis.conf 中,通过配置项 io-threads-do-reads 设置的,默认值为 no,如果想用多 IO 线程处理客户端读操作,就需要把 io-threads-do-reads 配置项设为 yes。
- 客户端没有被暂停。
- processEventsWhileBlokced 函数没有在执行。
- 客户端现有标识不能有 CLIENT_MASTER、CLIENT_SLAVE 和 CLIENT_PENDING_READ,前面两个常量是关于主从的,这里先不讨论,CLIENT_PENDING_READ前面有提到过。
/* Return 1 if we want to handle the client read later using threaded I/O.
* This is called by the readable handler of the event loop.
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!clientsArePaused() &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
如果五个条件都满足的话,就把客户端的状态设置为CLIENT_PENDING_READ,并把当前客户端添加到server.clients_pending_read列表中。
也就是说当Redis接收到客户端请求时,如果I/O线程被激活,并不会直接读取命令,而是把client记为CLIENT_PENDING_READ,并把client添加到“等待被读取客户端”clients_pending_read列表中就直接返回了。这个操作是在主线程中进行的。
分析Redis命令处理过程中有提到,Redis会在一个循环中接收客户端请求,在阻塞等待客户端请求到来之前,会调用beforesleep函数,进而调用handleClientsWithPendingReadsUsingThreads函数来处理等待被读取的客户端,该函数主要逻辑分为四步:
- 判定 IO 线程是否激活,以及用户是否设置了 Redis 可以用 IO 线程处理待读客户端,如果不满足条件直接返回,这两个判断和postponeClientRead函数中的第1、2个条件相同。
- 取出等待被读取的客户端,以轮询的方式分配给各个I/O线程。
- 主线程把自己该读取的客户端中的命令,先读取、解析完。这里要说明的一点就是主线程就是0号线程也会参与I/O操作,并且读取的是io_threads_list[0]中的元素。
- 当I/O线程读取、解析完成之后,执行server.clients_pending_read列表中所有的客户端命令。
int handleClientsWithPendingReadsUsingThreads(void) {
//1. 判定 IO 线程是否激活,以及用户是否设置了 Redis 可以用 IO 线程处理待读客户端
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
......
//2. 取出等待被读取的客户端,以轮询的方式分配给各个I/O线程。
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
......
//3.主线程把自己该读取的客户端中的命令,先读取、解析完
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
//等待I/O线程解析完所有的命令
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
//4.执行所有的命令
/* Run the list of clients again to process the new buffers. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
......
//执行所有命令
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
......
}
......
return processed;
}
这里再重新说一下readQueryFromClient函数,刚开始该函数只是把客户端添加到全局列表中,并标记client.flags 为 CLIENT_PENDING_READ,就直接返回了。这个时候再进入这个函数,就会进入后面的读取逻辑,进而调用processInputBuffer解析命令。这一部分逻辑可以参考https://www.jianshu.com/p/0323fc06a36f。
而processInputBuffer函数的最后,会判断当前客户端client.flags是否为CLIENT_PENDING_READ,如果是的话,就把该客户端标识设置为CLIENT_PENDING_COMMAND,即解析完客户端的命令,就直接返回了,并没有执行客户端的命令。
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
2.4. 写回客户端
在2.3中分析的handleClientsWithPendingReadsUsingThreads函数中,当所有客户端的命令都解析完成以后,循环遍历所有的客户端,调用processPendingCommandsAndResetClient函数执行客户端命令,而processPendingCommandsAndResetClient会进而调用processCommand(在server.c文件中)函数来处理客户端命令,处理完客户端命令后,调用addReply函数(在networking.c文件中)写回客户端。
而addReply函数一开始,会调用 prepareClientToWrite 函数,来判断是否使用I/O线程来写回客户端:
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
...
}
prepareClientToWrite函数前面一些主从的判断这里先忽略,在最后会调用 clientHasPendingReplies 函数,判断当前客户端是否还有留存在输出缓冲区中的数据等待写回。在前面2.3最后解析完客户端命令之后,就会把client.flags设置为CLIENT_PENDING_COMMAND,所以第二个条件也会满足
如果没有的话,那么,prepareClientToWrite 就会调用 clientInstallWriteHandler 函数,再进一步判断能否推迟该客户端写操作。
int prepareClientToWrite(client *c) {
......
if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
clientInstallWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
}
clientInstallWriteHandler函数中,忽略掉主从相关判断,就判断c->flags 是否为 CLIENT_PENDING_WRITE,而这个时候client.flags为CLIENT_PENDING_COMMAND,满足条件。把c->flags设置为CLIENT_PENDING_WRITE,并把当前client添加到全局变量server.clients_pending_write中,就返回了。
void clientInstallWriteHandler(client *c) {
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
}
prepareClientToWrite函数返回之后,addReply函数会进一步把写回给客户端的数据写到输出缓冲区中,这时候并没有真正的把数据写回给客户端。
在2.3中提到Redis在循环接收客户端请求时,会调用beforesleep函数会调用handleClientsWithPendingReadsUsingThreads函数读取、解析客户端请求并执行客户端命令,然后beforeSleep函数会调用handleClientsWithPendingWritesUsingThreads函数来把缓冲区中的数据写回客户端。
handleClientsWithPendingWritesUsingThreads函数的执行逻辑大致可以分为四步:
- 判断是否需要使用I/O线程来写回客户端。
- 把待写客户端,按照轮询方式分配给 I/O 线程,添加到 io_threads_list 数组各元素中。
- 主 I/O 线程处理其待写客户端,并执行 while(1) 循环等待所有 I/O 线程完成处理。
- 再次检查是否还有缓冲区的数据未被写回客户端。这里是通过事件驱动框架来将缓冲区的数据写回客户端的,具体就不详细讨论了。
int handleClientsWithPendingWritesUsingThreads(void) {
......
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */
//1. 判断是否需要使用I/O线程来写回客户端。
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
......
//2. 把待写客户端,按照轮询方式分配给 IO 线程
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
......
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
//3. 主I/O线程处理其待写客户端
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
//等待所有 I/O 线程完成处理
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
/* Run the list of clients again to install the write handler where
* needed. */
//4.再次检查是否还有缓冲区的数据未被写回客户端
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
......
}
listEmpty(server.clients_pending_write);
/* Update processed count on server */
server.stat_io_writes_processed += processed;
return processed;
}
2.5. 线程间同步
2.5.1 启动I/O线程
在前面2.1分析数据结构的时候,有一个互斥锁数组io_threads_mutex,数组里面的每一个元素就是一把I/O线程对应的互斥锁。
在2.2初始化I/O线程的时候,initThreadedIO函数中,循环创建io_threads_num个线程,在真正调用pthread_create创建线程之前,主线程会先调用pthread_mutex_lock函数,获取到这个I/O线程对应的锁,具体代码如下:
void initThreadedIO(void) {
......
//3.初始化I/O线程
for (int i = 0; i < server.io_threads_num; i++) {
......
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
......
}
io_threads[i] = tid;
}
}
而I/O线程执行的主函数中,首先会循环100w次,来等待I/O线程任务的到来,如果循环100w次还没有任务到来的时候,就会调用pthread_mutex_lock获取该I/O线程的互斥锁,而这把互斥锁在该线程创建之前,已经被主线程拿到了,所以I/O线程就阻塞在这里,等待被唤醒。就算I/O线程获取到互斥锁,也会立刻释放掉,让主线程可以随时停止I/O线程。
void *IOThreadMain(void *myid) {
......
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
}
}
在2.4写回客户端时候,handleClientsWithPendingWritesUsingThreads函数首先会进行两个判断,如下所示:
int handleClientsWithPendingWritesUsingThreads(void) {
......
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!server.io_threads_active) startThreadedIO();
}
第一个if判断:如果设置的I/O线程数量为1(io_threads_num通过redis.conf配置),并且有必要停止I/O线程的话,就使用单线程的方式执行写回操作。
而stopThreadedIOIfNeeded会判断当前等待被处理的客户端数量pending,是否小于server.io_threads_num*2,如果小于的话,就会停止I/O线程,停止I/O线程同样是主线程获取该I/O线程的互斥锁。
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (server.io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
void stopThreadedIO(void) {
......
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
server.io_threads_active = 0;
}
第二个if判断:判断server.io_threads_active的值,由于在初始化的时候,该值是被设为0的,并且走到这个判断,就说明客户端数量pending > server.io_threads_num*2,这个时候会调用startThreadedIO函数来启动I/O线程,startThreadxianedIO函数里面,其实就是释放掉每个I/O线程的互斥锁,这样I/O线程就可以获取到互斥锁继续执行了。
void startThreadxianedIO(void) {
......
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
server.io_threads_active = 1;
}
2.5.2 写回客户端同步
在前面2.4已经介绍过写回客户端函数handleClientsWithPendingWritesUsingThreads的执行逻辑,他是在主线程中执行的,而在2.2初始化时候也讨论过I/O线程执行的主函数IOThreadMain的主要逻辑。
在这两个线程的执行方法中,有两个变量是都会被这两个线程进行读写操作的,io_threads_list和io_threads_pending,io_threads_list存放每个I/O线程需要处理的客户端列表,io_threads_pending保存每个I/O线程待处理客户端的数量。
下面通过一张表格来分析主线程和I/O线程是否会出现线程安全问题:
3.png
下面简单分析下上面这个表格:
- T1、T2时刻对应了I/O线程的创建,即2.5.1讨论的过程
- T3、T4时刻,在I/O线程被启动之后,主线程首先会给io_threads_list添加client,而此时I/O线程在循环判断io_threads_pending是否等于0。
- T5时刻主线程设置了I/O线程的操作类型为写操作。
- T6时刻主线程设置了I/O线程需要处理的客户端数量之后,I/O线程才开始继续执行while循环里面后续操作。
- T7时刻主线程循环判断io_threads_pending是否等于0,而这时I/O线程在写回客户端。
- T8、T9时刻I/O线程清空客户端列表,并设置io_threads_pending = 0,主线程才退出循环继续执行后续操作。
分析读取客户端命令的同步操作,其实和上面本小节分析的差不多,因此不再累述了,如果有问题的话,可以留言一起讨论。
虽然两个线程都有对io_threads_list和io_threads_pending变量的读、写操作,但是在同一时刻,只有一个线程在写其中的一个变量,因此并不会出现线程安全问题,真是艺高人胆大啊,希望有朝一日我也能写出如此骚气的代码,继续加油吧。
参考资料:
- 极客时间专栏《Redis源码剖析与实战》.蒋德钧.2021
- 极客时间专栏《Redis核心技术与实战》.蒋德钧.2020
- Redis 6.0源码:https://github.com/redis/redis/tree/6.0/src
网友评论