美文网首页一些收藏redis
Redis单线程模型,从源码角度解析

Redis单线程模型,从源码角度解析

作者: 史啸天 | 来源:发表于2022-01-24 15:06 被阅读0次

简述

大家都知道Redis是单线程的内存数据库,数据存储在内存中,且使用的是单线程模型,因此速度极快;今天我们就来从源码的角度分析下Redis的单线程模型;
首先,我们这里使用的是Redis2.9.11版本的源码进行分析,Redis是基于C语言编写的,语法结构与Java不太一样,但Redis的代码足够精简,阅读起来还是比较容易的,这里我也添加了一些注释,github上有大神针对Redis源码的整个注释,大家也可以下载下来自己参考阅读一下。
Redis源码注释版:https://github.com/huangz1990/redis-3.0-annotated

主函数入口

我们先从main函数开始,在redis.c文件中,我们可以看到Redis启动的main函数,大概罗列做了以下几件事:

  • 检查是否是Sentinel模式启动;
  • 初始化服务器配置,在这里面初始化了LRU时间、RDB保存条件、命令、客户端输出缓冲区大小、慢查询日志等等;
  • 检查并加载配置文件和启动命令;
  • 初始化服务器,这里主要初始化的是Redis存储的数据结构,还进行了时间事件创建、打开TCP端口并监听端口、为TCP连接关联应答(accept)处理器(这里就是所谓的I/O多路复用器)、初始化Lua脚本系统、初始化慢查询功能、初始化后台持久化的线程(这里是主函数中唯一使用到多线程的地方);
  • 从AOF或者RDB中加载数据;
  • 检查内存配置,maxmemory配置项是否正确;
  • 运行事件处理器,以上执行完成,代表着服务器启动成功,在这个函数中是Redis工作的函数,我们下面展开讲一下;
  • 停止事件循环;
  • 返回0;
    这里我把源码贴到下面,大家可以看一下注释
int main(int argc, char **argv) {
    struct timeval tv;

    /* We need to initialize our libraries, and the server configuration. */
    // 初始化库
#ifdef INIT_SETPROCTITLE_REPLACEMENT
    spt_init(argc, argv);
#endif
    setlocale(LC_COLLATE,"");
    zmalloc_enable_thread_safeness();
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
    srand(time(NULL)^getpid());
    gettimeofday(&tv,NULL);
    dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());

    // 检查服务器是否以 Sentinel 模式启动
    server.sentinel_mode = checkForSentinelMode(argc,argv);

    // 初始化服务器
    initServerConfig();

    /* We need to init sentinel right now as parsing the configuration file
     * in sentinel mode will have the effect of populating the sentinel
     * data structures with master nodes to monitor. */
    // 如果服务器以 Sentinel 模式启动,那么进行 Sentinel 功能相关的初始化
    // 并为要监视的主服务器创建一些相应的数据结构
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }

    // 检查用户是否指定了配置文件,或者配置选项
    if (argc >= 2) {
        int j = 1; /* First option to parse in argv[] */
        sds options = sdsempty();
        char *configfile = NULL;

        /* Handle special options --help and --version */
        // 处理特殊选项 -h 、-v 和 --test-memory
        if (strcmp(argv[1], "-v") == 0 ||
            strcmp(argv[1], "--version") == 0) version();
        if (strcmp(argv[1], "--help") == 0 ||
            strcmp(argv[1], "-h") == 0) usage();
        if (strcmp(argv[1], "--test-memory") == 0) {
            if (argc == 3) {
                memtest(atoi(argv[2]),50);
                exit(0);
            } else {
                fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
                fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
                exit(1);
            }
        }

        /* First argument is the config file name? */
        // 如果第一个参数(argv[1])不是以 "--" 开头
        // 那么它应该是一个配置文件
        if (argv[j][0] != '-' || argv[j][1] != '-')
            configfile = argv[j++];

        /* All the other options are parsed and conceptually appended to the
         * configuration file. For instance --port 6380 will generate the
         * string "port 6380\n" to be parsed after the actual file name
         * is parsed, if any. */
        // 对用户给定的其余选项进行分析,并将分析所得的字符串追加稍后载入的配置文件的内容之后
        // 比如 --port 6380 会被分析为 "port 6380\n"
        while(j != argc) {
            if (argv[j][0] == '-' && argv[j][1] == '-') {
                /* Option name */
                if (sdslen(options)) options = sdscat(options,"\n");
                options = sdscat(options,argv[j]+2);
                options = sdscat(options," ");
            } else {
                /* Option argument */
                options = sdscatrepr(options,argv[j],strlen(argv[j]));
                options = sdscat(options," ");
            }
            j++;
        }
        if (configfile) server.configfile = getAbsolutePath(configfile);
        // 重置保存条件
        resetServerSaveParams();

        // 载入配置文件, options 是前面分析出的给定选项
        loadServerConfig(configfile,options);
        sdsfree(options);

        // 获取配置文件的绝对路径
        if (configfile) server.configfile = getAbsolutePath(configfile);
    } else {
        redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
    }

    // 将服务器设置为守护进程
    if (server.daemonize) daemonize();

    // 创建并初始化服务器数据结构
    initServer();

    // 如果服务器是守护进程,那么创建 PID 文件
    if (server.daemonize) createPidFile();

    // 为服务器进程设置名字
    redisSetProcTitle(argv[0]);

    // 打印 ASCII LOGO
    redisAsciiArt();

    // 如果服务器不是运行在 SENTINEL 模式,那么执行以下代码
    if (!server.sentinel_mode) {
        /* Things not needed when running in Sentinel mode. */
        // 打印问候语
        redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
    #ifdef __linux__
        // 打印内存警告
        linuxOvercommitMemoryWarning();
    #endif
        // 从 AOF 文件或者 RDB 文件中载入数据
        loadDataFromDisk();
        // 启动集群?
        if (server.cluster_enabled) {
            if (verifyClusterConfigWithData() == REDIS_ERR) {
                redisLog(REDIS_WARNING,
                    "You can't have keys in a DB different than DB 0 when in "
                    "Cluster mode. Exiting.");
                exit(1);
            }
        }
        // 打印 TCP 端口
        if (server.ipfd_count > 0)
            redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
        // 打印本地套接字端口
        if (server.sofd > 0)
            redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
    } else {
        sentinelIsRunning();
    }

    /* Warning the user about suspicious maxmemory setting. */
    // 检查不正常的 maxmemory 配置
    if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
        redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
    }

    // 运行事件处理器,一直到服务器关闭为止
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeMain(server.el);

    // 服务器关闭,停止事件循环
    aeDeleteEventLoop(server.el);

    return 0;
}

