Redis Network Source Code

Author: Wu杰语 | Published 2018-12-30 22:34

    To read the redis source you have to work out its main flow, peeling the code like an onion, layer by layer. redis has a client-server architecture, and underneath the server and client sits multiplexed I/O. Before looking at the server and client layers you need to understand the multiplexed I/O redis uses, because that is where redis gets its efficiency.

    redis multiplexed I/O

    redis is cross-platform code that supports both Windows and Linux; this article sticks to Linux.

    The five I/O models

    The book "Unix Network Programming" summarizes I/O over TCP/IP into five models:

    • BIO, blocking I/O: accept, read and the rest all block, so the interaction between TCP server and client stays strictly ordered, but throughput suffers.
    • NIO, non-blocking I/O: read, for example, does not block; if no data is ready it returns -1 immediately with errno set to EAGAIN/EWOULDBLOCK, and the caller must retry until the read succeeds and the data can be processed.
    • Multiplexed I/O: on Linux this means epoll, which reports every fd whose state has changed as an event.
    • Signal-driven I/O.
    • AIO, asynchronous I/O.

    redis uses NIO plus multiplexed I/O. The reason this matters so much is that redis runs its event loop in a single process yet achieves very high throughput; how it does that is exactly why the low-level network design is required reading. A minimal sketch of the non-blocking pattern follows.
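    For concreteness, here is a minimal sketch of the NIO pattern in plain POSIX C (illustration only, not redis code; the helper name try_read is made up here):

    #include <errno.h>
    #include <fcntl.h>
    #include <unistd.h>
    
    /* Sketch of the NIO pattern. Once O_NONBLOCK is set, read() never
     * blocks: it returns data, 0 on EOF, or -1 with errno set to
     * EAGAIN/EWOULDBLOCK meaning "nothing available yet". */
    static ssize_t try_read(int fd, void *buf, size_t len)
    {
        int flags = fcntl(fd, F_GETFL);
        fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    
        ssize_t n = read(fd, buf, len);
        if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
            return -2; /* not ready yet; retry later, or let epoll tell us when */
        return n;      /* >0 data, 0 EOF, -1 real error */
    }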

    Multiplexed I/O: an epoll example

    The first thing to understand is multiplexed I/O itself. On Linux redis uses epoll, and its usage pattern boils down to the following example, a standalone echo server:

    #include <errno.h>
    #include <string.h>
    #include <stdlib.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <iostream>
    #include <sys/epoll.h>
    #include <unistd.h>    // close()
    #include <cstdint>     // uint8_t / uint16_t
    
    using namespace std;
    
    int main(int argc, char *argv[])
    {
        if (argc != 3)
        {
            cout << "usage: " << argv[0] << " ip port" << endl;
            return -1;
        }
    
        char *szIp = argv[1];
        in_addr_t iIp = inet_addr(szIp);
        if (iIp == INADDR_NONE)
        {
            cerr << "fail to parse ip: " << szIp << endl;
            return -1;
        }
        char *pEnd = NULL;
        uint16_t usPort = strtoul(argv[2], &pEnd, 10);
        if (*pEnd != '\0')
        {
            cerr << "fail to parse port: " << argv[2] << endl;
            return -1;
        }
    
        int iSockFd = socket(AF_INET, SOCK_STREAM, 0);
        if (iSockFd < 0)
        {
            cerr << "fail to create socket, err: " << strerror(errno) << endl;
            return -1;
        }
        cout << "create socket fd " << iSockFd << endl;
    
        sockaddr_in oAddr;
        memset(&oAddr, 0, sizeof(oAddr));
        oAddr.sin_family = AF_INET;
        oAddr.sin_addr.s_addr = iIp;
        oAddr.sin_port = htons(usPort);
        if (bind(iSockFd, (sockaddr *)&oAddr, sizeof(oAddr)) < 0)
        {
            cerr << "fail to bind addr " << szIp << ":" << usPort << ", err: " << strerror(errno) << endl;
            return -1;
        }
        cout << "bind addr " << szIp << ":" << usPort << endl;
    
        if (listen(iSockFd, 100) < 0)
        {
            cerr << "fail to listen on " << szIp << ":" << usPort << ", err: " << strerror(errno) << endl;
            return -1;
        }
        cout << "listen on socket fd " << iSockFd << endl;
        
        // Note: epoll_create's size argument is only a hint and is ignored
        // by modern kernels; epoll_create1(0) is the newer interface.
        int iEpollFd = epoll_create(1024);
        if (iEpollFd < 0)
        {
            cerr << "fail to create epoll, err: " << strerror(errno) << endl;
            return -1;
        }
    
        // Register the listening socket; EPOLLIN on it means "a new
        // connection is ready to be accepted".
        epoll_event oEvent;
        oEvent.events = EPOLLIN;
        oEvent.data.fd = iSockFd;
        if (epoll_ctl(iEpollFd, EPOLL_CTL_ADD, iSockFd, &oEvent) < 0)
        {
            cerr << "fail to add listen fd to epoll, err: " << strerror(errno) << endl;
            return -1;
        }
    
        epoll_event aoEvents[1024];
        uint8_t acRecvBuf[1024 * 1024];
        while (true)
        {
            // Block until at least one fd becomes ready (-1 = wait forever).
            int iFdCnt = epoll_wait(iEpollFd, aoEvents, 1024, -1);
            if (iFdCnt < 0)
            {
                cerr << "epoll wait error, err: " << strerror(errno) << endl;
                return -1;
            }
    
            for (int i = 0; i < iFdCnt; i++)
            {
                // Listening fd readable => a new connection to accept;
                // any other fd => data from an existing client.
                if (aoEvents[i].data.fd == iSockFd)
                {
                    sockaddr_in oClientAddr;
                    socklen_t iAddrLen = sizeof(oClientAddr);
                    int iAcceptFd = accept(iSockFd, (sockaddr *)&oClientAddr, &iAddrLen);
                    if (iAcceptFd < 0)
                    {
                        cerr << "fail to accpet, err: " << strerror(errno) << endl;
                        continue;
                    }
                    cout << "recv connection from " << inet_ntoa(oClientAddr.sin_addr) << ":" << ntohs(oClientAddr.sin_port) << endl;
    
                    // Also watch the new client fd for incoming data.
                    oEvent.events = EPOLLIN;
                    oEvent.data.fd = iAcceptFd;
                    if (epoll_ctl(iEpollFd, EPOLL_CTL_ADD, iAcceptFd, &oEvent) < 0)
                    {
                        close(iAcceptFd);
                        cerr << "fail to add fd to epoll, err: " << strerror(errno) << endl;
                        continue;
                    }
                }
                else
                {
                    int iCurFd = aoEvents[i].data.fd;
                    ssize_t iRecvLen = recv(iCurFd, acRecvBuf, sizeof(acRecvBuf), 0);
                    if (iRecvLen < 0)
                    {
                        cerr << "fail to recv, close connection, err: " << strerror(errno) << endl;
                        if (epoll_ctl(iEpollFd, EPOLL_CTL_DEL, iCurFd, NULL) < 0)
                        {
                            cerr << "fail to del fd from epoll, err: " << strerror(errno) << endl;
                        }
                        close(iCurFd);
                        continue;
                    }
                    if (iRecvLen == 0)
                    {
                        cout << "connection closed by client" << endl;
                        if (epoll_ctl(iEpollFd, EPOLL_CTL_DEL, iCurFd, NULL) < 0)
                        {
                            cerr << "fail to del fd from epoll, err: " << strerror(errno) << endl;
                        }
                        close(iCurFd);
                        continue;
                    }
                    cout << "recv data len: " << iRecvLen << endl;
    
                    ssize_t iSendLen = send(iCurFd, acRecvBuf, iRecvLen, 0);
                    if (iSendLen < 0)
                    {
                        cerr << "fail to send, err: " << strerror(errno) << endl;
                        if (epoll_ctl(iEpollFd, EPOLL_CTL_DEL, iCurFd, NULL) < 0)
                        {
                            cerr << "fail to del fd from epoll, err: " << strerror(errno) << endl;
                        }
                        close(iCurFd);
                        continue;
                    }
                    cout << "echo to client, len: " << iSendLen << endl;
                }
            }
        }
    }
    
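    To try the example: compile it with g++ (e.g. g++ -o epoll_echo epoll_echo.cpp, file names chosen arbitrarily here), start it with ./epoll_echo 127.0.0.1 8000, then connect from another terminal with nc 127.0.0.1 8000. Every line you type is echoed back, and the server log shows each accept, recv and send.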

    This example shows the basic use of epoll. To go one step further it helps to look at the epoll source: the kernel keeps the registered fd handles in a red-black tree, plus a ready list holding the events that have fired; a call to epoll_wait simply hands back what is in the ready list, and you process those events. (A simplified sketch follows the comparison below.)

    Comparing epoll with select and poll:

    • select tracks fds in a fixed-size fd_set; the whole set is copied between user and kernel space and scanned linearly on every call, and its size is capped (typically 1024), so it is inefficient.
    • poll improves on select only in its data structure, removing the hard size cap, but it still copies and scans every fd on each call, so the core problem remains.
    • epoll fixes both problems: the interest set lives in the kernel, and only the fds whose state actually changed are handed back, which is far more efficient.
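    As a mental model only (loosely based on the kernel's fs/eventpoll.c, with names and fields abridged; not compilable as-is), epoll's bookkeeping looks roughly like this:

    /* Rough sketch of epoll's kernel-side bookkeeping. */
    struct epitem {                    /* one registered fd */
        struct rb_node     rbn;        /* its node in the red-black tree   */
        struct list_head   rdllink;    /* its node in the ready list       */
        int                fd;         /* (really an epoll_filefd)         */
        struct epoll_event event;      /* the mask the user registered     */
    };
    
    struct eventpoll {
        struct rb_root     rbr;        /* all watched fds, O(log n) lookup */
        struct list_head   rdllist;    /* fds with pending events;
                                          epoll_wait essentially drains
                                          this list                        */
    };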

    Back to the redis network source

    The reason for dwelling on epoll's principle and the example code is that redis registers a callback for every event, and those callbacks cut the control flow into pieces. If you read the code by peeling the onion, these cut points have to be identified first, otherwise they keep breaking your reading flow. Here is where the main ones are registered, in initServer in server.c:

        /* Create the timer callback, this is our way to process many background
         * operations incrementally, like clients timeout, eviction of unaccessed
         * expired keys and so forth. */
        if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
            serverPanic("Can't create event loop timers.");
            exit(1);
        }
    
        /* Create an event handler for accepting new connections in TCP and Unix
         * domain sockets. */
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    serverPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
        if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
            acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
    
    
    • The serverCron callback handles (quoting the comment in server.c):
      - Active expired keys collection (it is also performed in a lazy way on lookup).
      - Software watchdog.
      - Update some statistic.
      - Incremental rehashing of the DBs hash tables.
      - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
      - Clients timeout of different kinds.
      - Replication reconnection.
      - Many more...
    
    • The acceptTcpHandler callback plays the role of the accept branch in the epoll example: it takes new client connections and wires them into the event loop; a simplified sketch of that chain follows.
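    A heavily abridged sketch of that chain, based on redis 4.x networking.c (error handling, logging and the max-clients check stripped; treat it as a reading map, not the full source):

    /* Abridged: the listening fd's AE_READABLE callback. */
    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        char cip[NET_IP_STR_LEN];
        int cport;
        int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) return;       /* nothing to accept, or error */
        acceptCommonHandler(cfd, 0, cip);  /* -> createClient(cfd)        */
    }
    
    /* Inside createClient(): the new fd joins the same event loop, and
     * readQueryFromClient becomes its AE_READABLE callback -- one of the
     * "cut points" where the control flow jumps to another file. */
    anetNonBlock(NULL, fd);
    anetEnableTcpNoDelay(NULL, fd);
    aeCreateFileEvent(server.el, fd, AE_READABLE, readQueryFromClient, c);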

    Things worth learning from the redis network code

    • Simultaneous support for IPv4 and IPv6:
    int listenToPort(int port, int *fds, int *count) {
        int j;
    
        /* Force binding of 0.0.0.0 if no bind address is specified, always
         * entering the loop if j == 0. */
        if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
        for (j = 0; j < server.bindaddr_count || j == 0; j++) {
            if (server.bindaddr[j] == NULL) {
                int unsupported = 0;
                /* Bind * for both IPv6 and IPv4, we enter here only if
                 * server.bindaddr_count == 0. */
                fds[*count] = anetTcp6Server(server.neterr,port,NULL,
                    server.tcp_backlog);
                if (fds[*count] != ANET_ERR) {
                    anetNonBlock(NULL,fds[*count]);
                    (*count)++;
                } else if (errno == EAFNOSUPPORT) {
                    unsupported++;
                    serverLog(LL_WARNING,"Not listening to IPv6: unsupproted");
                }
    
                if (*count == 1 || unsupported) {
                    /* Bind the IPv4 address as well. */
                    fds[*count] = anetTcpServer(server.neterr,port,NULL,
                        server.tcp_backlog);
                    if (fds[*count] != ANET_ERR) {
                        anetNonBlock(NULL,fds[*count]);
                        (*count)++;
                    } else if (errno == EAFNOSUPPORT) {
                        unsupported++;
                        serverLog(LL_WARNING,"Not listening to IPv4: unsupproted");
                    }
                }
                /* Exit the loop if we were able to bind * on IPv4 and IPv6,
                 * otherwise fds[*count] will be ANET_ERR and we'll print an
                 * error and return to the caller with an error. */
                if (*count + unsupported == 2) break;
            } else if (strchr(server.bindaddr[j],':')) {
                /* Bind IPv6 address. */
                fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                    server.tcp_backlog);
            } else {
                /* Bind IPv4 address. */
                fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                    server.tcp_backlog);
            }
            if (fds[*count] == ANET_ERR) {
                serverLog(LL_WARNING,
                    "Creating Server TCP listening socket %s:%d: %s",
                    server.bindaddr[j] ? server.bindaddr[j] : "*",
                    port, server.neterr);
                return C_ERR;
            }
            anetNonBlock(NULL,fds[*count]);
            (*count)++;
        }
        return C_OK;
    }
    
    
    static int anetV6Only(char *err, int s) {
        int yes = 1;
        if (setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,&yes,sizeof(yes)) == -1) {
            anetSetError(err, "setsockopt: %s", strerror(errno));
            close(s);
            return ANET_ERR;
        }
        return ANET_OK;
    }
    

    In listenToPort, IPv4 and IPv6 can share a single port: the IPv6 socket is created first and the IPv4 one after it. The key to letting them coexist is the IPV6_V6ONLY option that anetV6Only sets via setsockopt; without it the IPv6 socket also claims the IPv4-mapped addresses, so the second bind would fail. This is one of the learning points. (A minimal standalone sketch follows.)
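    The idea in isolation, as a minimal sketch with plain POSIX calls (not the anet wrappers; error handling omitted, and the function name is made up here):

    #include <netinet/in.h>
    #include <string.h>
    #include <sys/socket.h>
    
    /* Dual-stack trick: without IPV6_V6ONLY the IPv6 socket also grabs the
     * IPv4-mapped addresses, and the later IPv4 bind() on the same port
     * fails with EADDRINUSE. */
    static void open_dual_stack(unsigned short port)
    {
        int yes = 1;
    
        int s6 = socket(AF_INET6, SOCK_STREAM, 0);
        setsockopt(s6, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof(yes));
        struct sockaddr_in6 a6;
        memset(&a6, 0, sizeof(a6));
        a6.sin6_family = AF_INET6;
        a6.sin6_addr   = in6addr_any;
        a6.sin6_port   = htons(port);
        bind(s6, (struct sockaddr *)&a6, sizeof(a6));
        listen(s6, 100);
    
        int s4 = socket(AF_INET, SOCK_STREAM, 0);
        struct sockaddr_in a4;
        memset(&a4, 0, sizeof(a4));
        a4.sin_family      = AF_INET;
        a4.sin_addr.s_addr = htonl(INADDR_ANY);
        a4.sin_port        = htons(port);
        bind(s4, (struct sockaddr *)&a4, sizeof(a4));
        listen(s4, 100);
        /* both s6 and s4 would now be added to the same epoll instance */
    }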

    • Switching between blocking and non-blocking:
    int anetSetBlock(char *err, int fd, int non_block) {
        int flags;
    
        /* Set the socket blocking (if non_block is zero) or non-blocking.
         * Note that fcntl(2) for F_GETFL and F_SETFL can't be
         * interrupted by a signal. */
        if ((flags = fcntl(fd, F_GETFL)) == -1) {
            anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
            return ANET_ERR;
        }
    
        if (non_block)
            flags |= O_NONBLOCK;
        else
            flags &= ~O_NONBLOCK;
    
        if (fcntl(fd, F_SETFL, flags) == -1) {
            anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
            return ANET_ERR;
        }
        return ANET_OK;
    }
    

    As the source shows, the O_NONBLOCK flag is read with F_GETFL, toggled, and written back with F_SETFL; redis calls this with non_block=1 on every socket it creates.
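    The public entry points are just thin wrappers over anetSetBlock; in redis 4.x anet.c they read:

    int anetNonBlock(char *err, int fd) {
        return anetSetBlock(err,fd,1);
    }
    
    int anetBlock(char *err, int fd) {
        return anetSetBlock(err,fd,0);
    }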

    Summary

    The redis networking layer is built on epoll multiplexing, and starting from the epoll example it is easy to see how redis's own epoll wrapper evolved from it. Understanding the network code well lets you:

    • understand why redis is so fast;
    • recognize the epoll callbacks, the points where the control flow splits and reading gets harder.

    You also pick up plenty of networking fundamentals along the way, and in particular:

    • redis's support for IPv4 and IPv6 is thorough, and the coding technique is worth copying.
    • Network programming exposes many tunable parameters. redis reaches high performance first through its framework design around epoll and the reactor pattern, and second by knowing those parameters inside out; designing a network layer requires the same familiarity (see the sketch of typical socket options below).
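    As one concrete example of those parameters, here is a sketch of the per-connection options the anet.c helpers wrap, written as plain setsockopt calls (the real helpers are anetEnableTcpNoDelay, anetKeepAlive and friends; the function name here is illustrative):

    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <sys/socket.h>
    
    /* Sketch: per-socket knobs redis turns on for client fds. */
    static void tune_client_fd(int fd)
    {
        int yes = 1;
        /* Disable Nagle so small request/response packets leave at once --
         * crucial for a latency-sensitive request/reply server. */
        setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
        /* Keepalive probes detect dead peers instead of holding their
         * client state forever. */
        setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes));
    }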

    Once this is done it is worth turning around and comparing with netty. netty also uses the reactor pattern and NIO, but it wraps the low-level machinery for us, so we rarely see how its bottom layer works. Since redis runs everything in a single process, it is a good prompt to go back and study netty's underlying reactor design and its parameters once more.
