美文网首页
Redis 源码简洁剖析 16 - 客户端

Redis 源码简洁剖析 16 - 客户端

作者: 被称为L的男人 | 来源:发表于2022-02-22 08:27 被阅读0次

    整体概述

    Redis 一个服务器可以和多个客户端建立网络连接,每个客户端都可以向服务器发送命令请求,服务器接收客户端的命令,处理后将结果返回给客户端。

    Redis 的文件事件处理器使用 I/O 多路复用,Redis 使用单线程单进程处理命令请求,与多个客户端进行网络通信。

    image

    每个连接了 Redis 服务器的客户端,服务器都建立了一个 redisClient 结构的客户端状态,保存了客户端当前的状态信息,以及执行相关功能时用到的数据结构。

    Redis 服务器状态结构的 clients 属性是一个链表,保存了所有与服务器连接的客户端状态。

    struct redisServer {
        ……
        // 保存了所有客户端状态的链表
        list *clients;
        ……
    };
    
    image

    客户端属性

    先贴一下 client 完整的数据结构:

    typedef struct client {
        uint64_t id;            /* Client incremental unique ID. */
        connection *conn;
        int resp;               /* RESP protocol version. Can be 2 or 3. */
        redisDb *db;            /* Pointer to currently SELECTed DB. */
        robj *name;             /* As set by CLIENT SETNAME. */
        sds querybuf;           /* Buffer we use to accumulate client queries. */
        size_t qb_pos;          /* The position we have read in querybuf. */
        sds pending_querybuf;   /* If this client is flagged as master, this buffer
                                   represents the yet not applied portion of the
                                   replication stream that we are receiving from
                                   the master. */
        size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
        int argc;               /* Num of arguments of current command. */
        robj **argv;            /* Arguments of current command. */
        int original_argc;      /* Num of arguments of original command if arguments were rewritten. */
        robj **original_argv;   /* Arguments of original command if arguments were rewritten. */
        size_t argv_len_sum;    /* Sum of lengths of objects in argv list. */
        struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
        user *user;             /* User associated with this connection. If the
                                   user is set to NULL the connection can do
                                   anything (admin). */
        int reqtype;            /* Request protocol type: PROTO_REQ_* */
        int multibulklen;       /* Number of multi bulk arguments left to read. */
        long bulklen;           /* Length of bulk argument in multi bulk request. */
        list *reply;            /* List of reply objects to send to the client. */
        unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
        size_t sentlen;         /* Amount of bytes already sent in the current
                                   buffer or object being sent. */
        time_t ctime;           /* Client creation time. */
        long duration;          /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
        time_t lastinteraction; /* Time of the last interaction, used for timeout */
        time_t obuf_soft_limit_reached_time;
        uint64_t flags;         /* Client flags: CLIENT_* macros. */
        int authenticated;      /* Needed when the default user requires auth. */
        int replstate;          /* Replication state if this is a slave. */
        int repl_put_online_on_ack; /* Install slave write handler on first ACK. */
        int repldbfd;           /* Replication DB file descriptor. */
        off_t repldboff;        /* Replication DB file offset. */
        off_t repldbsize;       /* Replication DB file size. */
        sds replpreamble;       /* Replication DB preamble. */
        long long read_reploff; /* Read replication offset if this is a master. */
        long long reploff;      /* Applied replication offset if this is a master. */
        long long repl_ack_off; /* Replication ack offset, if this is a slave. */
        long long repl_ack_time;/* Replication ack time, if this is a slave. */
        long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica  */
        long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                           copying this slave output buffer
                                           should use. */
        char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
        int slave_listening_port; /* As configured with: REPLCONF listening-port */
        char *slave_addr;       /* Optionally given by REPLCONF ip-address */
        int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
        multiState mstate;      /* MULTI/EXEC state */
        int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
        blockingState bpop;     /* blocking state */
        long long woff;         /* Last write global replication offset. */
        list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
        dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
        list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
        sds peerid;             /* Cached peer ID. */
        sds sockname;           /* Cached connection target address. */
        listNode *client_list_node; /* list node in client list */
        listNode *paused_list_node; /* list node within the pause list */
        RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
                                                   * when the authenticated user
                                                   * changes. */
        void *auth_callback_privdata; /* Private data that is passed when the auth
                                       * changed callback is executed. Opaque for
                                       * Redis Core. */
        void *auth_module;      /* The module that owns the callback, which is used
                                 * to disconnect the client if the module is
                                 * unloaded for cleanup. Opaque for Redis Core.*/
    
        /* If this client is in tracking mode and this field is non zero,
         * invalidation messages for keys fetched by this client will be send to
         * the specified client ID. */
        uint64_t client_tracking_redirection;
        rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
                                          subscribed to in BCAST mode, in the
                                          context of client side caching. */
        /* In clientsCronTrackClientsMemUsage() we track the memory usage of
         * each client and add it to the sum of all the clients of a given type,
         * however we need to remember what was the old contribution of each
         * client, and in which categoty the client was, in order to remove it
         * before adding it the new value. */
        uint64_t client_cron_last_memory_usage;
        int      client_cron_last_memory_type;
        /* Response buffer */
        int bufpos;
        char buf[PROTO_REPLY_CHUNK_BYTES];
    } client;
    

    套接字描述符

    typedef struct client {
        ……
        // 记录客户端正在使用的套接字描述符
        int fd;
        ……
    }
    
    image

    标志

    客户端的标志属性 flags 记录了客户端的角色(role),以及客户端目前所处的状态:

    typedef struct redisClient {
        // ...
        int flags;
        // ...
    } redisClient;
    

    具体值可参考:《Redis 设计与实现-客户端属性》,flag 例子:

    # 客户端是一个主服务器
    REDIS_MASTER
    
    # 客户端正在被列表命令阻塞
    REDIS_BLOCKED
    
    # 客户端正在执行事务,但事务的安全性已被破坏
    REDIS_MULTI | REDIS_DIRTY_CAS
    
    # 客户端是一个从服务器,并且版本低于 Redis 2.8
    REDIS_SLAVE | REDIS_PRE_PSYNC
    
    # 这是专门用于执行 Lua 脚本包含的 Redis 命令的伪客户端
    # 它强制服务器将当前执行的命令写入 AOF 文件,并复制给从服务器
    REDIS_LUA_CLIENT | REDIS_FORCE_AOF | REDIS_FORCE_REPL
    

    输入缓冲区

    客户端状态的输入缓冲区用于保存客户端发送的命令请求

    typedef struct redisClient {
        // ...
        sds querybuf;
        // ...
    } redisClient;
    

    如果客户端向服务器发送了以下命令请求:

    SET key value
    

    客户端状态的 querybuf 属性将是一个包含以下内容的 SDS 值:

    *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
    

    展示了这个 SDS 值以及 querybuf 属性的样子:

    image

    命名及命令参数

    在服务器将客户端发送的命令请求保存到客户端状态的 querybuf 后,服务器会分析该命令,将得到的命令参数、命令参数的个数分别保存到客户端状态的 argv 属性和 argc 属性中:

    typedef struct redisClient {
        // ...
        robj **argv;
        int argc;
        // ...
    } redisClient;
    
    image

    命令的实现函数

    当服务器从协议内容中分析并得出 argv 属性和 argc 属性的值之后, 服务器将根据项 argv[0] 的值,在命令表中查找命令所对应的命令实现函数。

    当程序在命令表中成功找到 argv[0] 所对应的 redisCommand 结构时, 它会将客户端状态的 cmd 指针指向这个结构:

    typedef struct redisClient {
        // ...
        struct redisCommand *cmd;
        // ...
    } redisClient;
    
    struct redisCommand {
        char *name;
        redisCommandProc *proc;
        int arity;
        char *sflags;   /* Flags as string representation, one char per flag. */
        uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
        /* Use a function to determine keys arguments in a command line.
         * Used for Redis Cluster redirect. */
        redisGetKeysProc *getkeys_proc;
        /* What keys should be loaded in background when calling this command? */
        int firstkey; /* The first argument that's a key (0 = no keys) */
        int lastkey;  /* The last argument that's a key */
        int keystep;  /* The step between first and last key */
        long long microseconds, calls, rejected_calls, failed_calls;
        int id;     /* Command ID. This is a progressive ID starting from 0 that
                       is assigned at runtime, and is used in order to check
                       ACLs. A connection is able to execute a given command if
                       the user associated to the connection has this command
                       bit set in the bitmap of allowed commands. */
    };
    

    每个命令所对应的处理函数在是下面的 table:

    struct redisCommand redisCommandTable[] = {
        {"module",moduleCommand,-2,
         "admin no-script",
         0,NULL,0,0,0,0,0,0},
    
        {"get",getCommand,2,
         "read-only fast @string",
         0,NULL,1,1,1,0,0,0},
    
        {"getex",getexCommand,-2,
         "write fast @string",
         0,NULL,1,1,1,0,0,0},
    
         ……
    }
    

    输出缓冲区

    保存执行命令所得的命令回复

    image

    客户端的固定大小缓冲区bufbufpos 两个属性组成:

    typedef struct redisClient {
        // ...
        char buf[REDIS_REPLY_CHUNK_BYTES];
        // 记录了 buf 数组目前已使用的字节数量
        int bufpos;
        // ...
    } redisClient;
    

    可变大小缓冲区reply 链表和一个或多个字符串对象组成:

    typedef struct redisClient {
        // ...
    
        list *reply;
        // ...
    } redisClient;
    

    通过使用链表来连接多个字符串对象, 服务器可以为客户端保存一个非常长的命令回复, 而不必受到固定大小缓冲区 16 KB 大小的限制。展示了一个包含三个字符串对象的 reply 链表。

    image

    客户端的创建与关闭

    image

    创建普通客户端

    使用 connect 函数连接到服务器,服务器调用连接事件处理器,为客户端创建对应的客户端状态,并将其添加到服务器状态结构 clients 链表的末尾。

    image

    关闭普通客户端

    关闭普通客户端的原因:

    • 客户端进程退出或被杀死,客户端与服务端的网络连接被关闭
    • 客户端向服务端发送了不符合协议格式的命令请求
    • 客户端成为了 CLIENT KILL 命令的目标
    • 客户端的空转时间超过 timeout 配置选项的值
    • 客户端发送的命令请求大小,超过了深入缓冲区的限制大小(默认为 1GB)
    • 服务端返回给客户端的数据超过了输出缓冲区的限制大小

    参考链接

    Redis 源码简洁剖析系列

    最简洁的 Redis 源码剖析系列文章

    Java 编程思想-最全思维导图-GitHub 下载链接,需要的小伙伴可以自取~

    原创不易,希望大家转载时请先联系我,并标注原文链接。

    相关文章

      网友评论

          本文标题:Redis 源码简洁剖析 16 - 客户端

          本文链接:https://www.haomeiwen.com/subject/banwlrtx.html