阅读redis代码,就必须试图搞清楚redis的主流程,我们必须用剥洋葱的方法来了解整个代码。redis是个服务器客户端形式的架构,在服务器和客户端下面,就是多路复用IO,为了理解服务器客户端,必须先了解redis使用的多路复用IO,因为这里是redis高效的原因。
redis多路复用IO
redis也是个跨平台代码,同时支持window和linux,本文以linux为准。
五种IO模式
《Unix网络编程》一书中讲tcp/ip总结为5个模式:
- BIO,阻塞IO的方式,阻塞IO时,accept\read等都是阻塞的,这样在tcp服务端和客户端之间连接是有序的,但是同时也造出效率比较低下。
- NIO,非阻塞的,例如read时就不会阻塞,而是立即直接返回AE_ERR,调用者必须重试,直到read成功后进行数据处理。
-多路复用IO,linux采用epoll的方式,每次会讲状态变化的fd,以事件的形式进行通知, - signal driven IO
- AIO,异步IO
redis采用了NIO和多路复用IO。这里这么重要就是因为redis是一个单进程的,但是却拥有非常高的效率,是怎么做到的,所以redis底层的网络设计是必读的。
多路复用IO epoll例程代码
首先需要理解的是多路复用IO,redis linux下使用的是epoll,其原理就是如下epoll例子代码演化的:
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <iostream>
#include <sys/epoll.h>
using namespace std;
int main(int argc, char *argv[])
{
if (argc != 3)
{
cout << "usage: " << argv[0] << " ip port" << endl;
return -1;
}
char *szIp = argv[1];
in_addr_t iIp = inet_addr(szIp);
if (iIp == INADDR_NONE)
{
cerr << "fail to parse ip: " << szIp << endl;
return -1;
}
char *pEnd = NULL;
uint16_t usPort = strtoul(argv[2], &pEnd, 10);
if (*pEnd != '\0')
{
cerr << "fail to parse port: " << argv[2] << endl;
return -1;
}
int iSockFd = socket(AF_INET, SOCK_STREAM, 0);
if (iSockFd < 0)
{
cerr << "fail to create socket, err: " << strerror(errno) << endl;
return -1;
}
cout << "create socket fd " << iSockFd << endl;
sockaddr_in oAddr;
memset(&oAddr, 0, sizeof(oAddr));
oAddr.sin_family = AF_INET;
oAddr.sin_addr.s_addr = iIp;
oAddr.sin_port = htons(usPort);
if (bind(iSockFd, (sockaddr *)&oAddr, sizeof(oAddr)) < 0)
{
cerr << "fail to bind addr " << szIp << ":" << usPort << ", err: " << strerror(errno) << endl;
return -1;
}
cout << "bind addr " << szIp << ":" << usPort << endl;
if (listen(iSockFd, 100) < 0)
{
cerr << "fail to listen on " << szIp << ":" << usPort << ", err: " << strerror(errno) << endl;
}
cout << "listen on socket fd " << iSockFd << endl;
int iEpollFd = epoll_create(1024);
if (iEpollFd < 0)
{
cerr << "fail to create epoll, err: " << strerror(errno) << endl;
return -1;
}
epoll_event oEvent;
oEvent.events = EPOLLIN;
oEvent.data.fd = iSockFd;
if (epoll_ctl(iEpollFd, EPOLL_CTL_ADD, iSockFd, &oEvent) < 0)
{
cerr << "fail to add listen fd to epoll, err: " << strerror(errno) << endl;
return -1;
}
epoll_event aoEvents[1024];
uint8_t acRecvBuf[1024 * 1024];
while (true)
{
int iFdCnt = epoll_wait(iEpollFd, aoEvents, 1024, -1);
if (iFdCnt < 0)
{
cerr << "epoll wait error, err: " << strerror(errno) << endl;
return -1;
}
for (int i = 0; i < iFdCnt; i++)
{
if (aoEvents[i].data.fd == iSockFd)
{
sockaddr_in oClientAddr;
socklen_t iAddrLen = sizeof(oClientAddr);
int iAcceptFd = accept(iSockFd, (sockaddr *)&oClientAddr, &iAddrLen);
if (iAcceptFd < 0)
{
cerr << "fail to accpet, err: " << strerror(errno) << endl;
continue;
}
cout << "recv connection from " << inet_ntoa(oClientAddr.sin_addr) << ":" << ntohs(oClientAddr.sin_port) << endl;
oEvent.events = EPOLLIN;
oEvent.data.fd = iAcceptFd;
if (epoll_ctl(iEpollFd, EPOLL_CTL_ADD, iAcceptFd, &oEvent) < 0)
{
close(iAcceptFd);
cerr << "fail to add fd to epoll, err: " << strerror(errno) << endl;
continue;
}
}
else
{
int iCurFd = aoEvents[i].data.fd;
ssize_t iRecvLen = recv(iCurFd, acRecvBuf, sizeof(acRecvBuf), 0);
if (iRecvLen < 0)
{
cerr << "fail to recv, close connection, err: " << strerror(errno) << endl;
if (epoll_ctl(iEpollFd, EPOLL_CTL_DEL, iCurFd, NULL) < 0)
{
cerr << "fail to del fd from epoll, err: " << strerror(errno) << endl;
}
close(iCurFd);
continue;
}
if (iRecvLen == 0)
{
cout << "connection closed by client" << endl;
if (epoll_ctl(iEpollFd, EPOLL_CTL_DEL, iCurFd, NULL) < 0)
{
cerr << "fail to del fd from epoll, err: " << strerror(errno) << endl;
}
close(iCurFd);
continue;
}
cout << "recv data len: " << iRecvLen << endl;
ssize_t iSendLen = send(iCurFd, acRecvBuf, iRecvLen, 0);
if (iSendLen < 0)
{
cerr << "fail to send, err: " << strerror(errno) << endl;
if (epoll_ctl(iEpollFd, EPOLL_CTL_DEL, iCurFd, NULL) < 0)
{
cerr << "fail to del fd from epoll, err: " << strerror(errno) << endl;
}
close(iCurFd);
break;
}
cout << "echo to client, len: " << iSendLen << endl;
}
}
}
}
这个例子代码中,可以看到epoll的基本使用。为了进一步学习,有必要了解一下epoll源码上,有一个red-black tree存取fd句柄信息,同时还有一个readylist,readylist中存放了事件信息,当调用epoll_wait的时候,就会把事件返回,处理这些事件就可以了。
把epoll和select、poll对比一下:
- select使用数组为数据结构实现监听变化,每次都要拷贝全部的数组句柄,效率较低,而且数组大小有限。
- poll是对select改进,只不过改用了链表为数据结构,但是问题和select相同。
- epoll则解决了上述问题,只需拷贝变化的事件的句柄,效率较高。
回到redis网络的源码
这里特别要强调学习epoll的原理和例子代码,其中有一个原因,这里对于事件都要注册回调函数,这些回到函数将代码割裂开了,按照剥洋葱的思路,这些割裂开的地方需要识别出来,否则会影响代码的阅读。
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
- 回调serverCron用于:
* - Active expired keys collection (it is also performed in a lazy way on
* lookup).
* - Software watchdog.
* - Update some statistic.
* - Incremental rehashing of the DBs hash tables.
* - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
* - Clients timeout of different kinds.
* - Replication reconnection.
* - Many more...
- 回调acceptTcpHandler的作用可以参见epoll例子代码,用于处理客户端连接。
redis网络代码中值得学习的地方
- 对于ipv4和ipv6的同时支持
int listenToPort(int port, int *fds, int *count) {
int j;
/* Force binding of 0.0.0.0 if no bind address is specified, always
* entering the loop if j == 0. */
if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
if (server.bindaddr[j] == NULL) {
int unsupported = 0;
/* Bind * for both IPv6 and IPv4, we enter here only if
* server.bindaddr_count == 0. */
fds[*count] = anetTcp6Server(server.neterr,port,NULL,
server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
serverLog(LL_WARNING,"Not listening to IPv6: unsupproted");
}
if (*count == 1 || unsupported) {
/* Bind the IPv4 address as well. */
fds[*count] = anetTcpServer(server.neterr,port,NULL,
server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
serverLog(LL_WARNING,"Not listening to IPv4: unsupproted");
}
}
/* Exit the loop if we were able to bind * on IPv4 and IPv6,
* otherwise fds[*count] will be ANET_ERR and we'll print an
* error and return to the caller with an error. */
if (*count + unsupported == 2) break;
} else if (strchr(server.bindaddr[j],':')) {
/* Bind IPv6 address. */
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
} else {
/* Bind IPv4 address. */
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
}
if (fds[*count] == ANET_ERR) {
serverLog(LL_WARNING,
"Creating Server TCP listening socket %s:%d: %s",
server.bindaddr[j] ? server.bindaddr[j] : "*",
port, server.neterr);
return C_ERR;
}
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
return C_OK;
}
static int anetV6Only(char *err, int s) {
int yes = 1;
if (setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,&yes,sizeof(yes)) == -1) {
anetSetError(err, "setsockopt: %s", strerror(errno));
close(s);
return ANET_ERR;
}
return ANET_OK;
}
listen to port,ipv4和ipv6可共用一个端口,可创建ipv6后再创建ipv4的连接,而创建ipv6和ipv4共存的关键是anetV6Only中setsockopt的函数参数。这是一个学习要点。
- 设置阻塞非阻塞
int anetSetBlock(char *err, int fd, int non_block) {
int flags;
/* Set the socket blocking (if non_block is zero) or non-blocking.
* Note that fcntl(2) for F_GETFL and F_SETFL can't be
* interrupted by a signal. */
if ((flags = fcntl(fd, F_GETFL)) == -1) {
anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
return ANET_ERR;
}
if (non_block)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) == -1) {
anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}
从源码看,是设置了非阻塞参数。
总结
redis网络源码使用了多路复用epoll编码,从epoll的例子代码演化,很容易得到redis封装好的epoll源码,理解好网络代码,可以:
- 了解redis为何高效
- 了解epoll回调,这些回调是割裂代码影响阅读的障碍
同时还需要到了很多网络基础知识,并且:
- redis对于ipv4和ipv6支持很完善,编程技巧值得学习。
- 网络编程中有很多参数,redis做到高性能,一个会用了epoll, reactor模式的框架合计,二是对这些参数了如指掌,需要充分了解这些参数进行网络设计。
学习完了在反过来和netty对比,netty也会用了reactor模式,nio,但是netty帮我们封装好了底层源码,因此不太知道netty底层的实现。由于redis采用的是单进程模式,因此有必要对于netty的底层reactor设计以及netty参数再次学习一遍。
网友评论