美文网首页redis
redis 客户端管理

redis 客户端管理

作者: 晴天哥_王志 | 来源:发表于2018-06-11 19:06 被阅读32次

    写在前面

     这一章节涉及的内容比较简单,主要是为了讲清楚redis server端如何维持和client相关的连接的,基本从以下几个方面展开描述:

    • redis server端采用何种数据结构维护连接信息
    • redis server端创建和client相关连接的过程
    • redis client相关的数据结构定义

    redis server核心数据结构

     redis server作为redis最核心的数据结构,基本上redis所有相关的信息都会在这里有体现,这里列出几大块。

    • 服务本身运行信息
    • 网络监听连接等
    • AOF/RDB持久化信息
    • redis慢查询日志
    • 主从同步信息
    • 集群cluster信息

     redis客户端管理相关的数据结构主要有

    • list *clients; // 一个链表,保存了所有客户端状态结构
    • list *clients_to_close; // 链表,保存了所有待关闭的客户端
    • list *slaves, *monitors; // 链表,保存了所有从服务器,以及所有监视器

    redis server端使用列表来管理客户端发起的连接,以list保存所有客户端发起的连接,以及待关闭的客户端连接。

    struct redisServer {
    
        // serverCron() 每秒调用的次数
        int hz;                     /* serverCron() calls frequency in hertz */
    
        // 数据库
        redisDb *db;
    
        // 事件状态
        aeEventLoop *el;
    
        /* redis 网络相关的数据结构 */
    
        // TCP 监听端口
        int port;                   /* TCP listening port */
    
        int tcp_backlog;            /* TCP listen() backlog */
    
        // 地址
        char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to */
        // 地址数量
        int bindaddr_count;         /* Number of addresses in server.bindaddr[] */
    
        // 一个链表,保存了所有客户端状态结构
        list *clients;              /* List of active clients */
        // 链表,保存了所有待关闭的客户端
        list *clients_to_close;     /* Clients to close asynchronously */
    
        // 链表,保存了所有从服务器,以及所有监视器
        list *slaves, *monitors;    /* List of slaves and MONITORs */
    
        /* RDB / AOF 载入信息相关数据结构 */
    
        // 这个值为真时,表示服务器正在进行载入
        int loading;                /* We are loading data from disk if true */
    
        // 正在载入的数据的大小
        off_t loading_total_bytes;
    
        // 已载入数据的大小
        off_t loading_loaded_bytes;
    
        // 开始进行载入的时间
        time_t loading_start_time;
        off_t loading_process_events_interval_bytes;
    
        /* Fast pointers to often looked up command */
        // 常用命令的快捷连接
        struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
                            *rpopCommand;
    
    
        /* server端相关的统计信息  */
    
        // 服务器启动时间
        time_t stat_starttime;          /* Server start time */
    
        // 已处理命令的数量
        long long stat_numcommands;     /* Number of processed commands */
    
        // 服务器接到的连接请求数量
        long long stat_numconnections;  /* Number of connections received */
    
        // 已过期的键数量
        long long stat_expiredkeys;     /* Number of expired keys */
    
    
        /*  慢查询日志 */
    
        // 保存了所有慢查询日志的链表
        list *slowlog;                  /* SLOWLOG list of commands */
    
        // 下一条慢查询日志的 ID
        long long slowlog_entry_id;     /* SLOWLOG current entry ID */
    
        // 服务器配置 slowlog-log-slower-than 选项的值
        long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
    
        // 服务器配置 slowlog-max-len 选项的值
        unsigned long slowlog_max_len;    
        size_t resident_set_size;      
      
        // 最后一次进行抽样的时间
        long long ops_sec_last_sample_time;
        // 最后一次抽样时,服务器已执行命令的数量
        long long ops_sec_last_sample_ops; 
        // 抽样结果
        long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES];
        // 数组索引,用于保存抽样结果,并在需要时回绕到 0
        int ops_sec_idx;
    
        /* AOF 持久化信息*/
    
        // AOF 状态(开启/关闭/可写)
        int aof_state;                  /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
    
        // 所使用的 fsync 策略(每个写入/每秒/从不)
        int aof_fsync;                  /* Kind of fsync() policy */
        char *aof_filename;             /* Name of the AOF file */
        int aof_no_fsync_on_rewrite;    /* Don't fsync if a rewrite is in prog. */
        int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */
        off_t aof_rewrite_min_size;     /* the AOF file is at least N bytes. */
    
        // 最后一次执行 BGREWRITEAOF 时, AOF 文件的大小
        off_t aof_rewrite_base_size;    /* AOF size on latest startup or rewrite. */
    
        // AOF 文件的当前字节大小
        off_t aof_current_size;         /* AOF current size. */
        int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates. */
    
        
        /* RDB 持久化信息 */
    
        // 自从上次 SAVE 执行以来,数据库被修改的次数
        long long dirty;                /* Changes to DB from the last save */
    
        // BGSAVE 执行前的数据库被修改次数
        long long dirty_before_bgsave;  /* Used to restore dirty on failed BGSAVE */
    
        // 负责执行 BGSAVE 的子进程的 ID
        // 没在执行 BGSAVE 时,设为 -1
        pid_t rdb_child_pid;            /* PID of RDB saving child */
        struct saveparam *saveparams;   /* Save points array for RDB */
        int saveparamslen;              /* Number of saving points */
        char *rdb_filename;             /* Name of RDB file */
        int rdb_compression;            /* Use compression in RDB? */
        int rdb_checksum;               /* Use RDB checksum? */
    
    
        /* 日志相关信息 */
        char *logfile;                  /* Path of log file */
        int syslog_enabled;             /* Is syslog enabled? */
        char *syslog_ident;             /* Syslog ident */
        int syslog_facility;            /* Syslog facility */
    
    
        /* 主从复制 master的配置信息 */
        int slaveseldb;                 /* Last SELECTed DB in replication output */
        // 全局复制偏移量(一个累计值)
        long long master_repl_offset;   /* Global replication offset */
        // 主服务器发送 PING 的频率
        int repl_ping_slave_period;     /* Master pings the slave every N seconds */
    
    
        /* 主从赋值 slave的配置信息 */
        // 主服务器的验证密码
        char *masterauth;               /* AUTH with this password with master */
        // 主服务器的地址
        char *masterhost;               /* Hostname of master */
        // 主服务器的端口
        int masterport;                 /* Port of master */
        // 超时时间
        int repl_timeout;               /* Timeout after N seconds of master idle */
       
    
        /* 发布订阅信息 */
        // 字典,键为频道,值为链表
        // 链表中保存了所有订阅某个频道的客户端
        // 新客户端总是被添加到链表的表尾
        dict *pubsub_channels;  /* Map channels to list of subscribed clients */
    
        // 这个链表记录了客户端订阅的所有模式的名字
        list *pubsub_patterns;  /* A list of pubsub_patterns */
    
        int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
                                       xor of REDIS_NOTIFY... flags. */
    
    
        /* 集群信息 */
    
        int cluster_enabled;      /* Is cluster enabled? */
        mstime_t cluster_node_timeout; /* Cluster node timeout. */
        char *cluster_configfile; /* Cluster auto-generated config file name. */
        struct clusterState *cluster;  /* State of the cluster */
    
        int cluster_migration_barrier; /* Cluster replicas migration barrier. */
        
      /* 脚本信息 */
    
        // Lua 环境
        lua_State *lua; /* The Lua interpreter. We use just one for all clients */
        
        // 复制执行 Lua 脚本中的 Redis 命令的伪客户端
        redisClient *lua_client;   /* The "fake client" to query Redis from Lua */
    
        // 当前正在执行 EVAL 命令的客户端,如果没有就是 NULL
        redisClient *lua_caller;   /* The client running EVAL right now, or NULL */
    };
    

    redis 客户端创建过程

     当客户端第一次向server发起连接的时候,也就是在监听socket的accept的处理函数当中acceptCommonHandler的内部通过createClient创建对应client连接的socket。

    /*
     * TCP 连接 accept 处理器
     */
    #define MAX_ACCEPTS_PER_CALL 1000
    static void acceptCommonHandler(int fd, int flags) {
    
        // 创建客户端
        redisClient *c;
        if ((c = createClient(fd)) == NULL) {
            redisLog(REDIS_WARNING,
                "Error registering fd event for the new client: %s (fd=%d)",
                strerror(errno),fd);
            close(fd); /* May be already closed, just ignore errors */
            return;
        }
    
        // 如果新添加的客户端令服务器的最大客户端数量达到了
        // 那么向新客户端写入错误信息,并关闭新客户端
        // 先创建客户端,再进行数量检查是为了方便地进行错误信息写入
        if (listLength(server.clients) > server.maxclients) {
            char *err = "-ERR max number of clients reached\r\n";
    
            /* That's a best effort error message, don't check write errors */
            if (write(c->fd,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            // 更新拒绝连接数
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
    
        // 更新连接次数
        server.stat_numconnections++;
    
        // 设置 FLAG
        c->flags |= flags;
    }
    



     createClient内部进行按照以下步骤创建了client的socket并添加list列表当中。

    • 分配redisClient的内存大小
    • 根据fd执行aeCreateFileEvent将socket添加到对应的eventloop当中实现事件监听
    • 初始化一堆client相关的属性
    • 执行listAddNodeTail方法将client添加到客户端相关的列表当中
    /*
     * 创建一个新客户端
     */
    redisClient *createClient(int fd) {
    
        // 分配空间
        redisClient *c = zmalloc(sizeof(redisClient));
    
        // 当 fd 不为 -1 时,创建带网络连接的客户端
        // 如果 fd 为 -1 ,那么创建无网络连接的伪客户端
        // 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时
        // 需要用到这种伪终端
        if (fd != -1) {
            // 非阻塞
            anetNonBlock(NULL,fd);
            // 禁用 Nagle 算法
            anetEnableTcpNoDelay(NULL,fd);
            // 设置 keep alive
            if (server.tcpkeepalive)
                anetKeepAlive(NULL,fd,server.tcpkeepalive);
            // 绑定读事件到事件 loop (开始接收命令请求)
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                readQueryFromClient, c) == AE_ERR)
            {
                close(fd);
                zfree(c);
                return NULL;
            }
        }
    
        // 初始化各个属性
    
        // 默认数据库
        selectDb(c,0);
        // 套接字
        c->fd = fd;
        // 名字
        c->name = NULL;
        // 回复缓冲区的偏移量
        c->bufpos = 0;
        // 查询缓冲区
        c->querybuf = sdsempty();
        // 查询缓冲区峰值
        c->querybuf_peak = 0;
        // 命令请求的类型
        c->reqtype = 0;
        // 命令参数数量
        c->argc = 0;
        // 命令参数
        c->argv = NULL;
        // 当前执行的命令和最近一次执行的命令
        c->cmd = c->lastcmd = NULL;
        // 查询缓冲区中未读入的命令内容数量
        c->multibulklen = 0;
        // 读入的参数的长度
        c->bulklen = -1;
        // 已发送字节数
        c->sentlen = 0;
        // 状态 FLAG
        c->flags = 0;
        // 创建时间和最后一次互动时间
        c->ctime = c->lastinteraction = server.unixtime;
        // 认证状态
        c->authenticated = 0;
        // 复制状态
        c->replstate = REDIS_REPL_NONE;
        // 复制偏移量
        c->reploff = 0;
        // 通过 ACK 命令接收到的偏移量
        c->repl_ack_off = 0;
        // 通过 AKC 命令接收到偏移量的时间
        c->repl_ack_time = 0;
        // 客户端为从服务器时使用,记录了从服务器所使用的端口号
        c->slave_listening_port = 0;
        // 回复链表
        c->reply = listCreate();
        // 回复链表的字节量
        c->reply_bytes = 0;
        // 回复缓冲区大小达到软限制的时间
        c->obuf_soft_limit_reached_time = 0;
        // 回复链表的释放和复制函数
        listSetFreeMethod(c->reply,decrRefCountVoid);
        listSetDupMethod(c->reply,dupClientReplyValue);
        // 阻塞类型
        c->btype = REDIS_BLOCKED_NONE;
        // 阻塞超时
        c->bpop.timeout = 0;
        // 造成客户端阻塞的列表键
        c->bpop.keys = dictCreate(&setDictType,NULL);
        // 在解除阻塞时将元素推入到 target 指定的键中
        // BRPOPLPUSH 命令时使用
        c->bpop.target = NULL;
        c->bpop.numreplicas = 0;
        c->bpop.reploffset = 0;
        c->woff = 0;
        // 进行事务时监视的键
        c->watched_keys = listCreate();
        // 订阅的频道和模式
        c->pubsub_channels = dictCreate(&setDictType,NULL);
        c->pubsub_patterns = listCreate();
        c->peerid = NULL;
        listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
        listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
        // 如果不是伪客户端,那么添加到服务器的客户端链表中
        if (fd != -1) listAddNodeTail(server.clients,c);
        // 初始化客户端的事务状态
        initClientMultiState(c);
    
        // 返回客户端
        return c;
    }
    

     redisClient对应的数据结构定义,很多变量我们看着都很熟悉。

    • 跟请求相关的变量包括:(redisDb *db)、(int argc)、( robj **argv)
    • 跟订阅相关的变量包括:(dict *pubsub_channels)、(list *pubsub_patterns)。
    /* 
     *
     * 因为 I/O 复用的缘故,需要为每个客户端维持一个状态。
     *
     * 多个客户端状态被服务器用链表连接起来。
     */
    typedef struct redisClient {
    
        // 套接字描述符
        int fd;
    
        // 当前正在使用的数据库
        redisDb *db;
    
        // 当前正在使用的数据库的 id (号码)
        int dictid;
    
        // 客户端的名字
        robj *name;             /* As set by CLIENT SETNAME */
    
        // 查询缓冲区
        sds querybuf;
    
        // 查询缓冲区长度峰值
        size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size */
    
        // 参数数量
        int argc;
    
        // 参数对象数组
        robj **argv;
    
        // 记录被客户端执行的命令
        struct redisCommand *cmd, *lastcmd;
    
        // 请求的类型:内联命令还是多条命令
        int reqtype;
    
        // 剩余未读取的命令内容数量
        int multibulklen;       /* number of multi bulk arguments left to read */
    
        // 命令内容的长度
        long bulklen;           /* length of bulk argument in multi bulk request */
    
        // 回复链表
        list *reply;
    
        // 回复链表中对象的总大小
        unsigned long reply_bytes; /* Tot bytes of objects in reply list */
    
        // 已发送字节,处理 short write 用
        int sentlen;            /* Amount of bytes already sent in the current
                                   buffer or object being sent. */
    
        // 创建客户端的时间
        time_t ctime;           /* Client creation time */
    
        // 客户端最后一次和服务器互动的时间
        time_t lastinteraction; /* time of the last interaction, used for timeout */
    
        // 客户端的输出缓冲区超过软性限制的时间
        time_t obuf_soft_limit_reached_time;
    
        // 客户端状态标志
        int flags;              /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
    
        // 当 server.requirepass 不为 NULL 时
        // 代表认证的状态
        // 0 代表未认证, 1 代表已认证
        int authenticated;      /* when requirepass is non-NULL */
    
        // 复制状态
        int replstate;          /* replication state if this is a slave */
        // 用于保存主服务器传来的 RDB 文件的文件描述符
        int repldbfd;           /* replication DB file descriptor */
    
        // 读取主服务器传来的 RDB 文件的偏移量
        off_t repldboff;        /* replication DB file offset */
        // 主服务器传来的 RDB 文件的大小
        off_t repldbsize;       /* replication DB file size */
        
        sds replpreamble;       /* replication DB preamble. */
    
        // 主服务器的复制偏移量
        long long reploff;      /* replication offset if this is our master */
        // 从服务器最后一次发送 REPLCONF ACK 时的偏移量
        long long repl_ack_off; /* replication ack offset, if this is a slave */
        // 从服务器最后一次发送 REPLCONF ACK 的时间
        long long repl_ack_time;/* replication ack time, if this is a slave */
        // 主服务器的 master run ID
        // 保存在客户端,用于执行部分重同步
        char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
        // 从服务器的监听端口号
        int slave_listening_port; /* As configured with: SLAVECONF listening-port */
    
        // 事务状态
        multiState mstate;      /* MULTI/EXEC state */
    
        // 阻塞类型
        int btype;              /* Type of blocking op if REDIS_BLOCKED. */
        // 阻塞状态
        blockingState bpop;     /* blocking state */
    
        // 最后被写入的全局复制偏移量
        long long woff;         /* Last write global replication offset. */
    
        // 被监视的键
        list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    
        // 这个字典记录了客户端所有订阅的频道
        // 键为频道名字,值为 NULL
        // 也即是,一个频道的集合
        dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    
        // 链表,包含多个 pubsubPattern 结构
        // 记录了所有订阅频道的客户端的信息
        // 新 pubsubPattern 结构总是被添加到表尾
        list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
        sds peerid;             /* Cached peer ID. */
    
        /* Response buffer */
        // 回复偏移量
        int bufpos;
        // 回复缓冲区
        char buf[REDIS_REPLY_CHUNK_BYTES];
    
    } 
    

    相关文章

      网友评论

        本文标题:redis 客户端管理

        本文链接:https://www.haomeiwen.com/subject/gywosftx.html