Redis服务器入口
server.c中的main()
IO多路复用函数
在ae.c
中,Redis会根据当前系统选择最佳IO多路复用函数:
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
获取当前时间(微秒)
struct timeval tv;
gettimeofday(&tv, NULL);
获取当前时间(秒和毫秒)
void aeGetTime(long *seconds, long *milliseconds)
{
struct timeval tv;
gettimeofday(&tv, NULL);
*seconds = tv.tv_sec;
*milliseconds = tv.tv_usec / 1000;
}
int main()
{
long now_sec, now_ms;
aeGetTime(&now_ms, &now_ms);
return 0;
}
在当前时间上加上一定的时间
/**
* aeAddMillisecondsToNow - 在当前时间上加上一定的时间(毫秒)
* @milliseconds:加上一定的时间(毫秒)
* @sec:在当前时间上加上一定的时间(毫秒)后的时间(秒)
* @ms:在当前时间上加上一定的时间(毫秒)后的时间(毫秒)
* @return:void
* */
void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms)
{
long cur_sec, cur_ms, when_sec, when_ms;
aeGetTime(&cur_sec, &cur_ms);
when_sec = cur_sec + milliseconds / 1000;
when_ms = cur_ms + milliseconds % 1000;
if (when_ms >= 1000)
{
when_sec ++;
when_ms -= 1000;
}
*sec = when_sec;
*ms = when_ms;
}
设置文件描述符为阻塞/非阻塞模式
/**
* anetSetBlock - 设置文件描述符为阻塞/非阻塞模式
* @fd:文件描述符
* @non_block:0为设置阻塞模式,非0为设置非阻塞模式
* @return:成功返回ANET_OK,失败返回ANET_ERR
* */
int anetSetBlock(int fd, int non_block)
{
int flags;
if (-1 == (flags = fcntl(fd, F_GETFL)))
{
ERR_EXIT("fcntl(F_GETFL)");
return ANET_ERR;
}
if (non_block)
{
flags |= O_NONBLOCK;
}
else
{
flags &= ~O_NONBLOCK;
}
if (-1 == fcntl(fd, F_SETFL, flags))
{
ERR_EXIT("fcntl(F_SETFL, O_NONBLOCK)");
return ANET_ERR;
}
return ANET_OK;
}
在循环的条件中通常会加入循环退出条件
eventLoop->stop = 0;
while (!eventLoop->stop)
{
// TODO:添加代码
}
setsockopt
TCP_NODELAY
static int anetSetTcpNoDelay(char *err, int fd, int val)
{
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1)
{
perror("setsockopt(TCP_NODELAY)");
return ANET_ERR;
}
return ANET_OK;
}
SO_REUSEADDR
int anetSetReuseAddr(int fd)
{
int yes = 1;
if (-1 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
{
perror("setsockopt(SO_REUSEADDR)");
return ANET_ERR;
}
return ANET_OK;
}
SO_REUSEPORT
int anetSetReusePort(int fd)
{
int yes = 1;
if (-1 == setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
{
perror("setsockopt(SO_REUSEPORT)");
return ANET_ERR;
}
return ANET_OK;
}
SO_SNDTIMEO
int anetSendTimeout(int fd, long long ms)
{
struct timeval tv;
tv.tv_sec = ms / 1000;
tv.tv_usec = (ms % 1000) * 1000;
if (-1 == setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)))
{
perror("setsockopt(SO_SNDTIMEO)");
return ANET_ERR;
}
return ANET_OK;
}
SO_KEEPALIVE
int anetTcpKeepAlive(int fd)
{
int yes = 1;
if (-1 == setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)))
{
perror("setsockopt(SO_KEEPALIVE)");
return ANET_ERR;
}
return ANET_OK;
}
readn函数封装
/**
* anetRead - 从文件描述符中读取指定长度数据到缓冲区中
* @fd:文件描述符
* @buf:缓冲区
* @count:要读取的数据长度
* @return:-1表示读取失败,==count表示读取了指定的长度,<count表示提前遇到了EOF
* */
int anetRead(int fd, char *buf, int count)
{
ssize_t nread, totlen = 0;
while(totlen != count)
{
nread = read(fd, buf, count - totlen);
if (0 == nread)
{
return totlen;
}
if (-1 == nread)
{
return -1;
}
totlen += nread;
buf += nread;
}
return totlen;
}
writen函数的封装
/**
* anetWrite - 从缓冲区中写指定长度数据到文件描述符中
* @fd:文件描述符
* @buf:缓冲区
* @count:要写的数据长度
* @return:-1表示写失败,==count表示写了指定的长度,<count表示缓冲区中的内容提前被写完了
* */
int anetWrite(int fd, char *buf, int count)
{
ssize_t nwritten, totlen = 0;
while(totlen != count)
{
nwritten = write(fd, buf, count - totlen);
if (0 == nwritten)
{
return totlen;
}
if (-1 == nwritten)
{
return -1;
}
totlen += nwritten;
buf += nwritten;
}
return totlen;
}
gethostbyname()和gethostbyaddr()不可重入的原因
gethostbyname()
和gethostbyaddr
会返回指向静态数据的指针,而指针所指向的静态数据可能会被下一次的调用覆写。
The functions gethostbyname()
and gethostbyaddr()
may return pointers to static data, which may be overwritten by later calls.
使用getaddrinfo代替gethostbyname
/**
* anetGenericResolve - 解析域名为IP地址
* @host:域名
* @ipbuf:缓冲区,用于存放解析域名后得到的点分十进制IP地址
* @ipbuf_len:缓冲区大小
* */
int anetGenericResolve(char *host, char *ipbuf, size_t ipbuf_len)
{
struct addrinfo hints, *info;
int rv;
memset(&hints,0,sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM; /* specify socktype to avoid dups */
if (0 != (rv = getaddrinfo(host, NULL, &hints, &info)))
{
fprintf(stderr, "%s", gai_strerror(rv));
return ANET_ERR;
}
if (info->ai_family == AF_INET)
{
struct sockaddr_in *sa = (struct sockaddr_in *)info->ai_addr;
inet_ntop(AF_INET, &(sa->sin_addr), ipbuf, ipbuf_len);
}
else
{
struct sockaddr_in6 *sa = (struct sockaddr_in6 *)info->ai_addr;
inet_ntop(AF_INET6, &(sa->sin6_addr), ipbuf, ipbuf_len);
}
freeaddrinfo(info);
return ANET_OK;
}
TCP客户端
/**
* anetTcpGenericConnect - 创建一个TCP客户端,并连接到TCP服务器
* @addr:要连接的IP地址,既可以是点分十进制IP地址,也可以是域名
* @port:要连接的端口号
* @source_addr:要绑定的IP地址,如果为NULL,则在connect时由内核自动绑定一个可用地址
* @return:成功返回连接成功的套接字(非阻塞),失败返回-1
* */
int anetTcpGenericConnect(char *addr, int port, char *source_addr)
{
int s = ANET_ERR, rv;
char portstr[6]; /* strlen("65535") + 1; */
struct addrinfo hints, *servinfo, *bservinfo, *p, *b;
snprintf(portstr,sizeof(portstr),"%d",port);
memset(&hints,0,sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
if (0 != (rv = getaddrinfo(addr, portstr, &hints, &servinfo)))
{
fprintf(stderr, "%s", gai_strerror(rv));
return ANET_ERR;
}
for (p = servinfo; p != NULL; p = p->ai_next)
{
/* Try to create the socket and to connect it.
* If we fail in the socket() call, or on connect(), we retry with
* the next entry in servinfo. */
if (-1 == (s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)))
{
continue;
}
if (anetSetReuseAddr(s) == ANET_ERR)
{
goto error;
}
if (ANET_OK != anetSetBlock(s, 1))
{
goto error;
}
if (source_addr)
{
int bound = 0;
/* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
if (0 != (rv = getaddrinfo(source_addr, NULL, &hints, &bservinfo)))
{
fprintf(stderr, "%s", gai_strerror(rv));
goto error;
}
for (b = bservinfo; b != NULL; b = b->ai_next)
{
if (-1 == bind(s,b->ai_addr,b->ai_addrlen))
{
bound = 1;
break;
}
}
freeaddrinfo(bservinfo);
if (!bound)
{
fprintf(stderr, "bind: %s\n", strerror(errno));
goto error;
}
}
if (-1 == connect(s,p->ai_addr,p->ai_addrlen))
{
/* If the socket is non-blocking, it is ok for connect() to
* return an EINPROGRESS error here. */
if (errno == EINPROGRESS)
{
goto end;
}
close(s);
s = ANET_ERR;
continue;
}
/* If we ended an iteration of the for loop without errors, we
* have a connected socket. Let's return to the caller. */
goto end;
}
if (p == NULL)
{
fprintf(stderr, "creating socket: %s\n", strerror(errno));
}
error:
if (s != ANET_ERR)
{
close(s);
s = ANET_ERR;
}
end:
freeaddrinfo(servinfo);
/* Handle best effort binding: if a binding address was used, but it is
* not possible to create a socket, try again without a binding address. */
if (s == ANET_ERR && source_addr)
{
return anetTcpGenericConnect(addr, port, NULL);
}
else
{
return s;
}
}
TCP服务器
int anetListen(int s, struct sockaddr *sa, socklen_t len, int backlog)
{
if (-1 == bind(s, sa, len))
{
fprintf(stderr, "bind: %s", strerror(errno));
close(s);
return ANET_ERR;
}
if (-1 == listen(s, backlog))
{
fprintf(stderr, "listen: %s", strerror(errno));
close(s);
return ANET_ERR;
}
return ANET_OK;
}
/**
* _anetTcpServer - 创建一个正在监听的TCP服务器
* @port:TCP服务器要绑定的端口号
* @bindaddr:TCP服务器要绑定的IP地址,既可以是点分十进制格式,也可以是域名
* @af:AF_INET/AF_INET6
* @backlog:listen函数的第二个参数
* @return:失败返回-1,成功返回监听套接字
* */
int _anetTcpServer(int port, char *bindaddr, int af, int backlog)
{
int s = -1, rv;
char _port[6]; /* strlen("65535") */
struct addrinfo hints, *servinfo, *p;
snprintf(_port, 6, "%d", port);
memset(&hints, 0, sizeof(hints));
hints.ai_family = af;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */
if (0 != (rv = getaddrinfo(bindaddr, _port, &hints, &servinfo)))
{
fprintf(stderr, "%s", gai_strerror(rv));
return ANET_ERR;
}
for (p = servinfo; p != NULL; p = p->ai_next)
{
if (-1 == (s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)))
{
continue;
}
if (ANET_ERR == anetSetReuseAddr(s))
{
goto error;
}
if (ANET_ERR == anetListen(s, p->ai_addr, p->ai_addrlen, backlog))
{
s = ANET_ERR;
}
goto end;
}
if (NULL == p)
{
fprintf(stderr, "unable to bind socket, errno: %d", errno);
goto error;
}
error:
if (-1 != s)
{
close(s);
}
s = ANET_ERR;
end:
freeaddrinfo(servinfo);
return s;
}
使用inet_ntop()/inet_pton()代替inet_ntoa()/inet_aton()
inet_ntoa()
和inet_aton()
只能用于IPv4,而inet_ntop()
和inet_pton()
既可以用于IPv4,又可以用于IPv6。
sockaddr_in addr;
bzero(&addr, sizeof(addr));
addr.sin_addr.s_addr = htonl(INADDR_ANY);
char ipv4[16] = {0};
inet_ntop(AF_INET, &addr.sin_addr, ipv4, sizeof(ipv4));
printf("ipv4 = %s\n", ipv4); // 0.0.0.0
snprintf(ipv4, sizeof(ipv4), "127.0.0.0");
inet_pton(AF_INET, ipv4, &addr.sin_addr);
printf("addr.sin_addr.s_addr = %d\n", addr.sin_addr.s_addr); // 127
点分十进制IP地址的本质
点分十进制IP地址"127.1.2.3"中的"127"对应于4字节整型IP地址的第1个字节,"1"对应于第2个字节,"2"对应于第3个字节,"3"对应于第4个字节。即,字符串数组的低字节对应于整数的低字节。
在封装一个函数时,通常实现两个,一个负责提供调用接口,一个负责具体实现(函数名以_开头)
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
{
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
}
strcasecmp与strncasecmp
即忽略大小写版的strcmp()
与strncmp()
。
strdup
strdup()
函数返回一个指针,该指针指向复制字符串的副本,存放复制字符串副本的内存是由malloc()
分配的,使用完毕后需要使用free()
释放。
以守护进程方式运行
void daemonize(void)
{
int fd;
if (fork() != 0)
{
exit(0); /* parent exits */
}
setsid(); /* create a new session */
/* Every output goes to /dev/null. If Redis is daemonized but
* the 'logfile' is set to 'stdout' in the configuration file
* it will not log at all. */
if (-1 != (fd = open("/dev/null", O_RDWR, 0)))
{
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO)
{
close(fd);
}
}
}
实现查看应用程序的版本号或者帮助
// 注意:执行完version()函数后,应该退出进程,因为客户只是查看应用程序版本,而不是要执行应用程序
void version()
{
printf("Redis server version %s (%s:%d)\n", REDIS_VERSION,
redisGitSHA1(), atoi(redisGitDirty()) > 0);
exit(0);
}
// 注意:在usage()函数中,打印信息应该输出到标准错误中,且该函数执行完毕后,应该退出进程,且返回1,表示发生了某种错误
void usage()
{
fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
fprintf(stderr," ./redis-server - (read config from stdin)\n");
exit(1);
}
int main(int argc, char **argv) {
// 如果是2个参数,且第2个参数为-v、--version或者--help,则打印对应的信息
if (2 == argc)
{
if (strcmp(argv[1], "-v") == 0 ||
strcmp(argv[1], "--version") == 0)
{
version();
}
if (strcmp(argv[1], "--help") == 0)
{
usage();
}
}
// 如果大于2个参数,则打印使用方法
else if ((argc > 2))
{
usage();
}
// 如果只有1个参数,则打印警告信息
else
{
redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
}
}
测试函数
int __failed_tests = 0;
int __test_num = 0;
#define test_cond(descr,_c) do { \
__test_num++; printf("%d - %s: ", __test_num, descr); \
if(_c) printf("PASSED\n"); else {printf("FAILED\n"); __failed_tests++;} \
} while(0)
#define test_report() do { \
printf("%d tests, %d passed, %d failed\n", __test_num, \
__test_num-__failed_tests, __failed_tests); \
if (__failed_tests) { \
printf("=== WARNING === We have failed tests here...\n"); \
} \
} while(0)
test_cond("Create a string and obtain the length",
sdslen(x) == 3 && memcmp(x,"foo\0",4) == 0);
test_report();
网友评论