The complete execution flow of an INCR command on a standalone Redis server
![](https://img.haomeiwen.com/i1036501/31230362d5930d92.jpg)
- The client opens a TCP connection to Redis on port 6379. During one iteration of its event loop, Redis's AE event-driven framework{todo:link} detects a readable event on the listening file descriptor for port 6379 (one of the descriptors in the ipfd array) and invokes the handler registered on that file event, acceptTcpHandler(). That function calls anetTcpAccept(), which wraps the operating system's accept() call, to accept the connection and return the connection's file descriptor [cfd] and the client's IP address [cip]. (A minimal client-side sketch that triggers this path is shown after the function below.)
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) { // accept at most MAX_ACCEPTS_PER_CALL (1000) connections per invocation
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd,0,cip);
}
}
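For reference, a minimal client-side sketch (not part of Redis; the host and port are assumptions for illustration, and error handling is kept to a minimum) that produces the readable event on the listening socket handled above might look like this:
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: connect to a local Redis instance. */
int connect_to_redis(const char *host, int port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    inet_pton(AF_INET, host, &addr.sin_addr);
    /* This connect() is what makes the server's listening fd readable
     * and drives it into acceptTcpHandler(). */
    if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        close(fd);
        return -1;
    }
    return fd;
}

int main(void) {
    int fd = connect_to_redis("127.0.0.1", 6379);
    if (fd == -1) { perror("connect"); return 1; }
    printf("connected, fd=%d\n", fd);
    close(fd);
    return 0;
}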
- acceptCommonHandler(cfd,0,cip) is then called, which creates the client via createClient(). createClient() initializes the client structure and registers the ([cfd, AE_READABLE] -> readQueryFromClient){todo:protocol} file event with the AE event-driven framework.
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
/* Create the client data structure */
if ((c = createClient(fd)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd);
return;
}
/* If the maxclients directive is set and the number of clients exceeds the limit, reject the connection.
 * Note that we create the client before this check because createClient() already put the socket in
 * non-blocking mode, so we can use kernel I/O directly to send back the error message. */
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
/* If the server is running in protected mode, with no password set and no bind address configured, we do not accept connections from interfaces other than the loopback interface. */
if (server.protected_mode &&
server.bindaddr_count == 0 &&
server.requirepass == NULL &&
!(flags & CLIENT_UNIX_SOCKET) &&
ip != NULL)
{
if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
char *err =
"-DENIED Redis is running in protected mode because protected "
"mode is enabled, no bind address was specified, no "
"authentication password is requested to clients. In this mode "
"connections are only accepted from the loopback interface. "
"If you want to connect from external computers to Redis you "
"may adopt one of the following solutions: "
"1) Just disable protected mode sending the command "
"'CONFIG SET protected-mode no' from the loopback interface "
"by connecting to Redis from the same host the server is "
"running, however MAKE SURE Redis is not publicly accessible "
"from internet if you do so. Use CONFIG REWRITE to make this "
"change permanent. "
"2) Alternatively you can just disable the protected mode by "
"editing the Redis configuration file, and setting the protected "
"mode option to 'no', and then restarting the server. "
"3) If you started the server manually just for testing, restart "
"it with the '--protected-mode no' option. "
"4) Setup a bind address or an authentication password. "
"NOTE: You only need to do one of the above things in order for "
"the server to start accepting connections from the outside.\r\n";
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
}
server.stat_numconnections++;
c->flags |= flags;
}
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
/* fd == -1 means this is a client that is not backed by a real connection.
 * All Redis commands need to be executed in the context of a client, so when a command has to run
 * in the context of a non-connected client (for example a Lua script), the client is created with fd = -1.
 * */
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// register readQueryFromClient as the callback for the AE_READABLE event
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
selectDb(c,0);
uint64_t client_id;
atomicGetIncr(server.next_client_id,client_id,1);
c->id = client_id;
c->fd = fd;
c->name = NULL;
c->bufpos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
c->argv = NULL;
c->cmd = c->lastcmd = NULL;
c->multibulklen = 0;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0;
c->reploff = 0;
c->read_reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->slave_listening_port = 0;
c->slave_ip[0] = '\0';
c->slave_capa = SLAVE_CAPA_NONE;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
c->btype = BLOCKED_NONE;
c->bpop.timeout = 0;
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
c->bpop.target = NULL;
c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
c->bpop.xread_group_noack = 0;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
c->woff = 0;
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
c->pubsub_patterns = listCreate();
c->peerid = NULL;
c->client_list_node = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
if (fd != -1) linkClient(c);
initClientMultiState(c);
return c;
}
/* This function adds the client to the global list of connected clients; unlinkClient() does the opposite. */
void linkClient(client *c) {
listAddNodeTail(server.clients,c);
/* Note that we store the list node, so unlinkClient() does not need a linear scan. */
c->client_list_node = listLast(server.clients);
uint64_t id = htonu64(c->id);
raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
}
- Once the client is connected to the server, it sends an INCR command, e.g. INCR redis{todo:protocol->Inline or MultiBulk} (INCR takes a single key argument). The request bytes differ depending on the protocol used:
Inline: INCR redis\r\n
MultiBulk (RESP): *2\r\n$4\r\nINCR\r\n$5\r\nredis\r\n
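As a rough illustration (not Redis source; plain C without sds, with buffer sizes chosen arbitrarily), the RESP (MultiBulk) encoding of an argv-style command can be produced like this:
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: encode argv[0..argc-1] as a RESP request into buf.
 * Returns the number of bytes written, or -1 if buf is too small. */
static int resp_encode(char *buf, size_t buflen, int argc, const char **argv) {
    size_t pos = 0;
    int n = snprintf(buf + pos, buflen - pos, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= buflen - pos) return -1;
    pos += n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(buf + pos, buflen - pos, "$%zu\r\n%s\r\n", strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= buflen - pos) return -1;
        pos += n;
    }
    return (int)pos;
}

int main(void) {
    const char *argv[] = {"INCR", "redis"};
    char buf[256];
    int len = resp_encode(buf, sizeof(buf), 2, argv);
    if (len > 0) fwrite(buf, 1, len, stdout); /* prints *2\r\n$4\r\nINCR\r\n$5\r\nredis\r\n */
    return 0;
}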
- During a later iteration of the event loop, the AE framework detects that the [cfd, AE_READABLE] event has fired and calls its handler, readQueryFromClient(). This function reads whatever data is available in the TCP receive buffer and appends it to the client's querybuf.
/* Read data from the TCP buffer, append it to client->querybuf, and process it. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
size_t qblen;
UNUSED(el);
UNUSED(mask);
readlen = PROTO_IOBUF_LEN;
/* We try to read up to PROTO_IOBUF_LEN (16KB) at a time, so we usually do not need to grow or copy the buffer while reading. */
/* If the request is a MultiBulk request, there are still bulks left to read, the bulk currently being
 * read has a known length, and that length is at least PROTO_MBULK_BIG_ARG (32KB), then limit this read
 * to the bytes remaining for the current bulk so the buffer ends up holding exactly that bulk. */
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
if (remaining > 0 && remaining < readlen) readlen = remaining;
}
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // grow the sds buffer so it can hold readlen more bytes
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) { // read error
if (errno == EAGAIN) { // no data in the TCP buffer for now; return and read again on the next event
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c); // a real read error: free the client
return;
}
} else if (nread == 0) { // the client closed the connection
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
} else if (c->flags & CLIENT_MASTER) { // the client is our master
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
* copy of the string applied by the last command executed. */
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) { // the query buffer is too large: free the client connection
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
/* Time to process the buffer. If the client is a master we need to
* compute the difference between the applied offset before and after
* processing the buffer, to understand how much of the replication stream
* was actually applied to the master state: this quantity, and its
* corresponding part of the replication stream, will be propagated to
* the sub-slaves and to the replication backlog. */
/* Process the data in the input buffer and handle replication. */
processInputBufferAndReplicate(c);
}
- readQueryFromClient() then calls processInputBufferAndReplicate(), which hands the buffered data to processInputBuffer(). Redis supports two command protocols: the inline protocol is parsed by processInlineBuffer(), and RESP (i.e. MultiBulk) is parsed by processMultibulkBuffer().
/* This is a wrapper around processInputBuffer() that, when the client is our master, also takes care of propagating the stream to our sub-slaves. */
void processInputBufferAndReplicate(client *c) {
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else { // the client is our master: besides processing the buffered commands, forward them to our sub-slaves
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
void processInputBuffer(client *c) {
server.current_client = c;
/* Keep processing as long as there is unparsed data left in querybuf. */
while(c->qb_pos < sdslen(c->querybuf)) { // at least one byte is still unprocessed
/* Return right away if the client is not a slave and clients are paused. */
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
/* Also return right away if the client is blocked. */
if (c->flags & CLIENT_BLOCKED) break;
/* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands).
*
* The same applies for clients we want to terminate ASAP. */
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
/* If the current position starts with '*', this is a RESP command; otherwise it is an inline command. */
if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
/* Inline commands are parsed by processInlineBuffer(); this format is mostly used via telnet.
 * RESP commands are parsed by processMultibulkBuffer(); this is the format used by regular clients.
 * */
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break; // the command has not been fully read yet: return ERR and resume next time
} else {
serverPanic("Unknown request type");
}
// A complete command has been read and parsed at this point.
if (c->argc == 0) { // no command arguments: just reset the client
resetClient(c);
} else { // otherwise execute the command
/* When the command has been executed we reset the client state with resetClient(). */
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
/* Do not reset a client that is blocked on a module command, so that the reply callback can still
 * access argv and argc; such clients are reset in unblockClientFromModule() instead. */
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);
}
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
* freed. */
if (server.current_client == NULL) break;
}
}
/* Trim to pos */
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
server.current_client = NULL;
}
/* Like processMultibulkBuffer(), but for the inline protocol: this function consumes the query
 * buffer and creates a command ready to be executed.
 * Returns C_OK if a command is ready, or C_ERR if more bytes need to be read.
 * On a protocol error, an error reply is set for the client and the connection is closed. */
int processInlineBuffer(client *c) {
char *newline;
int argc, j, linefeed_chars = 1;
sds *argv, aux;
size_t querylen;
/* Search for the end of the line (\n). */
newline = strchr(c->querybuf+c->qb_pos,'\n');
/* Nothing to do if no newline was found... */
if (newline == NULL) {
/* ...unless more than PROTO_INLINE_MAX_SIZE (64KB) has accumulated, which is a protocol error. */
if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big inline request");
setProtocolError("too big inline request",c);
}
return C_ERR;
}
/* Handle the \r\n case. */
if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
newline--, linefeed_chars++;
/* Extract the command string from the query buffer. */
querylen = newline-(c->querybuf+c->qb_pos);
aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
argv = sdssplitargs(aux,&argc);
sdsfree(aux);
if (argv == NULL) {
addReplyError(c,"Protocol error: unbalanced quotes in request");
setProtocolError("unbalanced quotes in inline request",c);
return C_ERR;
}
/* Newline from slaves can be used to refresh the last ACK time.
* This is useful for a slave to ping back while loading a big
* RDB file. */
if (querylen == 0 && c->flags & CLIENT_SLAVE)
c->repl_ack_time = server.unixtime;
/* Move querybuffer position to the next query in the buffer. */
c->qb_pos += querylen+linefeed_chars;
/* Set up the argv array. */
if (argc) {
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*argc);
}
/* Fill the argv array with the parsed arguments. */
for (c->argc = 0, j = 0; j < argc; j++) {
if (sdslen(argv[j])) {
c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
c->argc++;
} else {
sdsfree(argv[j]);
}
}
zfree(argv);
return C_OK;
}
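For the inline form, the splitting performed by sdssplitargs() (quote and escape handling omitted) boils down to a whitespace split; a minimal hedged sketch, not the real implementation:
#include <stdio.h>
#include <string.h>

int main(void) {
    /* An inline request as it arrives on the wire, up to (but not including) the newline. */
    char line[] = "INCR redis";
    char *argv[8];
    int argc = 0;
    /* Simplified stand-in for sdssplitargs(): split on spaces, no quote or escape handling. */
    for (char *tok = strtok(line, " "); tok && argc < 8; tok = strtok(NULL, " "))
        argv[argc++] = tok;
    for (int i = 0; i < argc; i++) printf("argv[%d] = %s\n", i, argv[i]); /* INCR, redis */
    return 0;
}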
/* Process the client's query buffer and set up the client command arguments.
 * Returns C_OK if, after running this function, the command is ready to be executed, or C_ERR if
 * more bytes still need to be read. C_ERR is also returned on a protocol error: in that case an
 * error reply is set for the client and the connection is closed.
 * */
int processMultibulkBuffer(client *c) {
char *newline = NULL;
int ok;
long long ll;
// multibulklen == 0 means we are about to start parsing a new command
if (c->multibulklen == 0) {
/* The client's command-parsing state should already have been reset. */
serverAssertWithInfo(c,NULL,c->argc == 0);
newline = strchr(c->querybuf+c->qb_pos,'\r'); /* pointer to the first occurrence of \r */
if (newline == NULL) { /* no \r\n found yet: return ERR, meaning more data must be read */
if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big mbulk count string");
setProtocolError("too big mbulk count string",c);
}
return C_ERR;
}
/* The buffer should also contain the \n; otherwise keep reading. */
if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
return C_ERR;
/**
 * Example request: SET mykey myvalue
 * RESP encoding:   *3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n
 */
/* We know we have read at least "*3\r\n"; now parse out the 3. */
serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*'); // assert that c->qb_pos points at '*'
ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); // parse the number between '*' and \r\n into ll; returns 1 on success
if (!ok || ll > 1024*1024) { // parsing failed or the multibulk length is too large: protocol error
addReplyError(c,"Protocol error: invalid multibulk length");
setProtocolError("invalid mbulk count",c);
return C_ERR;
}
c->qb_pos = (newline-c->querybuf)+2; // move c->qb_pos past the \r\n
if (ll <= 0) return C_OK;
c->multibulklen = ll; // number of bulks still to be read for the current command
/* Set up the argv array. */
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
}
// multibulklen is now non-zero: parse the remaining bulks of the current command
serverAssertWithInfo(c,NULL,c->multibulklen > 0);
while(c->multibulklen) {
/* If the length of the current bulk is unknown, read the bulk length first. */
if (c->bulklen == -1) {
newline = strchr(c->querybuf+c->qb_pos,'\r'); // find the next \r\n; the bulk length sits between '$' and the \r\n
if (newline == NULL) {
/* If the length string itself is too large, it is a protocol error. */
if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,
"Protocol error: too big bulk count string");
setProtocolError("too big bulk count string",c);
return C_ERR;
}
break;
}
/* The buffer should also contain the \n. */
if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
break;
/* If the next character is not '$', it is a protocol error. */
if (c->querybuf[c->qb_pos] != '$') {
addReplyErrorFormat(c,
"Protocol error: expected '$', got '%c'",
c->querybuf[c->qb_pos]);
setProtocolError("expected $ but got something else",c);
return C_ERR;
}
// parse out the bulk length
ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
addReplyError(c,"Protocol error: invalid bulk length");
setProtocolError("invalid bulk length",c);
return C_ERR;
}
c->qb_pos = newline-c->querybuf+2; // skip the \r\n
if (ll >= PROTO_MBULK_BIG_ARG) { // optimization, not on the critical path
/* If we are about to read a large object from the network, we can optimize the querybuf
 * to avoid copying large amounts of data around.
 * The optimization (trimming the buffer at the current offset and growing it) is only
 * applied when the remaining space in querybuf cannot hold ll+2 bytes.
 * */
if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2);
}
}
c->bulklen = ll;
}
// Once the bulk length is known, read the bulk payload itself.
if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
// Not enough data yet: break out of the loop and continue next time.
break;
} else {
/* If the buffer contains exactly our bulk element, reuse the current sds as the argument
 * object instead of creating a new one. */
if (c->qb_pos == 0 &&
c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen+2))
{
c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
sdsIncrLen(c->querybuf,-2); /* remove CRLF */
c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
sdsclear(c->querybuf);
} else {
// Read bulklen bytes from the current position (the bulk payload) and store it into
// c->argv as a redisObject wrapping an sds string.
c->argv[c->argc++] =
createStringObject(c->querybuf+c->qb_pos,c->bulklen);
c->qb_pos += c->bulklen+2; // skip the payload and its trailing \r\n
}
c->bulklen = -1; // reset the current bulk length so the next iteration reads a new one
c->multibulklen--; // one less bulk left to read
}
}
if (c->multibulklen == 0) return C_OK; // the whole command has been read: return OK
return C_ERR; // the command is not complete yet: return ERR
}
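To make the parsing concrete, here is a hedged, self-contained sketch (not Redis code; no sds and no client state) that walks the RESP bytes for INCR redis the same way processMultibulkBuffer() does, splitting them into argc/argv:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical simplified RESP request parser, for illustration only: it assumes the whole
 * request is already in buf and that the input is well formed. */
static int resp_parse(const char *buf, char argv[][64], int max_args) {
    if (*buf != '*') return -1;
    long argc = strtol(buf + 1, NULL, 10);          /* "*2" -> 2 bulks follow */
    const char *p = strstr(buf, "\r\n") + 2;        /* skip past "*<argc>\r\n" */
    for (long i = 0; i < argc && i < max_args; i++) {
        if (*p != '$') return -1;
        long bulklen = strtol(p + 1, NULL, 10);     /* "$4" -> 4 payload bytes */
        p = strstr(p, "\r\n") + 2;                  /* skip past "$<len>\r\n" */
        memcpy(argv[i], p, bulklen);
        argv[i][bulklen] = '\0';
        p += bulklen + 2;                           /* skip the payload and its trailing \r\n */
    }
    return (int)argc;
}

int main(void) {
    char argv[8][64];
    int argc = resp_parse("*2\r\n$4\r\nINCR\r\n$5\r\nredis\r\n", argv, 8);
    for (int i = 0; i < argc; i++) printf("argv[%d] = %s\n", i, argv[i]); /* INCR, redis */
    return 0;
}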
- Once a complete, well-formed command has been parsed, processCommand() is called to execute it. This function first looks up the corresponding redisCommand structure in server.commands; that structure holds the command's metadata and its handler function proc. After the lookup, if the client is inside a transaction the command is queued; otherwise call() is invoked to execute it.
/* This function is called once a whole command has been read; the command arguments and their count are stored in the argv and argc fields. */
/* It returns C_OK if the command was valid, the operation was performed, and the client is still connected; otherwise it returns C_ERR. */
int processCommand(client *c) {
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc. */
// The QUIT command needs special handling.
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
/* Look up the redisCommand object in the server.commands dict, catching errors as early as possible. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) { // command not found: reply with an error
/* If the client is inside a transaction, add the CLIENT_DIRTY_EXEC flag so that the later EXEC fails. */
flagTransaction(c);
sds args = sdsempty();
int i;
// error reply format: -ERR unknown command 'foobar', with args beginning with: aaa bbb
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
/* Check that the number of arguments is valid:
 * 1. For a command with fixed arity (arity > 0), the client must send exactly arity arguments.
 * 2. For a command with variable arity (arity < 0), the client must send at least |arity| arguments.
 * Otherwise, flag the transaction state with CLIENT_DIRTY_EXEC so that a later EXEC fails.
 * */
flagTransaction(c);
// error reply: -ERR wrong number of arguments for 'set' command
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
/* Check whether the client has authenticated. */
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
/* If inside a transaction, mark it as failed. */
flagTransaction(c);
// error reply: -NOAUTH Authentication required.
addReply(c,shared.noautherr);
return C_OK;
}
/* If cluster is enabled perform the cluster redirection here.
* However we don't perform the redirection if:
* 1) The sender of this command is our master.
* 2) The command has no key arguments. */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
/* If the server has a maxmemory limit configured, handle the out-of-memory case here:
 * first try to free some memory (if there are volatile keys in the dataset);
 * if reclaiming memory fails, an error is returned for commands that may grow memory usage.
* Note that we do not want to reclaim memory if we are here re-entering
* the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the propagation
* of DELs due to eviction. */
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = freeMemoryIfNeeded() == C_ERR;
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;
/* If we could not free enough memory and the command is flagged CMD_DENYOOM, reply with an OOM error. */
if ((c->cmd->flags & CMD_DENYOOM) && out_of_memory) {
flagTransaction(c);
addReply(c, shared.oomerr);
return C_OK;
}
}
/* Do not accept write commands if the last persistence attempt failed and this server is a master. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand))
{
flagTransaction(c);
if (deny_write_type == DISK_ERROR_TYPE_RDB)
addReply(c, shared.bgsaveerr);
else
addReplySds(c,
sdscatprintf(sdsempty(),
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno)));
return C_OK;
}
/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option. */
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
flagTransaction(c);
addReply(c, shared.noreplicaserr);
return C_OK;
}
/* If this server is a read-only slave, only the master client is allowed to run write commands. */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
{
addReply(c, shared.roslaveerr);
return C_OK;
}
/* If the client is in a Pub/Sub context, only a subset of commands is allowed. */
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when slave-serve-stale-data is no and we are a slave with a broken
* link with master. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE))
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
return C_OK;
}
/* While the DB is loading, only commands flagged CMD_LOADING are allowed; otherwise reply with a loading error. */
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr);
return C_OK;
}
/* Lua script too slow? Only allow a limited number of commands. */
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
return C_OK;
}
/* If the client is inside a transaction and the command is not one of those executed immediately (EXEC, DISCARD, MULTI, WATCH), queue it; otherwise execute it. */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}
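As an aside, the lookup-and-arity-check step can be pictured with a tiny, hypothetical command table (the real server.commands is a dict built from a much larger table in server.c; the names, the stub proc, and the linear scan below are simplifications for illustration only):
#include <stdio.h>
#include <strings.h>

struct client;                                    /* opaque for this sketch */
typedef void commandProc(struct client *c);

/* Hypothetical, heavily simplified command descriptor and table. */
struct cmd { const char *name; commandProc *proc; int arity; };
static void incrCommandStub(struct client *c) { (void)c; }
static struct cmd command_table[] = {
    {"incr", incrCommandStub, 2},                 /* INCR <key>: exactly 2 args including the name */
    {"get",  NULL,            2},
    {"set",  NULL,           -3},                 /* SET takes at least 3 args */
};

static struct cmd *lookup(const char *name) {
    for (size_t i = 0; i < sizeof(command_table)/sizeof(command_table[0]); i++)
        if (!strcasecmp(command_table[i].name, name)) return &command_table[i];
    return NULL;
}

int main(void) {
    int argc = 2;                                 /* argv = {"INCR", "redis"} */
    struct cmd *c = lookup("INCR");
    if (!c) { printf("unknown command\n"); return 1; }
    /* Same arity rule as processCommand(): exact match for arity > 0, at least |arity| otherwise. */
    if ((c->arity > 0 && c->arity != argc) || argc < -c->arity)
        printf("wrong number of arguments\n");
    else
        printf("command '%s' accepted, its proc would be called\n", c->name);
    return 0;
}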
- call() takes care of monitors, the slow log, Lua script invocation, and AOF/replication propagation; most importantly, it invokes c->cmd->proc(c). Here the proc function pointer points to incrCommand(), which ultimately writes the updated value into the dict of the target DB.
/* Call() is the core of Redis execution of a command.
*
* The following flags can be passed:
* CMD_CALL_NONE No flags.
* CMD_CALL_SLOWLOG Check the command execution time and, if it qualifies, log it to the slow log.
* CMD_CALL_STATS Populate command stats.
* CMD_CALL_PROPAGATE_AOF Append the command to the AOF if it modified the dataset or if the client forced it.
* CMD_CALL_PROPAGATE_REPL Send the command to the slaves if it modified the dataset or if the client set the FORCE_REPL flag.
* CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL.
* CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE.
*
* The exact propagation behavior also depends on the client flags:
* 1. If the client has CLIENT_FORCE_AOF or CLIENT_FORCE_REPL set and this function is called
*    with the CMD_CALL_PROPAGATE_AOF/REPL flags, the command is propagated to the AOF and to
*    the slaves even if it did not modify the dataset.
* 2. If the client has CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP set, the command
*    is not propagated to the AOF or to the slaves even if it modified the dataset.
* Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF or
* CMD_CALL_PROPAGATE_REPL are not passed to call(), AOF and slave propagation
* never happens.
*
* Client flags are modified by the implementation of a given command
* using the following API:
*
* forceCommandPropagation(client *c, int flags);
* preventCommandPropagation(client *c);
* preventCommandAOF(client *c);
* preventCommandReplication(client *c);
*
*/
void call(client *c, int flags) {
long long dirty, start, duration;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
/* If there are clients monitoring this server (MONITOR), feed them the command being processed,
 * except when:
 * 1. the server is loading the dataset, or
 * 2. the command is flagged CMD_SKIP_MONITOR or is a CMD_ADMIN command.
 * */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
/* Initialization: clear the flags that must be set by the command on demand, and initialize the also_propagate array. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
redisOpArray prev_also_propagate = server.also_propagate;
redisOpArrayInit(&server.also_propagate);
/* Execute the command and measure how long it took. */
dirty = server.dirty;
start = ustime();
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
/* When EVAL is called loading the AOF we don't want commands called
* from Lua to go into the slowlog or to populate statistics. */
if (server.loading && c->flags & CLIENT_LUA)
flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
/* If the caller is Lua, we want to force the EVAL caller to propagate
* the script if the command flag or client flag are forcing the
* propagation. */
if (c->flags & CLIENT_LUA && server.lua_caller) {
if (c->flags & CLIENT_FORCE_REPL)
server.lua_caller->flags |= CLIENT_FORCE_REPL;
if (c->flags & CLIENT_FORCE_AOF)
server.lua_caller->flags |= CLIENT_FORCE_AOF;
}
/* Log the command to the slow log if needed, and populate the per-command statistics shown by INFO. */
/* The slow log entry is only recorded when CMD_CALL_SLOWLOG is set and the command is not EXEC. */
if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
if (flags & CMD_CALL_STATS) {
/* use the real command that was executed (cmd and lastamc) may be
* different, in case of MULTI-EXEC or re-written commands such as
* EXPIRE, GEOADD, etc. */
real_cmd->microseconds += duration;
real_cmd->calls++;
}
/* Propagate the command to the AOF and to the slaves. */
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
/* If the command modified the dataset, propagate it to the AOF and to the slaves. */
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
/* If the client forced AOF or replication propagation, propagate as well. */
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
/* However, do not propagate if the command implementation called preventCommandPropagation() or similar, or if the corresponding call() flags were not passed. */
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Only propagate when needed and when the command is not a module command:
 * module commands replicate themselves explicitly, so automatic propagation is skipped. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
/* Restore the old replication flags, since call() can be executed
* recursively. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
if (server.also_propagate.numops) {
int j;
redisOp *rop;
if (flags & CMD_CALL_PROPAGATE) {
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
int target = rop->target;
/* Whatever the command wish is, we honor the call() flags. */
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}
}
redisOpArrayFree(&server.also_propagate);
}
server.also_propagate = prev_also_propagate;
server.stat_numcommands++;
}
- After incrCommand() has updated the DB, it calls signalModifiedKey() to handle optimistic-lock (WATCH) invalidation for transactions. That function calls touchWatchedKey(), which sets the CLIENT_DIRTY_CAS bit in client->flags of every client watching this key, so that their subsequent EXEC fails.
void incrCommand(client *c) {
incrDecrCommand(c,1);
}
void incrDecrCommand(client *c, long long incr) {
long long value, oldvalue;
robj *o, *new;
o = lookupKeyWrite(c->db,c->argv[1]);
if (o != NULL && checkType(c,o,OBJ_STRING)) return;
if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return;
oldvalue = value;
if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) ||
(incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {
addReplyError(c,"increment or decrement would overflow");
return;
}
value += incr;
if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
(value < 0 || value >= OBJ_SHARED_INTEGERS) &&
value >= LONG_MIN && value <= LONG_MAX)
{ // the object is not shared and is int-encoded: update the value in place
new = o;
o->ptr = (void*)((long)value);
} else {
new = createStringObjectFromLongLongForValue(value);
if (o) {
dbOverwrite(c->db,c->argv[1],new);
} else {
dbAdd(c->db,c->argv[1],new);
}
}
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
server.dirty++;
addReply(c,shared.colon);
addReply(c,new);
addReply(c,shared.crlf);
}
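The overflow guard in incrDecrCommand() can be exercised in isolation; a small hedged sketch of the same check (plain C, no Redis types, hypothetical helper name) looks like this:
#include <limits.h>
#include <stdio.h>

/* Hypothetical helper mirroring the overflow check above: returns 0 and stores the result
 * in *out when value + incr fits in a long long, or -1 if it would overflow. */
static int incr_checked(long long value, long long incr, long long *out) {
    if ((incr < 0 && value < 0 && incr < (LLONG_MIN - value)) ||
        (incr > 0 && value > 0 && incr > (LLONG_MAX - value))) return -1;
    *out = value + incr;
    return 0;
}

int main(void) {
    long long v;
    printf("%d\n", incr_checked(12345, 1, &v));     /* 0, v == 12346 */
    printf("%d\n", incr_checked(LLONG_MAX, 1, &v)); /* -1: would overflow */
    return 0;
}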
/*-----------------------------------------------------------------------------
 * Keyspace-change hooks.
 * Every time a key is modified, signalModifiedKey() is called.
 * Every time a DB is flushed, signalFlushedDb() is called.
 *----------------------------------------------------------------------------*/
/* Called every time a key is modified. */
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
}
/* Called every time a DB is flushed. */
void signalFlushedDb(int dbid) {
touchWatchedKeysOnFlush(dbid);
}
/* Set the CLIENT_DIRTY_CAS flag on every client watching this key, so that the optimistic lock fails when EXEC runs. */
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags |= CLIENT_DIRTY_CAS;
}
}
- After handling the watched keys, incrCommand() calls notifyKeyspaceEvent() to publish keyspace and keyevent notifications over Pub/Sub; these messages are delivered to every client subscribed to the matching channel or pattern.
/* notifyKeyspaceEvent(char *event, robj *key, int dbid);
 * 'type'  the keyspace-notification class of the event
 * 'event' the event name as a C string, here the command, e.g. "zrem", "del"
 * 'key'   the key name as a redisObject
 * 'dbid'  the id of the DB the key belongs to */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
sds chan;
robj *chanobj, *eventobj;
int len = -1;
char buf[24];
/* If any modules are interested in events, notify the module system now.
* This bypasses the notifications configuration, but the module engine
* will only call event subscribers if the event type matches the types
* they are interested in. */
moduleNotifyKeyspaceEvent(type, event, key, dbid);
/* Return right away if this class of notification is disabled. */
if (!(server.notify_keyspace_events & type)) return;
eventobj = createStringObject(event,strlen(event));
/* __keyspace@<db>__:<key> <event> notifications. */
/* Publish <event> to the keyspace channel __keyspace@<db>__:<key>. */
if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {
chan = sdsnewlen("__keyspace@",11);
len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, key->ptr);
chanobj = createObject(OBJ_STRING, chan);
pubsubPublishMessage(chanobj, eventobj);
decrRefCount(chanobj);
}
/* __keyevent@<db>__:<event> <key> notifications. */
/* Publish <key> to the keyevent channel __keyevent@<db>__:<event>. */
if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {
chan = sdsnewlen("__keyevent@",11);
if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, eventobj->ptr);
chanobj = createObject(OBJ_STRING, chan);
pubsubPublishMessage(chanobj, key);
decrRefCount(chanobj);
}
decrRefCount(eventobj);
}
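For the INCR example on key redis in DB 0, the two channel names built above come out as follows (a throwaway sketch; only the channel format shown in the comments above is taken from the source):
#include <stdio.h>

int main(void) {
    const char *key = "redis", *event = "incrby";   /* incrCommand notifies the "incrby" event */
    int dbid = 0;
    char keyspace_chan[128], keyevent_chan[128];
    /* The __keyspace@<db>__:<key> channel carries the event name as the message. */
    snprintf(keyspace_chan, sizeof(keyspace_chan), "__keyspace@%d__:%s", dbid, key);
    /* The __keyevent@<db>__:<event> channel carries the key name as the message. */
    snprintf(keyevent_chan, sizeof(keyevent_chan), "__keyevent@%d__:%s", dbid, event);
    printf("PUBLISH %s %s\n", keyspace_chan, event); /* PUBLISH __keyspace@0__:redis incrby */
    printf("PUBLISH %s %s\n", keyevent_chan, key);   /* PUBLISH __keyevent@0__:incrby redis */
    return 0;
}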
- Finally, incrCommand() calls addReply(). addReply() first sets the CLIENT_PENDING_WRITE bit in client->flags and pushes the client onto the server.clients_pending_write list; it then writes the reply into the static output buffer c->buf, falling back to the dynamic reply list c->reply when the static buffer is full. After the data has been appended, asyncCloseClientOnOutputBufferLimitReached() checks whether the output buffer has hit its soft or hard limit; if so, the client is scheduled for closing by setting the CLIENT_CLOSE_ASAP bit in client->flags and pushing it onto the server.clients_to_close list.
/* Append the string representation of the robj object to the client's output buffer. */
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyStringToList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}
/* This function is called every time we are about to transmit new data to the client.
 *
 * If the client should receive new data (a normal client), it returns C_OK and makes sure a
 * reply handler is installed in the AE event loop so it runs once the socket is writable.
 *
 * It returns C_ERR when the client should not receive new data, for example a fake client
 * (such as the one used to load the AOF in memory), a master, or when installing the reply
 * handler failed.
 *
 * The function may return C_OK without actually installing the reply handler in two cases:
 * 1) the reply handler is already installed;
 * 2) the client is a slave that is not yet online, so we just want to accumulate writes
 *    without making sure the data will be sent.
 *
 * Typically this function is called every time a reply is built, before appending it to the
 * client's output buffer. If it returns C_ERR, no data should be appended.
*/
int prepareClientToWrite(client *c) {
/* If it's the Lua client we always return ok without installing any
* handler since there is no socket at all. */
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
/* CLIENT REPLY OFF / SKIP handling: don't send replies. */
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
/* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
* is set. */
if ((c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
if (c->fd <= 0) return C_ERR; /* fake client used when loading the AOF */
/* Only install the write handler if there are no replies already pending in the output buffers. */
if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
}
/* Set the CLIENT_PENDING_WRITE flag on the client and put it in the clients_pending_write list.
 * Note that no reply handler is actually installed at this point: to reduce the number of
 * writable socket events, handleClientsWithPendingWrites() (called from beforeSleep() in the
 * event loop) first writes to the socket directly, and only installs the reply handler if the
 * socket fills up before all the data has been written.
 * */
void clientInstallWriteHandler(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
/* Here we set the flag and push the client onto the list instead of installing the reply
 * handler directly. In beforeSleep() we write to the socket first, and only install the
 * reply handler if the data cannot be written completely.
 * */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
}
/* Append the reply to the static buffer c->buf; return C_OK on success, C_ERR otherwise. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
/* If CLIENT_CLOSE_AFTER_REPLY is set, an error reply is already in place and no more data may be appended. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
/* If the dynamic reply list is already in use, the static buffer may not be used anymore. */
if (listLength(c->reply) > 0) return C_ERR;
/* If there is not enough room in the static buffer, fall back to the dynamic reply list. */
if (len > available) return C_ERR;
memcpy(c->buf+c->bufpos,s,len);
c->bufpos+=len;
return C_OK;
}
/* Append the reply to the dynamic reply list c->reply. */
void _addReplyStringToList(client *c, const char *s, size_t len) {
// If CLIENT_CLOSE_AFTER_REPLY is set, an error reply has already been prepared for this client.
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
listNode *ln = listLast(c->reply);
clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
/* Note that 'tail' may be NULL even if we have a tail node, because when
 * addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just
 * to fill it later, when the size of the bulk length is set. */
/* If the last node has free space, fill it up first. */
if (tail) {
/* Copy as much as fits into the last node. */
size_t avail = tail->size - tail->used;
size_t copy = avail >= len? len: avail;
memcpy(tail->buf + tail->used, s, copy);
tail->used += copy;
s += copy;
len -= copy;
}
/* If there is data left, allocate a new node of size max(PROTO_REPLY_CHUNK_BYTES=16KB, len) and copy the rest into it. */
if (len) {
/* Allocate a new node; use at least PROTO_REPLY_CHUNK_BYTES bytes. */
size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
tail = zmalloc(size + sizeof(clientReplyBlock));
tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
tail->used = len;
memcpy(tail->buf, s, len);
listAddNodeTail(c->reply, tail);
c->reply_bytes += tail->size;
}
asyncCloseClientOnOutputBufferLimitReached(c);
}
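The static-buffer-with-list-fallback behavior above can be reduced to a small, hedged sketch (hypothetical types and sizes; no sds or adlist involved) that shows the decision addReply() makes:
#include <stdio.h>
#include <string.h>

/* Hypothetical reply sink: a small fixed buffer plus an overflow area standing in for the
 * c->buf / c->reply pair. The sizes are illustrative, not Redis's real values. */
struct reply_sink {
    char fixed[64];
    size_t fixed_used;
    char overflow[1024];
    size_t overflow_used;
};

/* Mirror of the addReply() logic: try the fixed buffer first, and once the overflow area is
 * in use (or the data does not fit), spill into the overflow area. */
static void sink_add(struct reply_sink *s, const char *data, size_t len) {
    if (s->overflow_used == 0 && len <= sizeof(s->fixed) - s->fixed_used) {
        memcpy(s->fixed + s->fixed_used, data, len);       /* fits in the static buffer */
        s->fixed_used += len;
    } else {
        memcpy(s->overflow + s->overflow_used, data, len); /* spill into the dynamic area */
        s->overflow_used += len;
    }
}

int main(void) {
    struct reply_sink s = {0};
    sink_add(&s, ":12346\r\n", 8);                         /* a small reply stays in the fixed buffer */
    printf("fixed=%zu overflow=%zu\n", s.fixed_used, s.overflow_used);
    return 0;
}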
/* Check whether the client has hit its output buffer soft or hard limit and, if so, close it
 * asynchronously. The caller can check the client's CLIENT_CLOSE_ASAP flag to see whether the
 * client is scheduled for an asynchronous close; the actual close happens in the next
 * serverCron() cycle. The close is asynchronous because this function may be called while the
 * caller is in an unsafe state, for example while pushing data into the output buffer, so the
 * close has to be deferred to a safe point.
 */
void asyncCloseClientOnOutputBufferLimitReached(client *c) {
serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
// if the soft or hard memory limit has been exceeded, close the client asynchronously
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(),c);
freeClientAsync(c);
serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
sdsfree(client);
}
}
/* Return 1 if the output buffer memory usage has reached the hard limit, or has stayed above the soft limit for long enough; otherwise return 0. */
int checkClientOutputBufferLimits(client *c) {
int soft = 0, hard = 0, class;
/* Get the total amount of memory currently used by the output buffers. */
unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
class = getClientType(c);
/* For output buffer limits, master clients are treated as normal clients. */
if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
if (server.client_obuf_limits[class].hard_limit_bytes &&
used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
hard = 1;
if (server.client_obuf_limits[class].soft_limit_bytes &&
used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
soft = 1;
/* If the soft limit is currently exceeded:
 * 1. if this is the first time, record the timestamp and clear the soft flag;
 * 2. otherwise compute how long it has been exceeded and clear the soft flag if it is still
 *    within the configured soft_limit_seconds.
 * If the soft limit is not exceeded:
 * reset the timestamp (it may have been set by a previous excess that has since recovered).
 * */
if (soft) {
if (c->obuf_soft_limit_reached_time == 0) {
c->obuf_soft_limit_reached_time = server.unixtime;
soft = 0; /* first time the soft limit is reached */
} else {
time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
if (elapsed <=
server.client_obuf_limits[class].soft_limit_seconds) {
soft = 0; /* the soft limit has not been exceeded for long enough */
}
}
} else {
c->obuf_soft_limit_reached_time = 0;
}
return soft || hard;
}
- In the next iteration of the AE event loop, beforeSleep() is called right before entering the multiplexing wait. Among other work, such as a fast expired-key collection cycle and flushing the AOF buffer to disk, beforeSleep() calls handleClientsWithPendingWrites().
/* In the AE event-driven framework, this function is called right before the multiplexing wait call. */
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
/* Call the Redis Cluster before sleep function. Note that this function
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
* later in this function. */
if (server.cluster_enabled) clusterBeforeSleep();
/* If this server is a master, run a fast cycle of expired-key collection; it returns quickly if there is nothing to do. */
if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
/* Send all the slaves an ACK request if at least one client blocked
* during the previous event loop iteration. */
if (server.get_ack_from_slaves) {
robj *argv[3];
argv[0] = createStringObject("REPLCONF",8);
argv[1] = createStringObject("GETACK",6);
argv[2] = createStringObject("*",1); /* Not used argument. */
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[2]);
server.get_ack_from_slaves = 0;
}
/* Unblock all the clients blocked for synchronous replication
* in WAIT. */
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
/* Check if there are clients unblocked by modules that implement
* blocking commands. */
moduleHandleBlockedClients();
/* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.unblocked_clients))
processUnblockedClients();
/* Flush the data in the AOF buffer to disk. */
flushAppendOnlyFile(0);
/* Handle the clients that have pending replies to write out. */
handleClientsWithPendingWrites();
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
* time. */
if (moduleCount()) moduleReleaseGIL();
}
- handleClientsWithPendingWrites() iterates over server.clients_pending_write and calls writeToClient() to synchronously write the output buffers directly to each client. (A rough sketch of what writeToClient() does is shown after the function below.)
- If the TCP send buffer fills up while the output buffers still contain data, the reply handler sendReplyToClient is installed; if all the buffered data makes it into the TCP send buffer, the buffer state is reset and the reply handler, if one was installed, is removed.
/* This function is called just before entering the event loop wait; we try to write to the
 * clients directly here instead of installing a reply handler, which saves system calls. */
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
/* If the direct write manages to send everything, there is no need to install a reply handler; move on to the next client. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* If data is still left in the output buffers after the synchronous write, install the reply handler. */
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}
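writeToClient() itself is not listed above. As a rough, hedged sketch of its core drain loop (a hypothetical, heavily simplified client with only a static buffer; the reply list, output limits, and statistics are all omitted):
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical stripped-down client: just a static output buffer and its positions. */
struct mini_client { int fd; char buf[16*1024]; int bufpos; size_t sentlen; };

/* Sketch of the drain loop: keep write()-ing until the buffer is empty or the socket would
 * block (EAGAIN). Returns 0 when everything was sent, 1 when data is left (the caller would
 * then install a writable event), -1 on a real error. */
static int write_to_client(struct mini_client *c) {
    while (c->sentlen < (size_t)c->bufpos) {
        ssize_t n = write(c->fd, c->buf + c->sentlen, c->bufpos - c->sentlen);
        if (n == -1) {
            if (errno == EAGAIN) return 1;   /* kernel send buffer is full: try again later */
            return -1;                       /* real error: the caller frees the client */
        }
        c->sentlen += n;
    }
    c->bufpos = 0;                           /* everything sent: reset the buffer state */
    c->sentlen = 0;
    return 0;
}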
- In subsequent iterations of the AE event loop, the remaining data is written into the TCP send buffer asynchronously, piece by piece, until the output buffers are empty; at that point the buffer state is reset and the reply handler is removed. With that, the INCR command has been fully processed.
/* Reply handler: send the buffered data to the client. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);
writeToClient(fd,privdata,1);
}
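Putting the pieces together from the client's point of view, a hedged end-to-end sketch (raw sockets, no hiredis; host, port, and key are assumptions for illustration) sends the RESP-encoded INCR and reads back the :<integer>\r\n reply produced by addReply():
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    /* Connect to a local Redis instance (assumed to be listening on 127.0.0.1:6379). */
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = { .sin_family = AF_INET, .sin_port = htons(6379) };
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
    if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { perror("connect"); return 1; }

    /* Send the RESP-encoded request; the server parses it in processMultibulkBuffer(). */
    const char *req = "*2\r\n$4\r\nINCR\r\n$5\r\nredis\r\n";
    if (write(fd, req, strlen(req)) == -1) { perror("write"); return 1; }

    /* Read the reply written by addReply(): an integer reply such as ":1\r\n". */
    char reply[64];
    ssize_t n = read(fd, reply, sizeof(reply) - 1);
    if (n > 0) {
        reply[n] = '\0';
        printf("reply: %s", reply);
    }
    close(fd);
    return 0;
}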