美文网首页
手写基于epoll与reactor的tcp服务器

手写基于epoll与reactor的tcp服务器

作者: 欢喜树下种西瓜 | 来源:发表于2021-12-30 21:56 被阅读0次

    前言

    此文章记录个人学习epoll网络编程相关的心得

    1. 了解学习epoll如何使用
    2. 了解reactor反应堆模型
    3. 了解各类网络模型
      若能对读者有以上两个方面有所帮助,这将是我的荣幸

    前置知识

    之前我们已经实现过最简单的tcp服务器:https://www.jianshu.com/p/1cab4ef08f33
    但很显然,我们写的tcp服务器太过简单。它只能接入一条连接,而且接入了一条连接后就完全阻塞在连接内,不能新增连接。这样残缺版的服务器是我们不能忍受的。
    为此我们引入IO复用——即引入一个“秘书”来帮忙监听客户端的接入,这样就不需要应用程序阻塞着去监听,大大提升了cpu的使用率。

    多路复用IO模型
    linux系统提供的三种“秘书”分别为——select、poll与epoll
    区别图

    epoll相关函数

    epoll_create

    man 2 epoll_create
    创建一个epoll文件描述符【一颗监听红黑树】

    依赖头文件
    #include <sys/epoll.h>
    
    函数原型

    int epoll_create(int size);

    参数说明

    ● int size
    size:创建的红黑树的监听节点数量【仅供内核参考】

    返回值

    ● 成功 返回一个纸箱新创建的红黑树的根节点fd
    ● 失败 返回-1,并设置errno

    epoll_ctl

    man 2 epoll_ctl

    依赖头文件
    #include <sys/epoll.h>
    
    函数原型

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

    参数说明

    ● int epfd
    epoll_create函数的返回值【监听红黑树的根节点的文件描述符】
    ● int op
    对该监听红黑树所做的操作:

    1. EPOLL_CTL_ADD
      添加fd到监听红黑树
    2. EPOLL_CTL_MOD
      修改fd在监听红黑树上的监听事件
    3. EPOLL_CTL_DEL
      将一个fd从监听红黑树上摘下【取消监听/删除一个监听时间】
      ● int fd
      待监听的文件描述符
      ● struct epoll_event *event
      event本质是struct epoll_event 结构体指针【地址】
      注意,epoll_event是一个结构体

    struct epoll_event {
    uint32_t events; /* Epoll events /
    epoll_data_t data; /
    用户数据 */
    };

    这个结构体里的变量:
    uint32_t events:
    EPOLLIN/EPOLLOUT/EPOLLERR
    epoll_data_t data: 联合体epoll_data【这意味着,如果使用了ptr,就不能使用data!使用了data,就不能使用ptr】

    typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
    } epoll_data_t;

    1. void *ptr 利用这个指针,可以在epoll反应堆模型中回调函数【当使用了ptr这个指针后,fd参数就不管用了!】
    2. int fd 对应监听事件的fd
    3. uint32_t u32; 一般不用
    4. uint64_t u64; 一般不用
    返回值

    ● 成功 返回0
    ● 失败 返回-1,并设置errno

    epoll_wait函数

    man 2 epoll_wait

    依赖头文件
    #include <sys/epoll.h>
    
    函数原型

    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

    参数说明
    • int epfd

    epoll_create函数的返回值【监听红黑树的根节点的文件描述符】

    • struct epoll_event *events

    events 本质是个【数组】

    传出参数。传出满足监听条件的那些fd结构体。

    epoll_wait时的监听树
    • int maxevents

    【events】数组元素的总个数

    e.g

    struct epoll_event events[1024];

    此时 maxevents 就应该设置为1024

    • int timeout

    timeout 表示超时时间:

    -1: 阻塞[一直等待,直到有数据]

    0: 立即返回,非阻塞[检测到为空就立即返回]

    0: 指定超时时间(毫秒)

    返回值
    • 成功

    • 返回>0: 满足监听的总个数,可用作循环上限

    • 返回0: 没有fd满足的监听事件

    • 失败 返回-1,并设置errno

    epoll模型的大致流程

    int lfd = socket();
    bind();
    listen();
    // 创建监听红黑树,OPEN_MAX为最大保持连接数
    int epfd = epoll_create(OPEN_MAX);
    // 将lfd加入监听红黑树中
    // tep中设置好对应的监听属性
    epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &tep);
    while(1){
        // 监听是否有被请求
        epoll_wait(epfd, ep, OPEN_MAX, -1);
        // 遍历处理监听事件
        for() {
            // 处理lfd前台事件
            if(ep[i].data.fd == lfd)
                // 将新得到的cfd加入到监听红黑树中
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &tep);
            else{
                Read/Write 处理cfd发来的请求
            } 
        }
    }
    

    LT与ET模式

    LT模式

    水平触发【默认采用模式】,只要有数据都会触发。
    可以理解为,上面的高电平都会触发【高电平意味着有数据】。即,只要是高电平就会触发事件。

    LT(level triggered):LT是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
    event.events = EPOLLIN;

    ET模式

    边缘触发,只有数据到来才触发,不管缓存区中是否还有数据。
    可以理解为,上面0到1的电平变动才会触发【高电平意味着有数据】。即,只捕捉上升沿

    ET(edge-triggered):ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once).
    event.events = EPOLLIN | EPOLLET;

    比较

    readn调用的阻塞,比如设定读500个字符,但是只读到498,完事儿阻塞了,等另剩下的2个字符,然而在server代码里,一旦read变为readn阻塞了,它就不会被唤醒了,因为epoll_wait因为readn的阻塞不会循环执行,读不到新数据。有点死锁的意思,差俩字符所以阻塞,因为阻塞,读不到新字符。

    Reactor

    Reactor模式是一种典型的事件驱动的编程模型,Reactor逆置了程序处理的流程,其基本的思想即为Hollywood Principle— 'Don't call us, we'll call you'.

    组成: IO多路复用+非阻塞IO
    将对IO的处理转化为对事件的处理

    单reactor

    redis就是这样做的:

    redis示意图

    查阅redis源码:

    封装层次:

    redis源码结构图
    • ae.c 与 ae.h 是封装了 epoll相关的内容。其中,ae_kqueue.c是mac下的

    • anet.h 里 anetTcpServer 函数是用来绑定ip端口的; anetNonBlock 函数是用来设置fd非阻塞的

    • networking.c 里封装了 redis协议和io多线程

    优化思路

    主线程收到数据后,将业务部分丢给线程池处理(主线程就能接着在外面accept,不影响新连接接入了)

    多reactor

    多线程

    多个epoll对象->多个reactor

    reactor与CPU核心数相等

    memcached就是这样做的【一个单独的reactor去接收新连接accept】

    memcached示意图

    多进程

    nginx就是这样做的

    nginx示意图

    nginx使用边缘触发

    redis和memcached都是用水平触发

    使用Reactor模式去封装基于epoll的tcp服务器

    核心思路:
    将fd和对应事件绑定。这样回调时就直接调用对应的回调函数,而无需再次判断fd是否和socketfd相等。—— 反应堆模式——reactor
    变化:
    从前面的划分事件写具体代码,变为划分事件后执行对应的callback:

    for(){
      if(EPOLLIN)   eb->callback();
      if(EPOLLOUT)  eb->callback();
    }
    

    并且将send和recv都设置了对应的EPOLL事件。
    EPOLL属性在recv_cb和send_cb回调函数之间循环变化

    注意事项

    struct epoll_event 结构体中的data,是union!写了data就不能写ptr!

    struct epoll_event
    {
      uint32_t events;  /* Epoll events */
      epoll_data_t data;    /* User data variable */
    } __EPOLL_PACKED;
    // 写了ptr就不能写data 否则报错!
    typedef union epoll_data
    {
      void *ptr;
      int fd;
      uint32_t u32;
      uint64_t u64;
    } epoll_data_t;
    
    // e.g
    struct epoll_event ev;
    ev.ptr = si;
    // ev.data = fd; 上下只能选择其一!否则取值会出现问题
    

    监听lfd[新客户端连接]的应该用EPOLLLT模式[LT]
    而监听旧客户端的接发送的应该用EPOLLET模式[ET]
    在完成这个版本的代码练习后,可以参考libevent/redis源码进行学习
    若我们把 listenfd和clientfd分开处理【不同线程处理】,就能大大提升服务器的介入并发量。
    [如,多个clientfd而只有1个listenfd。那么此时接入的那个listenfd接入速度就会降低]

    demo

    #include <arpa/inet.h>
    #include <errno.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/epoll.h>
    #include <sys/socket.h>
    #include <unistd.h>
    
    #define BUFFER_LENGTH 1024
    
    struct sockitem {
        int sockfd;
        // 回调函数
        int (*callback)(int fd, int events, void *arg);
        // 粘包的情况时临时缓冲数据
        char recvbuffer[BUFFER_LENGTH];
        char sendbuffer[BUFFER_LENGTH];
        // 接收的数据长度
        int rlength;
        // 发送的数据长度
        int slength;
    };
    
    // mainloop
    // 保存全局公用的变量
    struct reactor {
        int epfd;
    
        struct epoll_event events[1024];
    };
    
    struct reactor *eventloop = NULL;
    
    int recv_cb(int fd, int events, void *arg);
    
    int send_cb(int fd, int events, void *arg) {
        struct sockitem *si = (struct sockitem *)arg;
    
        send(fd, si->sendbuffer, si->slength, 0);
    
        struct epoll_event ev;
        // 执行完send之后,再将监听的事件改为EPOLLIN
        ev.events = EPOLLIN | EPOLLET;
        si->sockfd = fd;
        si->callback = recv_cb;
        ev.data.ptr = si;
    
        epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
    }
    
    int recv_cb(int fd, int events, void *arg) {
    
        //int clientfd = events[i].data.fd;
        struct sockitem *si = (struct sockitem *)arg;
        struct epoll_event ev;
    
        int ret = recv(fd, si->recvbuffer, BUFFER_LENGTH, 0);
        if (ret < 0) {
            // 还不是很了解,需多学习
            if (errno == EAGAIN || errno == EWOULDBLOCK) { //
                return -1;
            } else {
            }
    
            ev.events = EPOLLIN;
            epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
    
            close(fd);
    
            free(si);
    
        } else if (ret == 0) {
            printf("disconnect %d\n", fd);
            ev.events = EPOLLIN;
            //ev.data.fd = fd;
            epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
    
            close(fd);
    
            free(si);
    
        } else {
            // 业务在这里处理
            // http
            // websocket
    
            printf("Recv: %s, %d Bytes\n", si->recvbuffer, ret);
            // 接收了啥数据,就回啥数据
            si->rlength = ret;
            memcpy(si->sendbuffer, si->recvbuffer, si->rlength);
            si->slength = si->rlength;
            // 这里直接使用send针对一次buffer能发完的可以,但是一次发不完,就彻底不发了……
            // 利用EPOLLOUT事件就能很优雅地解决这种问题!
            // send(fd, buffer, ret, 0);
    
            struct epoll_event ev;
            // 将监听事件由 EPOLLIN 修改为 EPOLLOUT 事件,以触发send_cb
            ev.events = EPOLLOUT | EPOLLET;
            si->sockfd = fd;
            si->callback = send_cb;
            ev.data.ptr = si;
            // 在监听树里修改成监听EPOLLOUT事件
            epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
        }
    }
    
    int accept_cb(int fd, int events, void *arg) {
        struct sockaddr_in client_addr;
        bzero(&client_addr, sizeof(client_addr));
        socklen_t client_addr_len = sizeof(client_addr);
        int cfd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);
        if (cfd == -1) {
            perror("accept error!");
            exit(1);
        }
        char str[INET_ADDRSTRLEN] = {0};
        printf("recv from %s at port %d\n", inet_ntop(AF_INET, &client_addr, str, sizeof(str)),
               ntohs(client_addr.sin_port));
    
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET;
    
        // 设置新的回调函数
        struct sockitem *si = (struct sockitem *)malloc(sizeof(struct sockitem));
        si->sockfd = cfd;
        si->callback = recv_cb;
        // 给监听树里的客户端设置recv_cb回调[消息处理的回调!]
        ev.data.ptr = si;
        // 在监听树里添加客户端!
        epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, cfd, &ev);
        return cfd;
    }
    
    int main(int argc, char *argv[]) {
        if (argc < 2) {
            return -1;
        }
        int port = atoi(argv[1]);
        int sfd = socket(AF_INET, SOCK_STREAM, 0);
        if (sfd == -1) {
            perror("socket error!");
            exit(1);
        }
        struct sockaddr_in addr;
        bzero(&addr, sizeof(addr));
    
        addr.sin_addr.s_addr = INADDR_ANY;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
    
        if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
            perror("bind error!");
            exit(1);
        }
    
        // 服务器
        if (listen(sfd, 5) < 0) {
            perror("listen error!");
            exit(1);
        }
    
        eventloop = (struct reactor *)malloc(sizeof(struct reactor));
    
        struct epoll_event ev;
    
        // epoll opera
        // epoll_create里的参数写1即可->创建一个根节点[红黑树]
        eventloop->epfd = epoll_create(1);
    
        // 将sfd加入到 epoll 红黑树当中
        // 对于创建连接的events应该设置成LT模式!
        ev.events = EPOLLIN;
        // 因为下面使用了ev.data.ptr,故fd将会失效!
        // 因为ev.data是联合体!
        // ev.data.fd = sfd;
    
        struct sockitem *si = (struct sockitem *)malloc(sizeof(struct sockitem));
        si->sockfd = sfd;
        si->callback = accept_cb;
        // 将回调函数的地址带入监听树中
        // 这样能触发事件从而执行回调函数
        ev.data.ptr = si;
    
        epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sfd, &ev);
    
        //
        while (1) {
            // 1024指的是一次epoll_wait最多能获取的事件数量
            // 快递员的袋子的容量
            int nread = epoll_wait(eventloop->epfd, eventloop->events, 1024, -1);
            if (nread == -1) {
                break;
            }
    
            for (int i = 0; i < nread; i++) {
                // 直接将事件分类。连接与断开其实都是EPOLLIN
    
                if (eventloop->events[i].events & EPOLLIN) {
                    printf("EPOLLIN\n");
                    // 上面一大串的代码,可以直接使用回调函数处理掉
                    // 这里我们无需区分是lfd[新客户端]还是cfd[旧客户端发送数据]。让回调去调度不同的回调函数处理
                    struct sockitem *si = (struct sockitem *)eventloop->events[i].data.ptr;
                    si->callback(si->sockfd, eventloop->events[i].events, si);
                }
    
                /*
                    然而这个EPOLLOUT触发条件:
                    1. 缓冲区被写满了,返回EAGAIN(11)
                    2. 对端读取了一些数据,又重新可写了
                */
                if (eventloop->events[i].events & EPOLLOUT) {
                    printf("EPOLLOUT\n");
                    struct sockitem *si = (struct sockitem *)eventloop->events[i].data.ptr;
                    si->callback(si->sockfd, eventloop->events[i].events, si);
                    // int clientfd = events[i].data.fd;
                    // char buf[1024] = "We have send!";
                    // send(clientfd, buf, sizeof(buf), MSG_DONTWAIT);
                }
            }
        }
    }
    

    总结

    引入epoll后,使得程序无需一直等待IO的数据,让“秘书”epoll去帮忙接听电话,而Reactor反应堆则能优化代码逻辑,方便日后扩展。
    这是非常实用的工程经验,许多开源项目,只要涉及了网络相关的内容,都离不开Reator+Epoll的组合。

    技术参考

    1. 视频技术参考 https://ke.qq.com/course/417774?flowToken=1041378

    相关文章

      网友评论

          本文标题:手写基于epoll与reactor的tcp服务器

          本文链接:https://www.haomeiwen.com/subject/hzoiqrtx.html