美文网首页程序员程序园
memcached源码分析-指令解析模块

memcached源码分析-指令解析模块

作者: saltcc | 来源:发表于2019-04-29 23:39 被阅读2次

    1.前言

    前面一章节我们介绍了libevent网络事件模块,当连接有数据请求过来的时候,就会触发worker线程的读写事件回调函数:event_handler,这个方法调用drive_machine函数,根据状态解析事件。我们都知道Memcached是一个高性能的开源分布式内存对象缓存系统,但memcached的服务器客户端通信并不使用复杂的XML等格式,而使用简单的基于文本行的协议,我们使用简单文本指令就可以实现对memcached的操作。
    接下来主要选取set指令和get指令进行分析,从而了解memcached是如何解析客户的指令请求的。

    指令简要说明:

    set指令语法:
    set key flags exptime bytes [noreply]
    value

    参数说明:

    • key:键值 key-value 结构中的 key,用于查找缓存值。
    • flags:可以包括键值对的整型参数,客户机使用它存储关于键值对的额外信息 。
    • exptime:在缓存中保存键值对的时间长度(以秒为单位,0 表示永远)
    • bytes:在缓存中存储的字节数
    • noreply(可选): 该参数告知服务器不需要返回数据
    • value:存储的值(始终位于第二行)(可直接理解为key-value结构中的value)

    get指令语法:
    get key

    参数说明如下:
    key:键值 key-value 结构中的 key,用于查找缓存值。

    2.指令流程图

    当客户端连接发送指令给memcached,就会触发worker线程的读写事件回调函数:event_handler,这个方法调用drive_machine状态机函数,下面就是set、get指令的状态机执行流程图。

    set指令.jpg
    get指令.jpg

    3.源码分析

    针对上述流程图,做进一步的源码分析
    在分析源码之前,这里主要介绍一下相关模块的重要数据结构

    /*
     * Per-connection state: the socket descriptor, the current state-machine
     * state, and the read/write buffers used while parsing commands and
     * sending replies.
     */
    struct conn {
        int    sfd;
    #ifdef TLS
        SSL    *ssl;
        char   *ssl_wbuf;
        bool ssl_enabled;
    #endif
        sasl_conn_t *sasl_conn;
        bool sasl_started;
        bool authenticated;
        enum conn_states  state;
        enum bin_substates substate;
        rel_time_t last_cmd_time;
        struct event event;
        short  ev_flags;
        short  which;   /** which events were just triggered */
    
        // rbuf: memory that incoming command bytes are read into
        char   *rbuf;   /** buffer to read commands into */
        // rcurr: cursor inside rbuf marking where parsing stopped so far
        char   *rcurr;  /** but if we parsed some already, this is where we stopped */
        // rsize: number of bytes allocated for rbuf
        int    rsize;   /** total allocated size of rbuf */
        // rbytes: number of received-but-not-yet-parsed bytes, starting at rcurr
        int    rbytes;  /** how much data, starting from rcur, do we have unparsed */
    
        char   *wbuf;
        char   *wcurr;
        int    wsize;
        int    wbytes;
        /** which state to go into after finishing current write */
        enum conn_states  write_and_go;
        void   *write_and_free; /** free this memory after finishing writing */
    
        // ritem: destination for an item's value while it is being read.
        // Walkthrough for a set: the client first sends "set testkey 0 0 5";
        // the server allocates an item for it and points ritem at that item's
        // data area (process_update_command does c->ritem = ITEM_data(it)),
        // with rlbytes holding how many value bytes are expected. When the
        // client then sends the value, the conn_nread state reads up to
        // rlbytes bytes from the connection into the memory ritem points to.
        char   *ritem;  /** when we read in an item's value, it goes here */
        // rlbytes: number of value bytes that still have to be read
        int    rlbytes;
    
        /* data for the nread state */
    
        /**
         * item is used to hold an item structure created after reading the command
         * line of set/add/replace commands, but before we finished reading the actual
         * data. The data is read into ITEM_data(item) to avoid extra copying.
         */
    
        // item: the item allocated for the storage command being processed
        void   *item;     /* for commands set/add/replace  */
    
        /* data for the swallow state */
        int    sbytes;    /* how many bytes to swallow */
    
        /* data for the mwrite state */
        struct iovec *iov;
        int    iovsize;   /* number of elements allocated in iov[] */
        int    iovused;   /* number of elements used in iov[] */
    
        struct msghdr *msglist;
        int    msgsize;   /* number of elements allocated in msglist[] */
        int    msgused;   /* number of elements used in msglist[] */
        int    msgcurr;   /* element in msglist[] being transmitted now */
        int    msgbytes;  /* number of bytes in current msg */
    
        item   **ilist;   /* list of items to write out */
        int    isize;
        item   **icurr;
        int    ileft;
    
        char   **suffixlist;
        int    suffixsize;
        char   **suffixcurr;
        int    suffixleft;
    #ifdef EXTSTORE
        int io_wrapleft;
        unsigned int recache_counter;
        io_wrap *io_wraplist; /* linked list of io_wraps */
        bool io_queued; /* FIXME: debugging flag */
    #endif
        enum protocol protocol;   /* which protocol this connection speaks */
        enum network_transport transport; /* what transport is used by this connection */
    
        /* data for UDP clients */
        int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
        struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
        socklen_t request_addr_size;
        unsigned char *hdrbuf; /* udp packet headers */
        int    hdrsize;   /* number of headers' worth of space is allocated */
    
        bool   noreply;   /* True if the reply should not be sent. */
        /* current stats command */
        struct {
            char *buffer;
            size_t size;
            size_t offset;
        } stats;
    
        /* Binary protocol stuff */
        /* This is where the binary header goes */
        protocol_binary_request_header binary_header;
        uint64_t cas; /* the cas to return */
        short cmd; /* current command being processed */
        int opaque;
        int keylen;
        conn   *next;     /* Used for generating a list of conn structures */
        LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
        /* I/O hooks; the code in this file calls c->read and c->sendmsg for socket I/O. */
        ssize_t (*read)(conn  *c, void *buf, size_t count);
        ssize_t (*sendmsg)(conn *c, struct msghdr *msg, int flags);
        ssize_t (*write)(conn *c, void *buf, size_t count);
    };
    

    drive_machine函数就是整个事件执行流程的核心函数,内部while循环切换不同的状态,完成不同状态下的业务逻辑处理

    /*
     * Core of the event-handling flow (abridged excerpt): loops until `stop`
     * is set, dispatching on c->state and moving the connection from one
     * state to the next. Sections replaced by "//..." are omitted.
     */
    static void drive_machine(conn *c) {
        //...
        while (!stop) {
            switch(c->state) {
                //...
                case conn_waiting:
                    //...
                    // switch to the read state and leave the loop; the next
                    // readable event on the connection resumes processing
                    conn_set_state(c, conn_read);
                    stop = true;
                    break;
                case conn_read:
                    //...
                    // read command bytes from the connection,
                    // e.g. "set testkey 0 0 5 \r\n"
                    res = try_read_network(c);
                    switch (res) {
                        //....
                        case READ_DATA_RECEIVED:
                            conn_set_state(c, conn_parse_cmd);
                            break;
                        //...
                    }
                    break;
                case conn_parse_cmd:
                    // parse the command that was read
                    if (try_read_command(c) == 0) {
                        /* we need more data! */
                        // the command line is incomplete; go back to waiting
                        // until the rest arrives
                        conn_set_state(c, conn_waiting);
                    }
                    break;
                case conn_new_cmd:
                    --nreqs;
                    if (nreqs >= 0) {
                        // reset_cmd_handler (not shown) chooses the next state
                        reset_cmd_handler(c);
                    }else{
                        //....
                    }
                    break;
                case conn_nread:
                    if (c->rlbytes == 0) {
                        // the whole value has been read
                        complete_nread(c);
                        break;
                    }
                    //...
                    if (!c->item || (((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) {
                        //...
                        /* now try reading from the socket */
                        // read value bytes from the socket into the memory
                        // c->ritem points to. Example: for "set testkey 0 0 5"
                        // the command line was already read by try_read_network;
                        // this read fetches the value itself (5 bytes here).
                        res = c->read(c, c->ritem, c->rlbytes);
                        //...
                    }
                    break;
                case conn_write:
                    //...
                    /* fall through... */
                    // note: no break here, so conn_write continues straight
                    // into the conn_mwrite case below
                case conn_mwrite:
                    //...
                    // send data to the client, e.g. the value requested by get
                    switch (transmit(c)) {
                        case TRANSMIT_COMPLETE:
                            if (c->state == conn_mwrite) {
                                conn_release_items(c);
                                /* XXX:  I don't know why this wasn't the general case */
                                if(c->protocol == binary_prot) {
                                    conn_set_state(c, c->write_and_go);
                                } else {
                                    // move to conn_new_cmd. Per the article
                                    // author (reset_cmd_handler is not shown):
                                    // if rbuf still holds unprocessed data it
                                    // is processed next, otherwise the
                                    // connection ends up in conn_waiting.
                                    conn_set_state(c, conn_new_cmd);
                                }
                            }
                            //...
                            break;
                        //...
                    }
                    break;
            }
        }
    }
    

    conn_read状态下调用try_read_network函数进行连接数据的读取

    /*
     * read from network as much as we can, handle buffer overflow and connection
     * close.
     * before reading, move the remaining incomplete fragment of a command
     * (if any) to the beginning of the buffer.
     *
     * To protect us from someone flooding a connection with bogus data causing
     * the connection to eat up all available memory, break out and start looking
     * at the data I've got after a number of reallocs...
     *
     * @return enum try_read_result
     */
    // Reads the command data sent by the client (abridged excerpt; the
    // declarations of gotdata, num_allocs and res are among the omitted parts).
    static enum try_read_result try_read_network(conn *c) {
        // rbuf   buffer that command bytes are read into
        // rcurr  cursor marking where parsing stopped so far
        // rsize  number of bytes allocated for rbuf
        // rbytes number of unparsed bytes currently held in the buffer
    
        // before reading, check whether the parse cursor has moved off the
        // start of the buffer
        if (c->rcurr != c->rbuf) {
            if (c->rbytes != 0)
                // part of a command is still unparsed: move those bytes to
                // the start of rbuf
                memmove(c->rbuf, c->rcurr, c->rbytes);
            // reset the cursor to the start of the buffer
            c->rcurr = c->rbuf;
        }
        // read as much as possible
        while (1) {
            // the buffer is full: double it, but give up growing and return
            // what we have once num_allocs has reached 4
            if (c->rbytes >= c->rsize) {
                if (num_allocs == 4) {
                    return gotdata;
                }
                ++num_allocs;
                char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
                //...
                c->rcurr = c->rbuf = new_rbuf;
                c->rsize *= 2;
            }
            // free space left in rbuf
            int avail = c->rsize - c->rbytes;
            // try to read up to avail bytes, appended after the unparsed data
            res = c->read(c, c->rbuf + c->rbytes, avail);
    
            if (res > 0) {
                //...
                c->rbytes += res;
                // the read filled all the space we offered, so more data is
                // very likely pending: loop and read again
                if (res == avail) {
                    continue;
                }else{
                    break;
                }
            }
            //...
        }
        return gotdata;
    }
    

    conn_parse_cmd状态下调用try_read_command函数进入命令解析的入口

    /*
     * if we have a complete line in the buffer, process it.
     */
     // Entry point of command parsing (abridged excerpt). In the text-protocol
     // branch shown here it returns 0 when there is nothing to parse and 1
     // after a line has been handed to process_command.
    static int try_read_command(conn *c) {
        //...
        if (c->protocol == binary_prot) {
            //...
        }else{
            char *el, *cont;
            // nothing to parse
            if (c->rbytes == 0)
                return 0;
            // look for the '\n' that ends the command line,
            // e.g. "set testkey 0 0 5 \r\n"
            // or "get testkey \r\n"
            el = memchr(c->rcurr, '\n', c->rbytes);
    
            if (!el) {
                //...
            }
            // cont: first byte after the '\n', i.e. where the next command starts
            cont = el + 1;
            if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
                el--;
            }
            // terminate the line: the '\r' (or the '\n' when no '\r' precedes
            // it) becomes '\0', so c->rcurr is now a C string such as
            // "set testkey 0 0 5 "
            *el = '\0';
    
            //...
            // parse and dispatch the command
            process_command(c, c->rcurr);
            // account for the consumed line; what remains is still unparsed
            c->rbytes -= (cont - c->rcurr);
            c->rcurr = cont;
        }
    
        return 1;
    }
    
    // Splits a command line into tokens and dispatches it to the matching
    // handler (abridged excerpt; only the get and storage branches are shown).
    static void process_command(conn *c, char *command) {
        token_t tokens[MAX_TOKENS];
        size_t ntokens;
        int comm;
        //...
        // split the command line into the tokens array
        ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    
        // ntokens includes the terminal token appended by tokenize_command,
        // so "get key" yields 3
        if (ntokens >= 3 &&
            ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
            (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
            // get command
            process_get_command(c, tokens, ntokens, false, false);
        // 6 or 7 tokens: "<cmd> key flags exptime bytes [noreply]" plus the
        // terminal token. The "&& (comm = NREAD_X)" assignments record which
        // command matched; NOTE(review): this relies on the NREAD_* constants
        // being non-zero - confirm against their definition.
        }else if ((ntokens == 6 || ntokens == 7) &&
            ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
            (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
            (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
            (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
            (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
            // storage commands (set and friends)
            process_update_command(c, tokens, ntokens, comm, false);
        }//...
    
        return;
    }
    
    /*
     * Split a command line into space-separated tokens, in place.
     *
     * Every run of non-space characters becomes one token whose value points
     * into command; the space that ends a token is overwritten with '\0'.
     * A terminal token with length 0 is always appended. Its value is NULL when
     * nothing is left unprocessed; otherwise it points at the first unprocessed
     * character, which happens when max_tokens - 1 tokens were produced before
     * the end of the string was reached.
     *
     * Example: "set testkey 0 0 5" produces
     *   tokens[0] = "set"     (length 3)
     *   tokens[1] = "testkey" (length 7)
     *   tokens[2] = "0"       (length 1)
     *   tokens[3] = "0"       (length 1)
     *   tokens[4] = "5"       (length 1)
     *   tokens[5] = NULL      (length 0, terminal token)
     * and returns 6.
     *
     * @param command    NUL-terminated command line; modified in place.
     * @param tokens     output array with room for at least max_tokens entries.
     * @param max_tokens capacity of tokens, terminal token included; must be > 1.
     * @return number of entries written, terminal token included.
     */
    static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
        char *s, *e;
        size_t ntokens = 0;
        size_t len;
        size_t i;
    
        /* Check the arguments before strlen() dereferences command. */
        assert(command != NULL && tokens != NULL && max_tokens > 1);
        len = strlen(command);
    
        /* s marks the start of the current token, e is the scan cursor. */
        s = e = command;
        for (i = 0; i < len; i++) {
            if (*e == ' ') {
                /* s == e means consecutive spaces: no token to emit. */
                if (s != e) {
                    tokens[ntokens].value = s;
                    tokens[ntokens].length = e - s;
                    ntokens++;
                    *e = '\0';
                    /* Keep the last slot free for the terminal token. */
                    if (ntokens == max_tokens - 1) {
                        e++;
                        s = e; /* so we don't add an extra token */
                        break;
                    }
                }
                s = e + 1;
            }
            e++;
        }
    
        /* Emit the trailing token that was ended by the end of the string. */
        if (s != e) {
            tokens[ntokens].value = s;
            tokens[ntokens].length = e - s;
            ntokens++;
        }
    
        /*
         * If we scanned the whole string, the terminal value pointer is null,
         * otherwise it is the first unprocessed character.
         */
        tokens[ntokens].value =  *e == '\0' ? NULL : e;
        tokens[ntokens].length = 0;
        ntokens++;
    
        return ntokens;
    }
    

    get指令处理函数

    /*
     * Handler for the get command (abridged excerpt): the key lookup and the
     * formatting of the reply are omitted; only the final step is shown,
     * which either reports an error or switches the connection to
     * conn_mwrite so the reply gets sent.
     */
    static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, 
                                            bool return_cas, bool should_touch) {
        //...
        // token holding the key and its length
        token_t *key_token = &tokens[KEY_TOKEN];
    
        //...
        // (omitted) look up the value stored under the key
        // and format the reply
    
        //...
        // error path: taken when key_token still refers to a key, when
        // appending the closing "END\r\n" fails, or when building the UDP
        // headers fails
        if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
            || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
            if (fail_length) {
                out_string(c, "CLIENT_ERROR bad command line format");
            } else {
                out_of_memory(c, "SERVER_ERROR out of memory writing get response");
            }
            conn_release_items(c);
        }else {
            // reply is ready: switch to conn_mwrite and start with the first message
            conn_set_state(c, conn_mwrite);
            c->msgcurr = 0;
        }
    }
    

    set指令处理函数

    /*
     * Handler for the storage commands (abridged excerpt): parses the numeric
     * arguments, allocates an item, and puts the connection into conn_nread
     * so the value can be read into that item.
     */
    static void process_update_command(conn *c, token_t *tokens, const size_t ntokens,
                                            int comm, bool handle_cas){
        //...
        // key and its length
        key = tokens[KEY_TOKEN].value;
        nkey = tokens[KEY_TOKEN].length;
    
        // convert the flags, exptime and bytes tokens to integers;
        // reject the command if any of them does not parse
        if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
               && safe_strtol(tokens[3].value, &exptime_int)
               && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }
        //...
        // allocate the item for this command; the article defers the details
        // (slab allocation) to a later chapter
        it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
    
        //...
        // NOTE(review): ITEM_data(it) presumably yields the address of the
        // item's value area (macro not shown here) - confirm.
        // conn_nread reads the value into the memory c->ritem points to, so
        // after a successful read the item holds both key and value.
        c->ritem = ITEM_data(it);
        // number of value bytes to read
        c->rlbytes = it->nbytes;
        // which storage command this is
        c->cmd = comm;
        conn_set_state(c, conn_nread);
    }
    

    transmit将结果发送客户端

    /*
     * Sends the pending reply to the client (abridged excerpt; the return
     * statements and the setup of `m` and `res` are among the omitted parts).
     */
    static enum transmit_result transmit(conn *c) {
        //...
        // there are still messages in msglist that have not been sent
        if (c->msgcurr < c->msgused) {
            //...
            // send the result back to the requesting client
            res = c->sendmsg(c, m, 0);
            //...
        }
    }
    

    相关文章

      网友评论

        本文标题:memcached源码分析-指令解析模块

        本文链接:https://www.haomeiwen.com/subject/cqcpnqtx.html