美文网首页
Redis源码研究之命令处理与回复

Redis源码研究之命令处理与回复

作者: wenmingxing | 来源:发表于2018-04-25 20:27 被阅读34次

    本文主要从源码角度说明Redis为客户端提供服务(包括命令处理与回复)的过程。

    建议阅读:
    1、Redis 事件的理论说明见:wenmingxing Redis之事件
    2、阅读本文之前应该阅读:Redis源码研究之事件驱动

    I、上帝视角

    Redis在启动的时候会做一系列的初始化逻辑,如配置文件读取,网络通信模块初始化等,然后便开始进行事件循环,准备等待并处理请求。

    当客户端发起请求时,Redis进程会被唤醒(I/O多路复用函数的系统调用)。读取来自客户端的数据,解析命令,查找命令,执行命令,回复命令。

    首先来看main()函数:

    /* src/redis.c/main */
    int main(int argc, char **argv) {
       ......
       // 初始化服务器配置,主要是填充 redisServer 结构体中的各种参数
       initServerConfig();
       ......
       // 初始化服务器
       initServer();
       ......
       // 进入事件循环
       aeMain(server.el);
    }  
    

    II、initServer()

    1、initServerConfig()主要作用是填充struct redisServer结构体,Redis服务器的相关配置都在redisServer中。

    2、initServer()中完成对事件循环的初始化操作,并为监听做准备,并初始化数据库空间。

    /* src/redis.c/initServer */
    /*完成对事件循环的初始化工作,并为监听做准备 */
    void initServer() {
       // 创建事件循环结构体,函数aeCreateEventLoop在事件驱动一文中介绍过
       server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
    
       // 分配数据库空间
       server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    
       /* Open the TCP listening socket for the user commands. */
       // listenToPort() 中有调用listen()
        if (server.port != 0 &&
            listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
            exit(1);
    ......
    
    // 逐个初始化redis 数据库
    /* Create the Redis databases, and initialize other internal state. */
    for (j = 0; j < server.REDIS_DEFAULT_DBNUM; j++) { // 初始化多个数据库
        // 哈希表,用于存储键值对
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        // 哈希表,用于存储每个键的过期时间
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
      }
    
    ......
    // 创建接收TCP 或者UNIX 域套接字的事件处理
    // TCP
    /* Create an event handler for accepting new connections in TCP and Unix
    * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        // acceptTcpHandler() tcp 连接接受处理函数
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
       }
    
    ......
    
    }  
    

    III、aeMain()

    在完成initServerConfiginitServer的两步初始化之后,aeMain开始进入事件循环,等待请求的到来:

    /*事件处理器的主循环*/
    /* src/ae.c/aeMain */
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
        //如果有需要在时间处理前执行的函数,则运行它
        // 进入事件循环可能会进入睡眠状态。在睡眠之前,执行预设置
        // 的函数aeSetBeforeSleepProc()。
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
    
        // 开始处理事件,下面说明这个函数。AE_ALL_EVENTS 表示处理所有的事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
      }
    }  
    

    下面主要说明之前提到过的aeProcessEvents()函数:

    /*处理所有已到达的时间事件,以及所有已经就绪的文件事件*/
    /* src/ae.c */
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        ......//获取最近的时间事件,并以此来初始化下面对文件事件的阻塞时间
        ......
    
        // 调用IO 多路复用函数阻塞监听,阻塞时间由tvp决定
        numevents = aeApiPoll(eventLoop, tvp);
    
        // 处理已经触发的事件
        for (j = 0; j < numevents; j++) {
            // 找到文件事件表中存储的数据,并完成参数的局部化
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
    
            /* note the fe->mask & mask & ... code: maybe an already processed
            * event removed an element that fired and we still didn't
            * processed, so we check if the event is still valid. */
            /*根据局部化的参数,进行判断*/
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);  //调用读事件函数
            }
    
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
        processed++;  //更新处理的事件个数,最后返回
      }
    }
        /*先处理文件事件再处理时间事件*/
        // 处理时间事件
        /* Check time events */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
        return processed; /* return the number of processed file/time events */
    }  
    

    IV、处理新的连接

    initServer()中,Redis注册了回调函数acceptTcpHandler(),当有新的连接到来时,这个函数会被回调,而aeProcessEvents中的rfileProc()实际上就是指向了acceptTcpHandler()用以处理连接:

    /* 创建一个TCP连接处理器 */
    /* src/networking.c/acceptTcpHandler */
    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        int cport, cfd;
        char cip[REDIS_IP_STR_LEN];
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);
        REDIS_NOTUSED(privdata);
    
        // anetTcpAccept接收客户端请求,封装的accept函数
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        // 出错
        if (cfd == AE_ERR) {
            redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
            return;
      }
        // 记录
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        // 为客户端创建客户端状态redisClient,下面说明
        acceptCommonHandler(cfd,0);
    }
    

    anetTcpAccept()即为accept接收客户端请求,然后调用acceptCommonHandler()处理接收到的cfd,其中acceptCommonHandler最重要的调用就是createClient。Redis为每个客户端连接,都创建一个struct redisClient结构体:

    /* 创建一个新的客户端 */
    /* src/networking.c/createClient */
    redisClient *createClient(int fd) {
        //为结构体分配空间
        redisClient *c = zmalloc(sizeof(redisClient));
        /* passing -1 as fd it is possible to create a non connected client.
        * This is useful since all the Redis commands needs to be executed
        * in the context of a client. When commands are executed in other
        * contexts (for instance a Lua script) we need a non connected client.     */
        /*当fd为-1时,则证明是伪客户端,不需要socket;
         * 当fd不为-1时,需要创建带网络连接的客户端*/
        if (fd != -1) {
            anetNonBlock(NULL,fd);
            anetEnableTcpNoDelay(NULL,fd);
        //设置keep alive
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
    
        // 为接收到的套接字注册读事件
        // readQueryFromClient() 应该为读取客户端并查询缓冲区的内容
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
      }
      ......
      return c;
    }  
    

    V、处理请求

    readQueryFromClient()获取到客户端缓冲区的内容之后,会调用processInputBuffer()函数进行命令解析,然后会调用processCommand()函数处理命令:

    /* 负责执行读取到的命令 */
    /* src/redis.c/processCommand */
    int processCommand(redisClient *c) {
        ......
    
        // 查找命令,redisClient.cmd 在此时赋值
        /* Now lookup the command and check ASAP about trivial error conditions
        * such as wrong arity, bad command name and so forth. */
    
        //在命令表中查找命令
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    
        /*判断命令是否合法*/
        // 没有找到命令
        if (!c->cmd) {
            flagTransaction(c);
            addReplyErrorFormat(c,"unknown command '%s'",
                (char*)c->argv[0]->ptr);
            return REDIS_OK;
            // 参数个数不符合
        } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                   (c->argc < c->cmd->arity)) {
            flagTransaction(c);
            addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
                c->cmd->name);
            return REDIS_OK;
    }
    
    .....//一些判断,如集群,发布与订阅等
    
        // 加入命令队列的,除去EXEC,MULTI,WATCH等事务命令,if为事务操作,这里我们暂且不看,直接看else
        /* Exec the command */
        if (c->flags & REDIS_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
        // 事务命令入队
        queueMultiCommand(c);
        addReply(c,shared.queued);
        // 真正执行命令。
        // 注意,如果是设置了多命令模式,那么不是直接执行命令,而是让命令入队
        } else {    //非事务操作
            call(c,REDIS_CALL_FULL);    //执行命令
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
        }
    return REDIS_OK;
    }  
    

    processCommand函数除了检查之外,核心调用为call()函数,其对应了Redis的所有命令。

    set请求为例,会调用setCommand()函数:

    /* 执行set命令的调用*/
    /* src/t_string.c/setCommand */
    void setCommand(redisClient *c) {
        ......//一些判断
    
        //真正的命令执行
        setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
    }
    
    /* src/t_string.c/setGenericCommand */
    void setGenericCommand(redisClient *c, int flags, robj *key,
            robj *val, robj *expire, int unit, robj *ok_reply,
            robj *abort_reply) {
        ......
    
        //将键值关联到数据库
        setKey(c->db,key,val);
        ......
    
        //回复结果
        /* src/networking.c/addReply */
        addReply(c, ok_reply ? ok_reply : shared.ok);
    }
    
    //将键值关联到数据库
    /* src/db.c/setKey */
    void setKey(redisDb *db, robj *key, robj *val) {
        if (lookupKeyWrite(db,key) == NULL) {
            dbAdd(db,key,val);
        } else {
        dbOverwrite(db,key,val);
      }
      ......
    }  
    

    在完成一系列检查与转化之后,调用setGenericCommand,最后调用addReply()函数,为客户端连接的socket注册可写事件,将返回信息添加到回复缓冲区中,回传给客户端。

    【参考】
    [1] 《Redis设计与实现》
    [2] 《Redis源码日志》

    相关文章

      网友评论

          本文标题:Redis源码研究之命令处理与回复

          本文链接:https://www.haomeiwen.com/subject/tlrylftx.html