美文网首页一些收藏
读redis源码笔记1-线程模型

读redis源码笔记1-线程模型

作者: 多喝水JS | 来源:发表于2020-03-30 23:44 被阅读0次

    最近项目大量用到redis,在这之前对redis的认识仅仅停留在demo阶段。因此在使用过程走了很多弯路。所以利用下班时间简单过了一遍源码,记录下自己不太理解以及源码中实现优雅的地方,以后方便理解。

    注:redis源码版本是:redis-5.0.8

    1、redis线程模型

    redis底层大部分使用单线程来处理客户端的请求,少部分耗时的任务(比如rdb任务)fork一个线程来处理。
    这个在Java中是不可想象的,鼎鼎大名的Netty虽然也采用 Reactor模型来处理请求,虽然说同样可以一个线程处理ACCEPT、READ、WRITE请求。但如果处理上万以上的请求,单线程是无法胜任的。那redis为啥可以使用单线程,并且性能也很不错?
    以下是个人总结的:

    • redis的数据结构设计的很精妙。提供了压缩列表,快速列表,跳跃表等数据结构,这些数据结构在查询和存储方面都做了极致优化
    • redis是纯内存操作,这个比较容易理解。毕竟它是为缓存而生的。但少部分还需要涉及到磁盘IO,比如rdb
    • 单线程减少了锁的竞争以及上下文切换。锁是很慢的,在Java中,如果采用synchronized,在竞争的情况下,会有一个升级的过程,当升到重量级锁,线程将加入到同步队列中,堵塞,直到被唤醒。这里大大的影响的处理速度,这对于高并发的缓存系统来说是致命的。另外cpu不会一直被一个线程占用,在一些操作系统上会有运行时间限制,即运行这个线程一段时间,切换到另一个线程继续执行。其中这里的切换就是上下文切换,切换时需要保存当前线程的上下文信息,另需要进行系统调用,而系统调用则需要涉及到安全检查,用户态和内核态数据拷贝等,一来二去,性能消耗也是不少的。
    • redis采用了 Reactor线程模型。底层默认使用Epoll多路复用IO库实现。

    从上面可以看出,redis之所以单线程又这么快,无非是纯内存操作加数据结构设计的很合理。Netty却不一样,涉及到大量的业务代码,这部分Netty无法掌控(虽然说Netty也提供了ioRatio来调整任务执行时间,但无法控制processSelectedKeys里的任务执行时间),单线程很容易卡死。

    源码分析

    先引用一张网络上的图


    总体意思是:I/O多路复用程序负责监听多个套接字,并向文件事件分派器传送那些产生了事件的套接字。 尽管多个文件事件可能会并发地出现,但I/O多路复用程序总是会将所有产生事 件的套接字都放到一个队列里面,然后通过这个队列,文件事件分派器有序的交给各个处理器处理

    1、启动
    //ae.c
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            //在事件处理前执行一些操作,如刷新aof文件
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
          //处理所有已到达的时间事件,以及所有已就绪的文件事件
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    

    当redis服务器启动以后,会调用ae.c#aeMain方法。这个方法是个死循环,只要服务器不停止。
    主要做两件事:1、在事件处理前执行一些操作,如刷新aof文件、执行一次快速的主动过期检查
    2、处理所有已到达的时间事件,以及所有已就绪的文件事件

    2、处理所有已到达的时间事件,以及所有已就绪的文件事件
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        int processed = 0, numevents;
    
        /* Nothing to do? return ASAP */
        if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    
        if (eventLoop->maxfd != -1 ||
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
            int j;
            aeTimeEvent *shortest = NULL;
            struct timeval tv, *tvp;
    
            // 获取最近的时间事件
            if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
                shortest = aeSearchNearestTimer(eventLoop);
            if (shortest) {
                // 如果时间事件存在的话
                // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
                long now_sec, now_ms;
    
                // 计算距今最近的时间事件还要多久才能达到
                // 并将该时间距保存在 tv 结构中
                aeGetTime(&now_sec, &now_ms);
                tvp = &tv;
                tvp->tv_sec = shortest->when_sec - now_sec;
                if (shortest->when_ms < now_ms) {
                    tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                    tvp->tv_sec --;
                } else {
                    tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
                }
    
                // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
                if (tvp->tv_sec < 0) tvp->tv_sec = 0;
                if (tvp->tv_usec < 0) tvp->tv_usec = 0;
            } else {
                
                // 执行到这一步,说明没有时间事件
                // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度
                if (flags & AE_DONT_WAIT) {
                    // 设置文件事件不阻塞
                    tv.tv_sec = tv.tv_usec = 0;
                    tvp = &tv;
                } else {
                    /* Otherwise we can block */
                    // 文件事件可以阻塞直到有事件到达为止
                    tvp = NULL; /* wait forever */
                }
            }
    
            // 处理文件事件,阻塞时间由 tvp 决定
            numevents = aeApiPoll(eventLoop, tvp);
            for (j = 0; j < numevents; j++) {
                // 从已就绪数组中获取事件
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int rfired = 0;
                // 读事件
                if (fe->mask & mask & AE_READABLE) {
                    // rfired 确保读/写事件只能执行其中一个
                    rfired = 1;
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                }
                // 写事件
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!rfired || fe->wfileProc != fe->rfileProc)
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                }
    
                processed++;
            }
        }
    
        /* Check time events */
        // 执行时间事件
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
    
        return processed; /* return the number of processed file/time events */
    }
    

    (1)首先获取下次执行时间文件的到期时间,根据这个时间传给Epoll函数,通知它,如果没事件准备好,堵塞多长时间。如果有事件就绪,立即返回
    (2)有事件就绪好,循环迭代这些事件,交给读或者写事件处理器处理。这里有个疑问?如果事件很多,那么下面的时间事件是不是无法按时执行了?
    (3)执行完就绪的事件后,如果时间事件就绪了,开始执行时间事件。

    疑问:1、上面提到的读和写处理器是什么?
    2、时间事件是干嘛用的?

    1、疑问1

    通过打断点,可以看到:在server.c#main()方法启动redis服务器时,先对redis服务器进行初始化

    void initServer() {
    .....
        /* Open the TCP listening socket for the user commands. */
        // 打开 TCP 监听端口,用于等待客户端的命令请求
        if (server.port != 0 &&
            listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
            exit(1);
    
    
    
        /* Create the serverCron() time event, that's our main way to process
         * background operations. */
        // 为 serverCron() 创建时间事件
        if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
            redisPanic("Can't create the serverCron time event.");
            exit(1);
        }
    
        // 为 TCP 连接关联连接应答(accept)处理器
        // 用于接受并应答客户端的 connect() 调用
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    redisPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
    
        // 为本地套接字关联应答处理器
        if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
            acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
    
        /* Open the AOF file if needed. */
        // 如果 AOF 持久化功能已经打开,那么打开或创建一个 AOF 文件
        if (server.aof_state == REDIS_AOF_ON) {
            server.aof_fd = open(server.aof_filename,
                                   O_WRONLY|O_APPEND|O_CREAT,0644);
            if (server.aof_fd == -1) {
                redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                    strerror(errno));
                exit(1);
            }
        }
    

    可以看到redis服务器在初始化时打开监听端口,等待客户端的命令请求。并且为TCP 连接关联连接应答(accept)处理器。这个处理器实现在networking.c#acceptTcpHandler()中

    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
        char cip[REDIS_IP_STR_LEN];
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);
        REDIS_NOTUSED(privdata);
    
        while(max--) {
            // accept 客户端连接
            cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
           
            // 为客户端创建客户端状态(redisClient)
            acceptCommonHandler(cfd,0);
        }
    }
    
    static void acceptCommonHandler(int fd, int flags) {
    
        // 创建客户端
        redisClient *c;
        if ((c = createClient(fd)) == NULL) {
            redisLog(REDIS_WARNING,
                "Error registering fd event for the new client: %s (fd=%d)",
                strerror(errno),fd);
            close(fd); /* May be already closed, just ignore errors */
            return;
        }
        // 如果新添加的客户端令服务器的最大客户端数量达到了
        // 那么向新客户端写入错误信息,并关闭新客户端
        // 先创建客户端,再进行数量检查是为了方便地进行错误信息写入
        if (listLength(server.clients) > server.maxclients) {
            char *err = "-ERR max number of clients reached\r\n";
    
            /* That's a best effort error message, don't check write errors */
            if (write(c->fd,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            // 更新拒绝连接数
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
    
        // 更新连接次数
        server.stat_numconnections++;
    
        // 设置 FLAG
        c->flags |= flags;
    }
    
    /*
     * 创建一个新客户端
     */
    redisClient *createClient(int fd) {
    
        // 分配空间
        redisClient *c = zmalloc(sizeof(redisClient));
    
        // 当 fd 不为 -1 时,创建带网络连接的客户端
        // 如果 fd 为 -1 ,那么创建无网络连接的伪客户端
        // 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时
        // 需要用到这种伪终端
        if (fd != -1) {
            // 非阻塞
            anetNonBlock(NULL,fd);
            // 禁用 Nagle 算法
            anetEnableTcpNoDelay(NULL,fd);
            // 设置 keep alive
            if (server.tcpkeepalive)
                anetKeepAlive(NULL,fd,server.tcpkeepalive);
            // 绑定读事件到事件 loop (开始接收命令请求)
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                readQueryFromClient, c) == AE_ERR)
            {
                close(fd);
                zfree(c);
                return NULL;
            }
        }
    
        // 初始化各个属性
    
        // 返回客户端
        return c;
    }
    

    程序会将这个连接应答处理器和服务器监听套接字的AE_READABLE事件关联起来,当有客户端用sys/socket.h/connect函数 连接服务器监听套接字的时候,套接字就会产生AE_READABLE事件,被readQueryFromClient读处理器处理。

    写处理器也一样。
    当服务器有命令回复需要传送给客户端的时候,服务器会将客户端套接字的 AE_WRITABLE事件和命令回复处理器关联起来,当客户端准备好接收服务器传回的 命令回复时,就会产生AE_WRITABLE事件,引发命令回复处理器sendReplyToClient执行,并执行相应的套接字写入操作

    2、时间事件是什么

    时间事件一般分为定时事件和周期事件。定时事件意思是延迟多少秒再执行,周期事件是指每隔多少秒执行一次。比如java中的juc包就提供ScheduledExecutorService服务来实现这两种事件。redis目前版本貌视没有定时事件,源码中找不到,不知道是不是看漏了

    这个时间事件干嘛用的呢?
    根据源码的注释,可以看到最主要的几个工作是:
    1、更新服务器的各类统计信息,比如时间、内存占用、数据库占用情况等。
    2、 清理数据库中的过期键值对。 ·
    3、关闭和清理连接失效的客户端。 ·
    4、尝试进行AOF或RDB持久化操作。 ·
    5、如果服务器是主服务器,那么对从服务器进行定期同步。
    6、·如果处于集群模式,对集群进行定期同步和连接测试。

    具体实现在ac.c#serverCron函数中

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        int j;
        REDIS_NOTUSED(eventLoop);
        REDIS_NOTUSED(id);
        REDIS_NOTUSED(clientData);
    
        /* Software watchdog: deliver the SIGALRM that will reach the signal
         * handler if we don't return here fast enough. */
        if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
    
        /* Update the time cache. */
        updateCachedTime();
    
        // 记录服务器执行命令的次数
        run_with_period(100) trackOperationsPerSecond();
    
        /* We have just REDIS_LRU_BITS bits per object for LRU information.
         * So we use an (eventually wrapping) LRU clock.
         *
         * Note that even if the counter wraps it's not a big problem,
         * everything will still work but some object will appear younger
         * to Redis. However for this to happen a given object should never be
         * touched for all the time needed to the counter to wrap, which is
         * not likely.
         *
         * 即使服务器的时间最终比 1.5 年长也无所谓,
         * 对象系统仍会正常运作,不过一些对象可能会比服务器本身的时钟更年轻。
         * 不过这要这个对象在 1.5 年内都没有被访问过,才会出现这种现象。
         *
         * Note that you can change the resolution altering the
         * REDIS_LRU_CLOCK_RESOLUTION define.
         *
         * LRU 时间的精度可以通过修改 REDIS_LRU_CLOCK_RESOLUTION 常量来改变。
         */
        server.lruclock = getLRUClock();
    
        /* Record the max memory used since the server was started. */
        // 记录服务器的内存峰值
        if (zmalloc_used_memory() > server.stat_peak_memory)
            server.stat_peak_memory = zmalloc_used_memory();
    
        /* Sample the RSS here since this is a relatively slow call. */
        server.resident_set_size = zmalloc_get_rss();
    
        /* We received a SIGTERM, shutting down here in a safe way, as it is
         * not ok doing so inside the signal handler. */
        // 服务器进程收到 SIGTERM 信号,关闭服务器
        if (server.shutdown_asap) {
    
            // 尝试关闭服务器
            if (prepareForShutdown(0) == REDIS_OK) exit(0);
    
            // 如果关闭失败,那么打印 LOG ,并移除关闭标识
            redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
            server.shutdown_asap = 0;
        }
    
        /* Show some info about non-empty databases */
        // 打印数据库的键值对信息
        run_with_period(5000) {
            for (j = 0; j < server.dbnum; j++) {
                long long size, used, vkeys;
    
                // 可用键值对的数量
                size = dictSlots(server.db[j].dict);
                // 已用键值对的数量
                used = dictSize(server.db[j].dict);
                // 带有过期时间的键值对数量
                vkeys = dictSize(server.db[j].expires);
    
                // 用 LOG 打印数量
                if (used || vkeys) {
                    redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                    /* dictPrintStats(server.dict); */
                }
            }
        }
    
        /* Show information about connected clients */
        // 如果服务器没有运行在 SENTINEL 模式下,那么打印客户端的连接信息
        if (!server.sentinel_mode) {
            run_with_period(5000) {
                redisLog(REDIS_VERBOSE,
                    "%lu clients connected (%lu slaves), %zu bytes in use",
                    listLength(server.clients)-listLength(server.slaves),
                    listLength(server.slaves),
                    zmalloc_used_memory());
            }
        }
    
        /* We need to do a few operations on clients asynchronously. */
        // 检查客户端,关闭超时客户端,并释放客户端多余的缓冲区
        clientsCron();
    
        /* Handle background operations on Redis databases. */
        // 对数据库执行各种操作
        databasesCron();
    
        /* Start a scheduled AOF rewrite if this was requested by the user while
         * a BGSAVE was in progress. */
        // 如果 BGSAVE 和 BGREWRITEAOF 都没有在执行
        // 并且有一个 BGREWRITEAOF 在等待,那么执行 BGREWRITEAOF
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
            server.aof_rewrite_scheduled)
        {
            rewriteAppendOnlyFileBackground();
        }
    
        /* Check if a background saving or AOF rewrite in progress terminated. */
        // 检查 BGSAVE 或者 BGREWRITEAOF 是否已经执行完毕
        if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
            int statloc;
            pid_t pid;
    
            // 接收子进程发来的信号,非阻塞
            if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
                int exitcode = WEXITSTATUS(statloc);
                int bysignal = 0;
                
                if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
    
                // BGSAVE 执行完毕
                if (pid == server.rdb_child_pid) {
                    backgroundSaveDoneHandler(exitcode,bysignal);
    
                // BGREWRITEAOF 执行完毕
                } else if (pid == server.aof_child_pid) {
                    backgroundRewriteDoneHandler(exitcode,bysignal);
    
                } else {
                    redisLog(REDIS_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
                updateDictResizePolicy();
            }
        } else {
    
            /* If there is not a background saving/rewrite in progress check if
             * we have to save/rewrite now */
            // 既然没有 BGSAVE 或者 BGREWRITEAOF 在执行,那么检查是否需要执行它们
    
            // 遍历所有保存条件,看是否需要执行 BGSAVE 命令
             for (j = 0; j < server.saveparamslen; j++) {
                struct saveparam *sp = server.saveparams+j;
    
                /* Save if we reached the given amount of changes,
                 * the given amount of seconds, and if the latest bgsave was
                 * successful or if, in case of an error, at least
                 * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
                // 检查是否有某个保存条件已经满足了
                if (server.dirty >= sp->changes &&
                    server.unixtime-server.lastsave > sp->seconds &&
                    (server.unixtime-server.lastbgsave_try >
                     REDIS_BGSAVE_RETRY_DELAY ||
                     server.lastbgsave_status == REDIS_OK))
                {
                    redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                        sp->changes, (int)sp->seconds);
                    // 执行 BGSAVE
                    rdbSaveBackground(server.rdb_filename);
                    break;
                }
             }
    
             /* Trigger an AOF rewrite if needed */
            // 出发 BGREWRITEAOF
             if (server.rdb_child_pid == -1 &&
                 server.aof_child_pid == -1 &&
                 server.aof_rewrite_perc &&
                 // AOF 文件的当前大小大于执行 BGREWRITEAOF 所需的最小大小
                 server.aof_current_size > server.aof_rewrite_min_size)
             {
                // 上一次完成 AOF 写入之后,AOF 文件的大小
                long long base = server.aof_rewrite_base_size ?
                                server.aof_rewrite_base_size : 1;
    
                // AOF 文件当前的体积相对于 base 的体积的百分比
                long long growth = (server.aof_current_size*100/base) - 100;
    
                // 如果增长体积的百分比超过了 growth ,那么执行 BGREWRITEAOF
                if (growth >= server.aof_rewrite_perc) {
                    redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                    // 执行 BGREWRITEAOF
                    rewriteAppendOnlyFileBackground();
                }
             }
        }
    
        // 根据 AOF 政策,
        // 考虑是否需要将 AOF 缓冲区中的内容写入到 AOF 文件中
        /* AOF postponed flush: Try at every cron cycle if the slow fsync
         * completed. */
        if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    
        /* AOF write errors: in this case we have a buffer to flush as well and
         * clear the AOF error in case of success to make the DB writable again,
         * however to try every second is enough in case of 'hz' is set to
         * an higher frequency. */
        run_with_period(1000) {
            if (server.aof_last_write_status == REDIS_ERR)
                flushAppendOnlyFile(0);
        }
    
        /* Close clients that need to be closed asynchronous */
        // 关闭那些需要异步关闭的客户端
        freeClientsInAsyncFreeQueue();
    
        /* Clear the paused clients flag if needed. */
        clientsArePaused(); /* Don't check return value, just use the side effect. */
    
        /* Replication cron function -- used to reconnect to master and
         * to detect transfer failures. */
        // 复制函数
        // 重连接主服务器、向主服务器发送 ACK 、判断数据发送失败情况、断开本服务器超时的从服务器,等等
        run_with_period(1000) replicationCron();
    
        /* Run the Redis Cluster cron. */
        // 如果服务器运行在集群模式下,那么执行集群操作
        run_with_period(100) {
            if (server.cluster_enabled) clusterCron();
        }
    
        /* Run the Sentinel timer if we are in sentinel mode. */
        // 如果服务器运行在 sentinel 模式下,那么执行 SENTINEL 的主函数
        run_with_period(100) {
            if (server.sentinel_mode) sentinelTimer();
        }
    
        /* Cleanup expired MIGRATE cached sockets. */
        // 集群。。。TODO
        run_with_period(1000) {
            migrateCloseTimedoutSockets();
        }
    
        // 增加 loop 计数器
        server.cronloops++;
    
        return 1000/server.hz;
    }
    

    总结

    1、redis的事件类型分为文件事件类型和时间事件类型。先执行文件事件再执行时间事件。如果时间事件执行时间过长,会导致文件事件来不及处理,所以redis会对时间事件的执行时间动态调整,下篇文章分析下如何实现
    2、文件事件类型的读和写处理器分别由readQueryFromClient和sendReplyToClient函数实现。由于redis的EPoll函数采用水平触发,当注册sendReplyToClient时,会导致没数据也被唤醒,所以当数据写完到客户端后,需要手动删除掉sendReplyToClient事件。
    3、AOF持久化,上面提到serverCron函数中会对AOF进行持久化。其实这里不是必须的,当配置server.aof_fsync == AOF_FSYNC_EVERYSEC时,才会执行,当配置server.aof_fsync == AOF_FSYNC_ALWAYS时,在事件循环之前执行。配置AOF_FSYNC_NO则交给操作系统控制。
    4、redis不是所有的任务都是单线程执行。一些耗时任务交给子线程执行,比如AOF文件刷新、AOF重写

    相关文章

      网友评论

        本文标题:读redis源码笔记1-线程模型

        本文链接:https://www.haomeiwen.com/subject/mmhvyhtx.html