客户端
Redis服务器状态结构的clients属性是一个链表,这个链表保存了所有与服务器连接的客户端状态结构:
/* 因为 I/O 复用的缘故,需要为每个客户端维持一个状态。多个客户端状态被服务器用链表连接起来。 */
typedef struct redisClient {
int fd; // 套接字描述符
redisDb *db; // 当前正在使用的数据库
int dictid; // 当前正在使用的数据库的 id (号码)
robj *name; // 客户端的名字
sds querybuf; // 查询缓冲区
size_t querybuf_peak; // 查询缓冲区长度峰值
int argc; // 参数数量
robj **argv; // 参数对象数组
struct redisCommand *cmd, *lastcmd; // 记录被客户端执行的命令
int reqtype;// 请求的类型:内联命令还是多条命令
int multibulklen; // 剩余未读取的命令内容数量
long bulklen; // 命令内容的长度
list *reply; // 回复链表
unsigned long reply_bytes; // 回复链表中对象的总大小
int sentlen; // 已发送字节,处理 short write 用
time_t ctime; // 创建客户端的时间
time_t lastinteraction; // 客户端最后一次和服务器互动的时间
time_t obuf_soft_limit_reached_time; // 客户端的输出缓冲区超过软性限制的时间
int flags; // 客户端状态标志
int authenticated; // 当 server.requirepass 不为 NULL 时代表认证的状态:0 代表未认证, 1 代表已认证
int replstate; // 复制状态
int repldbfd; // 用于保存主服务器传来的 RDB 文件的文件描述符
off_t repldboff; // 读取主服务器传来的 RDB 文件的偏移量
off_t repldbsize; // 主服务器传来的 RDB 文件的大小
sds replpreamble; /* replication DB preamble. */
long long reploff; // 主服务器的复制偏移量
long long repl_ack_off; // 从服务器最后一次发送 REPLCONF ACK 时的偏移量
long long repl_ack_time; // 从服务器最后一次发送 REPLCONF ACK 的时间
char replrunid[REDIS_RUN_ID_SIZE+1]; // 主服务器的 master run ID,保存在客户端,用于执行部分重同步
int slave_listening_port; // 从服务器的监听端口号
multiState mstate; // 事务状态
int btype; // 阻塞类型
blockingState bpop; // 阻塞状态
long long woff; // 最后被写入的全局复制偏移量
list *watched_keys; // 被监视的键
dict *pubsub_channels; // 这个字典记录了客户端所有订阅的频道,键为频道名字,值为 NULL,也即是,一个频道的集合
list *pubsub_patterns; // 链表,包含多个 pubsubPattern 结构,记录了所有订阅频道的客户端的信息,新 pubsubPattern 结构总是被添加到表尾
sds peerid; /* Cached peer ID. */
/* Response buffer */
int bufpos; // 回复偏移量
char buf[REDIS_REPLY_CHUNK_BYTES]; // 回复缓冲区
} redisClient;
- fd:套接字描述符
- -1:伪客户端,命令请求来源于AOF文件或者Lua脚本,而不是网络
- 大于-1的整数:普通客户端
- name:客户端名称,默认情况下没有名字,可用setname命令设置
- flags:标志客户端角色以及当前状态
- querybuf:输入缓冲区,其大小根据输入内容动态伸缩,但最大不超过1GB,否则服务器将关闭这个客户端
- argc、argv:命令参数个数以及命令参数(数组,元素为字符串对象),argv[0]是要执行的命令
- cmd:有一个命令表,该表是一个字典,键是一个SDS结构,保存了命令的名字,值是对应的redisCommand结构,保存了命令的实现函数、命令的标志、参数个数、执行次数和消耗时长等信息。根据argv[0],可以从该命令表中找到对应的redisCommand结构,将cmd指向该结构。
- 输出缓冲区
- buf[REDIS_REPLY_CHUNK_BYTES]:固定大小(16KB)的缓冲区,用于存储长度比较小的回复
-
reply:可变大小的缓冲区,是一个列表和一或多个字符串对象组成:
image
- authenticated:身份验证,0表示未通过验证,1表示已通过验证
- ctime、lastinteraction、obuf_soft_limit_reached_time:时间属性,分别表示客户端创建时间、最后一次跟服务器互动时间以及客户端空转时间。
服务器
Redis服务器负责与多个客户端建立网络连接,处理客户端发送的命令请求,在数据库中保存客户端执行命令所产生的数据,并通过资源管理来维持服务器自身的运转。
struct redisServer {
/* General */
char *configfile; // 配置文件的绝对路径
int hz; // serverCron() 每秒调用的次数
redisDb *db; // 数据库
dict *commands; // 命令表(受到 rename 配置选项的作用)
dict *orig_commands; // 命令表(无 rename 配置选项的作用)
aeEventLoop *el;// 事件状态
unsigned lruclock:REDIS_LRU_BITS; // 最近一次使用时钟
int shutdown_asap; // 关闭服务器的标识
int activerehashing; // 在执行 serverCron() 时进行渐进式 rehash
char *requirepass; // 是否设置了密码
char *pidfile; // PID 文件
int arch_bits; // 架构类型
int cronloops; // serverCron() 函数的运行次数计数器
char runid[REDIS_RUN_ID_SIZE+1]; // 本服务器的 RUN ID
int sentinel_mode; // 服务器是否运行在 SENTINEL 模式
/* Networking */
int port; // TCP 监听端口
int tcp_backlog; /* TCP listen() backlog */
char *bindaddr[REDIS_BINDADDR_MAX]; // 地址
int bindaddr_count; // 地址数量
char *unixsocket; // UNIX 套接字
mode_t unixsocketperm; /* UNIX socket permission */
int ipfd[REDIS_BINDADDR_MAX]; // 描述符
int ipfd_count; // 描述符数量
int sofd; // UNIX 套接字文件描述符
int cfd[REDIS_BINDADDR_MAX];/* Cluster bus listening socket */
int cfd_count; /* Used slots in cfd[] */
list *clients; // 一个链表,保存了所有客户端状态结构
list *clients_to_close; // 链表,保存了所有待关闭的客户端
list *slaves, *monitors; // 链表,保存了所有从服务器,以及所有监视器
redisClient *current_client; // 服务器的当前客户端,仅用于崩溃报告
int clients_paused; /* True if clients are currently paused */
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
char neterr[ANET_ERR_LEN]; // 网络错误
dict *migrate_cached_sockets; // MIGRATE 缓存
/* RDB / AOF loading information */
int loading; // 这个值为真时,表示服务器正在进行载入
off_t loading_total_bytes; // 正在载入的数据的大小
off_t loading_loaded_bytes; // 已载入数据的大小
time_t loading_start_time; // 开始进行载入的时间
off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand; // 常用命令的快捷连接
/* Fields used only for stats */
time_t stat_starttime; // 服务器启动时间
long long stat_numcommands; // 已处理命令的数量
long long stat_numconnections; // 服务器接到的连接请求数量
long long stat_expiredkeys; // 已过期的键数量
long long stat_evictedkeys; // 因为回收内存而被释放的过期键的数量
long long stat_keyspace_hits; // 成功查找键的次数
long long stat_keyspace_misses; // 查找键失败的次数
size_t stat_peak_memory; // 已使用内存峰值
long long stat_fork_time; // 最后一次执行 fork() 时消耗的时间
long long stat_rejected_conn; // 服务器因为客户端数量过多而拒绝客户端连接的次数
long long stat_sync_full; // 执行 full sync 的次数
long long stat_sync_partial_ok; // PSYNC 成功执行的次数
long long stat_sync_partial_err; // PSYNC 执行失败的次数
/* slowlog */
list *slowlog; // 保存了所有慢查询日志的链表
long long slowlog_entry_id; // 下一条慢查询日志的 ID
long long slowlog_log_slower_than; // 服务器配置 slowlog-log-slower-than 选项的值
unsigned long slowlog_max_len; // 服务器配置 slowlog-max-len 选项的值
size_t resident_set_size; /* RSS sampled in serverCron(). */
/* The following two are used to track instantaneous "load" in terms
* of operations per second. */
long long ops_sec_last_sample_time; // 最后一次进行抽样的时间
long long ops_sec_last_sample_ops; // 最后一次抽样时,服务器已执行命令的数量
long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES]; // 抽样结果
int ops_sec_idx; // 数组索引,用于保存抽样结果,并在需要时回绕到 0
/* Configuration */
int verbosity; // 日志可见性
int maxidletime; // 客户端最大空转时间
int tcpkeepalive; // 是否开启 SO_KEEPALIVE 选项
int active_expire_enabled; /* Can be disabled for testing purposes. */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int daemonize; /* True if running as a daemon */
clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES];
// 客户端输出缓冲区大小限制
// 数组的元素有 REDIS_CLIENT_LIMIT_NUM_CLASSES 个
// 每个代表一类客户端:普通、从服务器、pubsub,诸如此类
/* AOF persistence */
int aof_state; // AOF 状态(开启/关闭/可写)
int aof_fsync; // 所使用的 fsync 策略(每个写入/每秒/从不)
char *aof_filename; /* Name of the AOF file */
int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */
int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */
off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */
off_t aof_rewrite_base_size; // 最后一次执行 BGREWRITEAOF 时, AOF 文件的大小
off_t aof_current_size; // AOF 文件的当前字节大小
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid; // 负责进行 AOF 重写的子进程 ID
list *aof_rewrite_buf_blocks; // AOF 重写缓存链表,链接着多个缓存块
sds aof_buf; // AOF 缓冲区
int aof_fd; // AOF 文件的描述符
int aof_selected_db; // AOF 的当前目标数据库
time_t aof_flush_postponed_start; // 推迟 write 操作的时间
time_t aof_last_fsync; // 最后一直执行 fsync 的时间
time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
time_t aof_rewrite_time_start; // AOF 重写的开始时间
int aof_lastbgrewrite_status; // 最后一次执行 BGREWRITEAOF 的结果
unsigned long aof_delayed_fsync; // 记录 AOF 的 write 操作被推迟了多少次
int aof_rewrite_incremental_fsync; // 指示是否需要每写入一定量的数据,就主动执行一次 fsync()
int aof_last_write_status; /* REDIS_OK or REDIS_ERR */
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
/* RDB persistence */
long long dirty; // 自从上次 SAVE 执行以来,数据库被修改的次数
long long dirty_before_bgsave; // BGSAVE 执行前的数据库被修改次数
pid_t rdb_child_pid; // 负责执行 BGSAVE 的子进程的 ID,没在执行 BGSAVE 时,设为 -1
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
time_t lastsave; // 最后一次完成 SAVE 的时间
time_t lastbgsave_try; // 最后一次尝试执行 BGSAVE 的时间
time_t rdb_save_time_last; // 最近一次 BGSAVE 执行耗费的时间
time_t rdb_save_time_start; // 数据库最近一次开始执行 BGSAVE 的时间
int lastbgsave_status; // 最后一次执行 SAVE 的状态
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
/* Propagation of commands in AOF / replication */
redisOpArray also_propagate; /* Additional command to propagate. */
/* Logging */
char *logfile; /* Path of log file */
int syslog_enabled; /* Is syslog enabled? */
char *syslog_ident; /* Syslog ident */
int syslog_facility; /* Syslog facility */
/* Replication (master) */
int slaveseldb; /* Last SELECTed DB in replication output */
long long master_repl_offset; // 全局复制偏移量(一个累计值)
int repl_ping_slave_period; // 主服务器发送 PING 的频率
char *repl_backlog; // backlog 本身
long long repl_backlog_size; // backlog 的长度
long long repl_backlog_histlen; // backlog 中数据的长度
long long repl_backlog_idx; // backlog 的当前索引
long long repl_backlog_off; // backlog 中可以被还原的第一个字节的偏移量
time_t repl_backlog_time_limit; // backlog 的过期时间
time_t repl_no_slaves_since; // 距离上一次有从服务器的时间,是否开启最小数量从服务器写入功能
int repl_min_slaves_to_write; /* Min number of slaves to write. */
int repl_min_slaves_max_lag; // 定义最小数量从服务器的最大延迟值
int repl_good_slaves_count; // 延迟良好的从服务器的数量
/* Replication (slave) */
char *masterauth; // 主服务器的验证密码
char *masterhost; // 主服务器的地址
int masterport; // 主服务器的端口
int repl_timeout; // 超时时间
redisClient *master; // 主服务器所对应的客户端
redisClient *cached_master; // 被缓存的主服务器,PSYNC 时使用
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; // 复制的状态(服务器是从服务器时使用)
off_t repl_transfer_size; // RDB 文件的大小*/
off_t repl_transfer_read; // 已读 RDB 文件内容的字节数
off_t repl_transfer_last_fsync_off; // 最近一次执行 fsync 时的偏移量,用于 sync_file_range 函数
int repl_transfer_s; // 主服务器的套接字
int repl_transfer_fd; // 保存 RDB 文件的临时文件的描述符
char *repl_transfer_tmpfile; // 保存 RDB 文件的临时文件名字
time_t repl_transfer_lastio; // 最近一次读入 RDB 内容的时间
int repl_serve_stale_data; /* Serve stale data when link is down? */
int repl_slave_ro; // 是否只读从服务器?
time_t repl_down_since; // 连接断开的时长
int repl_disable_tcp_nodelay; // 是否要在 SYNC 之后关闭 NODELAY ?
int slave_priority; // 从服务器优先级
char repl_master_runid[REDIS_RUN_ID_SIZE+1]; // 本服务器(从服务器)当前主服务器的 RUN ID
long long repl_master_initial_offset; // 初始化偏移量
/* Replication script cache. */
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
list *repl_scriptcache_fifo; // FIFO 队列
int repl_scriptcache_size; // 缓存的大小
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT command. */
int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */
/* Limits */
int maxclients; /* Max number of simultaneous clients */
unsigned long long maxmemory; /* Max number of memory bytes to use */
int maxmemory_policy; /* Policy for key eviction */
int maxmemory_samples; /* Pricision of random sampling */
/* Blocked clients */
unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
int sort_alpha;
int sort_bypattern;
int sort_store;
/* Zip structure config, see redis.conf for more information */
size_t hash_max_ziplist_entries;
size_t hash_max_ziplist_value;
size_t list_max_ziplist_entries;
size_t list_max_ziplist_value;
size_t set_max_intset_entries;
size_t zset_max_ziplist_entries;
size_t zset_max_ziplist_value;
size_t hll_sparse_max_bytes;
time_t unixtime; /* Unix time sampled every cron cycle. */
long long mstime; /* Like 'unixtime' but with milliseconds resolution. */
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
// 字典,键为频道,值为链表
// 链表中保存了所有订阅某个频道的客户端
// 新客户端总是被添加到链表的表尾
list *pubsub_patterns; // 这个链表记录了客户端订阅的所有模式的名字
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of REDIS_NOTIFY... flags. */
/* Cluster */
int cluster_enabled; /* Is cluster enabled? */
mstime_t cluster_node_timeout; /* Cluster node timeout. */
char *cluster_configfile; /* Cluster auto-generated config file name. */
struct clusterState *cluster; /* State of the cluster */
int cluster_migration_barrier; /* Cluster replicas migration barrier. */
/* Scripting */
lua_State *lua; // Lua 环境
redisClient *lua_client; // 复制执行 Lua 脚本中的 Redis 命令的伪客户端
redisClient *lua_caller; // 当前正在执行 EVAL 命令的客户端,如果没有就是 NULL
dict *lua_scripts; // 一个字典,值为 Lua 脚本,键为脚本的 SHA1 校验和
mstime_t lua_time_limit; // Lua 脚本的执行时限
mstime_t lua_time_start; // 脚本开始执行的时间
int lua_write_dirty; // 脚本是否执行过写命令
int lua_random_dirty; // 脚本是否执行过带有随机性质的命令
int lua_timedout; // 脚本是否超时
int lua_kill; // 是否要杀死脚本
/* Assert & bug reporting */
char *assert_failed;
char *assert_file;
int assert_line;
int bug_report_start; /* True if bug report header was already logged. */
int watchdog_period; /* Software watchdog period in ms. 0 = off */
};
命令执行过程:
- 客户端向服务器发送命令请求;
- 服务器接收并处理客户端发来的命令请求,在数据库中进行设置操作,并产生命令回复;
- 服务器将命令回复发送给客户端;
- 客户端接收服务器返回的命令回复,并将这个回复打印给用户。
1、发送命令请求
客户端将命令转换成协议格式,并通过连接到服务器的套接字发送给服务器。
2、读取命令请求
服务器将命令请求读到输入缓冲区中,并对缓冲区中的请求进行分析,提取命令请求中的参数、参数个数,保存到argv和argc中。
接着调用命令执行器:
(1)查找命令实现
根据客户端状态的argv[0]参数(命令名),在命令表中查询所指定的命令,并将命令保存到客户端状态结构的cmd属性中。
命令表示一个字典,键为命令名字,值是一个redisCommand结构:
image
sflags是标志符,功能如下:
image
(2)执行预备操作
在执行真正的命令之前,还需要做一些预备操作,确保命令可以正确的运行:
- 检査客户端状态的cmd指针是否指向NULL,如果是的话,渾么说明用户输人的 命令名字找不到相应的命令实现,服务器不再执行后续步骤,并向客户端返回一个 错误。
- 根 据 客 户 端cmd厲性指向的redisCommand结构的arity属性,检査命令请求 所给定的参数个数是否正确,当参数个数不正确时,不再执行后续步骤,直接向客户端返回一个错误。比如说,redisCommand结构的arity厲性的值为-3,那么用户输入的命令参数个数必须大于等于3个才行。
- 检査客户端是否已经通过了身份验证,未通过身份验证的客户端只能执行AUTH命令,如果未通过身份验证的客户靖试图执行除AUTH命令之外的其他命令,那么服务器将向客户端返回一个错误。
- 如果服务器打开了 maxmemory功能,那么在执行命令之前,先检査服务器的内存占用情况,并在有需要时进行内存回收,从而使得接下来的命令可以顺利执行。如果内存回收失败,那么不再执行后续步驟,向客户端返回一个错误。
- 如果服务器上一次执行BGSAVE命令时出错,并且嚴务器打开了 stop-writes- on-bgsave-error功能,而且服务器即将要执行的命令是一个写命令,那么服务器将拒绝执行这个命令,并向客户端返回一个错误。
- 如果客户端当前正在用SUBSCRIBE命令订阅频道 ,或者正在用PSUBSCRIBE命令订阅模式,那么服务器只会执行客户端发来的SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE和PUNSUBSCRIBE四个命令,其他命令都会被脤务器拒绝。
- 如果服务器正在进行数据载人,那么客户端发送的命令必须带有1标识(比如INFO、SHUTDOWN、PUBLISH等等)才会被服务器执行,其他命令都会被服务器拒绝。
- 如果服务器因为执行Lua脚本而超时并进入阻塞状态,那么服务器只会执行客户端发来的SHUTDOWN nosave命令和SCRIPT KILL命令 ,其他命令都会被服务器绝 。
- 如果客户端正在执行事务,那么服务器只会执行客户端发来的EXEC、DISCARD、MULTI、WATCH四个命令,其他命令都会被放进亊务队列中。
- 如果服务器打开了监视器功能,渾么服务器会将要执行的命令和参数等信息发送给监视器。当完成了以上预备操作之后,服务器就可以开始真正执行命令了。
(3)调用命令实现函数
cmd属性指向的结构中包含了命令实现函数:
image
调用该命令实现函数,并产生相应的命令回复,保存在输出缓冲区中,再通过与客户端相连的套接字返回给客户端。
(4)执行后续工作
- 如果服务器开启了慢査询日志功能,那么慢査询日志模块会检査是否需要为刚刚执 行完的命令请求添加一条新的慢査询日志。
- 根据刚刚执行命令所耗费的时长,更新被执行命令的redisCommand结构的 milliseconds 厲性,并将命令的redisCommand 结构的calls 计数器的值增一。
- 如 果 服 务 器 开 启 了 AOF 持久化功能,那么AOF 持久化模块会将刚刚执行的命令请 求写人到AOF 缓冲区里面。
- 如果有其他从服务器正在复制当前这个服务器,那么服务器会将刚刚执行的命令传 播给所有从服务器。
网友评论