客户端定义
redis使用了一对多服务器程序:一个服务器可以与多个客户端进行网络连接,向多个客户端提供服务。服务端为客户端建立的相应的客户端结构来保存客户端的相关信息,位于文件redis.h
中。
typedef struct redisClient {
uint64_t id; /* Client incremental unique ID. */
//客户端的套接字,伪客户端的为-1(AOF,Lua脚本),其它为普通客户端
int fd;
//客户端正在使用的redis数据库指针
redisDb *db;
int dictid;
//客户端的名字,使用client setname 来设置名称
robj *name; /* As set by CLIENT SETNAME */
//输入缓存区
sds querybuf;
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */
//将querybuf中解析完成后,放入argv中,个数放于argc中
int argc;
robj **argv;
//redis 命令的实现函数
struct redisCommand *cmd, *lastcmd;
int reqtype;
int multibulklen; /* number of multi bulk arguments left to read */
long bulklen; /* length of bulk argument in multi bulk request */
//回复缓冲区,用来保存回复比较大的内容,使用链表来保存
list *reply;
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
int sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
//创建客户端的时间
time_t ctime; /* Client creation time */
//最后一次交互时间,可以用来计算该连接的空转时间
time_t lastinteraction; /* time of the last interaction, used for timeout */
//缓冲区第一次达到软性限制的时间
time_t obuf_soft_limit_reached_time;
//redis标志位
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
//身份验证,0表示未通过身份验证,1表示通过身份验证。仅在服务端启用了安全验证时才用,若启用
//未通过验证,则只能使用AUTH命令,其它均无法使用
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
int repldbfd; /* replication DB file descriptor */
off_t repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
long long reploff; /* replication offset if this is our master */
long long repl_ack_off; /* replication ack offset, if this is a slave */
long long repl_ack_time;/* replication ack time, if this is a slave */
char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
multiState mstate; /* MULTI/EXEC state */
blockingState bpop; /* blocking state */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
/* Response buffer */
//输出缓冲区,固定大小的大小,用来保存长度比较小的回复
int bufpos;
//默认大小为16k
char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;
服务端接受客户端连接
void initServer(void) {
...
//创建evenloop
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
...
//创建socket监听套接字
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
exit(1);
...
//增加网络套接字的监听,收到可读事件后,接受连接
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
...
}
//接受tcp连接
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
while(max--) {
//接受连接
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
//根据fd生成客户端结构
acceptCommonHandler(cfd,0);
}
}
//接受本地套接字的连接
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cfd, max = MAX_ACCEPTS_PER_CALL;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
while(max--) {
cfd = anetUnixAccept(server.neterr, fd);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
acceptCommonHandler(cfd,REDIS_UNIX_SOCKET);
}
}
//根据连接的文件描述符创建一个客户端
static void acceptCommonHandler(int fd, int flags) {
redisClient *c;
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
/* If maxclient directive is set and this is one client more... close the
* connection. Note that we create the client instead to check before
* for this condition, since now the socket is already set in non-blocking
* mode and we can send an error for free using the Kernel I/O */
//如果超过最大的客户端个数,则向客户端发送错误原因,同时关闭该连接
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
server.stat_numconnections++;
c->flags |= flags;
}
网友评论