事件处理器

在Redis初始化完成后,会进入aeMain函数,这里是Redis主要的事件处理器,我们可以看下内部是如何做的;

/*
 * 事件处理器的主循环
 */
void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

我们可以看到aeMain内部是一个while的死循环,真正的事件处理又交给了aeProcessEvents函数,在aeProcessEvents函数中主要处理Redis的两种事件,即时间事件和文件事件;
大概罗列aeProcessEvents函数做以下几件事:

  • 获取即将到达的时间事件和已经达到的时间事件;
  • 处理文件事件,在Redis中所有的socket都被视为文件事件,在这里会获取所有的文件事件,循环处理读事件和写事件;
  • 开始处理到达的时间事件;
  • 返回已处理的文件/时间事件数量;
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // 获取最近的时间事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 如果时间事件存在的话
            // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            // 计算距今最近的时间事件还要多久才能达到
            // 并将该时间距保存在 tv 结构中
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }

            // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            
            // 执行到这一步,说明没有时间事件
            // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度

            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // 设置文件事件不阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 文件事件可以阻塞直到有事件到达为止
                tvp = NULL; /* wait forever */
            }
        }

        // 处理文件事件,阻塞时间由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    }

    /* Check time events */
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

总结

总结一下,从源码的角度分析,Redis的单线程模型,就是在Redis主函数启动后,没有再创建过新的线程;
Redis在处理文件事件是也就是读取socket缓冲区中的数据,并进行解析响应的;在这里Redis对于socket监听就是所谓I/O多路复用器,而Redis处理事件一直是基于一个while循环来处理的;

相关文章

  • Redis单线程模型,从源码角度解析

    简述 大家都知道Redis是单线程的内存数据库,数据存储在内存中,且使用的是单线程模型,因此速度极快;今天我们就来...

  • 2020-06-24

    redis IO模型 学习方法 应该从对比的角度去分析其特性; 本篇博客主要针对redis的单线程模型,epoll...

  • Redis学习之旅~原理篇

    内容依旧来自 核心原理 线程IO模型 单线程非阻塞IO redis是单线程模型。redis的...

  • java面试宝典 redis和分布式锁

    redis 是基于key-value的内存高速缓存数据库 redis 是单线程模型 redis 是单线程模型为什么...

  • Redis BigKey —— by 久米泷

    BigKey 的危害 redis 在删除 big key 时,会阻塞当前线程,然后 redis 又是单线程模型,从...

  • redis学习笔记(一)

    redis的特性 1.速度快: redis源码是使用C语言编写的,它将数据存在内存当中,线程模型是单线程 从上到下...

  • Redis 到底是单线程还是多线程?我要吊打面试官

    1、Redis 单线程到底指什么? 没错,大家所熟知的 Redis 确实是单线程模型,指的是执行 Redis 命令...

  • Redis高级应用总结

    Redis的线程模型 Redis是单进程单线程的,但是使用的是单线程非阻塞的多路IO复用的模型。多线程模型会导致线...

  • rxjava2理解

    本文从建立模型的角度分析rxjava2的源码实现,适合看了众多rxjava2源码解析还是一头雾水的同学,附带少量代...

  • Android事件分发机制完全解析

    Android事件分发机制完全解析,带你从源码的角度彻底理解(上)Android事件分发机制完全解析,带你从源码的...

网友评论

    本文标题:Redis单线程模型,从源码角度解析

    本文链接:https://www.haomeiwen.com/subject/agybhrtx.html