美文网首页
(5)事件触发EventLoop(Reactor部分)【Lars

(5)事件触发EventLoop(Reactor部分)【Lars

作者: 刘丹冰Aceld | 来源:发表于2019-09-18 11:58 被阅读0次

    【Lars教程目录】

    Lars源代码
    https://github.com/aceld/Lars


    【Lars系统概述】
    第1章-概述
    第2章-项目目录构建


    【Lars系统之Reactor模型服务器框架模块】
    第1章-项目结构与V0.1雏形
    第2章-内存管理与Buffer封装
    第3章-事件触发EventLoop
    第4章-链接与消息封装
    第5章-Client客户端模型
    第6章-连接管理及限制
    第7章-消息业务路由分发机制
    第8章-链接创建/销毁Hook机制
    第9章-消息任务队列与线程池
    第10章-配置文件读写功能
    第11章-udp服务与客户端
    第12章-数据传输协议protocol buffer
    第13章-QPS性能测试
    第14章-异步消息任务机制
    第15章-链接属性设置功能


    【Lars系统之DNSService模块】
    第1章-Lars-dns简介
    第2章-数据库创建
    第3章-项目目录结构及环境构建
    第4章-Route结构的定义
    第5章-获取Route信息
    第6章-Route订阅模式
    第7章-Backend Thread实时监控


    【Lars系统之Report Service模块】
    第1章-项目概述-数据表及proto3协议定义
    第2章-获取report上报数据
    第3章-存储线程池及消息队列


    【Lars系统之LoadBalance Agent模块】
    第1章-项目概述及构建
    第2章-主模块业务结构搭建
    第3章-Report与Dns Client设计与实现
    第4章-负载均衡模块基础设计
    第5章-负载均衡获取Host主机信息API
    第6章-负载均衡上报Host主机信息API
    第7章-过期窗口清理与过载超时(V0.5)
    第8章-定期拉取最新路由信息(V0.6)
    第9章-负载均衡获取Route信息API(0.7)
    第10章-API初始化接口(V0.8)
    第11章-Lars Agent性能测试工具
    第12章- Lars启动工具脚本


    4) 事件触发event_loop

    ​ 接下来我们要尝试添加多路IO的处理机制,当然linux的平台下, 最优的选择就是使用epoll来做,但是用原生的epoll实际上编程起来扩展性不是很强,那么我们就需要封装一套IO事件处理机制。

    4.1 io_event基于IO事件封装

    ​ 我们首先定义一个IO事件类来包括一个时间需要拥有的基本成员信息.

    lars_reactor/include/event_base.h

    #pragma once
    /*
     * 定义一些IO复用机制或者其他异常触发机制的事件封装
     *
     * */
    
    class event_loop;
    
    //IO事件触发的回调函数
    typedef void io_callback(event_loop *loop, int fd, void *args);
    
    /*
     * 封装一次IO触发实现 
     * */
    struct io_event 
    {
                 io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {}
    
        int mask; //EPOLLIN EPOLLOUT
        io_callback *read_callback; //EPOLLIN事件 触发的回调 
        io_callback *write_callback;//EPOLLOUT事件 触发的回调
        void *rcb_args; //read_callback的回调函数参数
        void *wcb_args; //write_callback的回调函数参数
    };
    

    ​ 一个io_event对象应该包含 一个epoll的事件标识EPOLLIN/EPOLLOUT,和对应事件的处理函数read_callback,write_callback。他们都应该是io_callback类型。然后对应的函数形参。

    4.2 event_loop事件循环处理机制

    ​ 接下来我们就要通过event_loop类来实现io_event的基本增删操作,放在原生的epoll堆中。

    lars_reactor/include/event_loop.h

    #pragma once
    /*
     *
     * event_loop事件处理机制
     *
     * */
    #include <sys/epoll.h>
    #include <ext/hash_map>
    #include <ext/hash_set>
    #include "event_base.h"
    
    #define MAXEVENTS 10
    
    // map: fd->io_event 
    typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
    //定义指向上面map类型的迭代器
    typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
    //全部正在监听的fd集合
    typedef __gnu_cxx::hash_set<int> listen_fd_set;
    
    class event_loop 
    {
    public:
        //构造,初始化epoll堆
        event_loop();
    
        //阻塞循环处理事件
        void event_process();
    
        //添加一个io事件到loop中
        void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
    
        //删除一个io事件从loop中
        void del_io_event(int fd);
    
        //删除一个io事件的EPOLLIN/EPOLLOUT
        void del_io_event(int fd, int mask);
        
    private:
        int _epfd; //epoll fd
    
        //当前event_loop 监控的fd和对应事件的关系
        io_event_map _io_evs;
    
        //当前event_loop 一共哪些fd在监听
        listen_fd_set listen_fds;
    
        //一次性最大处理的事件
        struct epoll_event _fired_evs[MAXEVENTS];
    
    };
    

    属性:

    _epfd:是epoll原生堆的fd。

    _io_evs:是一个hash_map对象,主要是方便我们管理fd<—>io_event的对应关系,方便我们来查找和处理。

    _listen_fds:记录目前一共有多少个fd正在本我们的event_loop机制所监控.

    _fried_evs:已经通过epoll_wait返回的被激活需要上层处理的fd集合.

    方法:

    event_loop():构造函数,主要初始化epoll.

    event_process():永久阻塞,等待触发的事件,去调用对应的函数callback方法。

    add_io_event():绑定一个fd和一个io_event的关系,并添加对应的事件到event_loop中。

    del_io_event():从event_loop删除该事件。

    ​ 具体实现方法如下:

    lars_reactor/src/event_loop.cpp

    #include "event_loop.h"
    #include <assert.h>
    
    //构造,初始化epoll堆
    event_loop::event_loop() 
    {
        //flag=0 等价于epll_craete
        _epfd = epoll_create1(0);
        if (_epfd == -1) {
            fprintf(stderr, "epoll_create error\n");
            exit(1);
        }
    }
    
    
    //阻塞循环处理事件
    void event_loop::event_process()
    {
        while (true) {
            io_event_map_it ev_it;
    
            int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);
            for (int i = 0; i < nfds; i++) {
                //通过触发的fd找到对应的绑定事件
                ev_it = _io_evs.find(_fired_evs[i].data.fd);
                assert(ev_it != _io_evs.end());
    
                io_event *ev = &(ev_it->second);
    
                if (_fired_evs[i].events & EPOLLIN) {
                    //读事件,掉读回调函数
                    void *args = ev->rcb_args;
                    ev->read_callback(this, _fired_evs[i].data.fd, args);
                }
                else if (_fired_evs[i].events & EPOLLOUT) {
                    //写事件,掉写回调函数
                    void *args = ev->wcb_args; 
                    ev->write_callback(this, _fired_evs[i].data.fd, args);
                }
                else if (_fired_evs[i].events &(EPOLLHUP|EPOLLERR)) {
                    //水平触发未处理,可能会出现HUP事件,正常处理读写,没有则清空
                    if (ev->read_callback != NULL) {
                        void *args = ev->rcb_args;
                        ev->read_callback(this, _fired_evs[i].data.fd, args);
                    }
                    else if (ev->write_callback != NULL) {
                        void *args = ev->wcb_args;
                        ev->write_callback(this, _fired_evs[i].data.fd, args);
                    }
                    else {
                        //删除
                        fprintf(stderr, "fd %d get error, delete it from epoll\n", _fired_evs[i].data.fd);
                        this->del_io_event(_fired_evs[i].data.fd);
                    }
                }
    
            }
        }
    }
    
    /*
     * 这里我们处理的事件机制是
     * 如果EPOLLIN 在mask中, EPOLLOUT就不允许在mask中
     * 如果EPOLLOUT 在mask中, EPOLLIN就不允许在mask中
     * 如果想注册EPOLLIN|EPOLLOUT的事件, 那么就调用add_io_event() 方法两次来注册。
     * */
    
    //添加一个io事件到loop中
    void event_loop::add_io_event(int fd, io_callback *proc, int mask, void *args)
    {
        int final_mask;
        int op;
    
        //1 找到当前fd是否已经有事件
        io_event_map_it it = _io_evs.find(fd);
        if (it == _io_evs.end()) {
            //2 如果没有操作动作就是ADD
            //没有找到
            final_mask = mask;    
            op = EPOLL_CTL_ADD;
        }
        else {
            //3 如果有操作董酒是MOD
            //添加事件标识位
            final_mask = it->second.mask | mask;
            op = EPOLL_CTL_MOD;
        }
    
        //4 注册回调函数
        if (mask & EPOLLIN) {
            //读事件回调函数注册
            _io_evs[fd].read_callback = proc;
            _io_evs[fd].rcb_args = args;
        }
        else if (mask & EPOLLOUT) {
            _io_evs[fd].write_callback = proc;
            _io_evs[fd].wcb_args = args;
        }
        
        //5 epoll_ctl添加到epoll堆里
        _io_evs[fd].mask = final_mask;
        //创建原生epoll事件
        struct epoll_event event;
        event.events = final_mask;
        event.data.fd = fd;
        if (epoll_ctl(_epfd, op, fd, &event) == -1) {
            fprintf(stderr, "epoll ctl %d error\n", fd);
            return;
        }
    
        //6 将fd添加到监听集合中
        listen_fds.insert(fd);
    }
    
    //删除一个io事件从loop中
    void event_loop::del_io_event(int fd)
    {
        //将事件从_io_evs删除
        _io_evs.erase(fd);
    
        //将fd从监听集合中删除
        listen_fds.erase(fd);
    
        //将fd从epoll堆删除
        epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
    }
    
    //删除一个io事件的EPOLLIN/EPOLLOUT
    void event_loop::del_io_event(int fd, int mask)
    {
        //如果没有该事件,直接返回
        io_event_map_it it = _io_evs.find(fd);
        if (it == _io_evs.end()) {
            return ;
        }
    
        int &o_mask = it->second.mask;
        //修正mask
        o_mask = o_mask & (~mask);
        
        if (o_mask == 0) {
            //如果修正之后 mask为0,则删除
            this->del_io_event(fd);
        }
        else {
            //如果修正之后,mask非0,则修改
            struct epoll_event event;
            event.events = o_mask;
            event.data.fd = fd;
            epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event);
        }
    }
    

    ​ 这里del_io_event提供两个重载,一个是直接删除事件,一个是修正事件。

    4.3 Reactor集成event_loop机制

    ​ 好了,那么接下来,就让让Lars Reactor框架集成event_loop机制。

    首先简单修正一个tcp_server.cpp文件,对之前的do_accept()的调度时机做一下修正。

    1. 在`tcp_server`成员新增`event_loop`成员。
    

    lars_reactor/include/tcp_server.h

    #pragma once
    
    #include <netinet/in.h>
    #include "event_loop.h"
    
    
    class tcp_server
    { 
    public: 
        //server的构造函数
        tcp_server(event_loop* loop, const char *ip, uint16_t port); 
    
        //开始提供创建链接服务
        void do_accept();
    
        //链接对象释放的析构
        ~tcp_server();
    
    private: 
        int _sockfd; //套接字
        struct sockaddr_in _connaddr; //客户端链接地址
        socklen_t _addrlen; //客户端链接地址长度
    
        // ============= 新增 ======================
        //event_loop epoll事件机制
        event_loop* _loop;
        // ============= 新增 ======================
    }; 
    
    1. 构造函数在创建完listen fd之后,添加accept事件。

    lars_reactor/src/tcp_server.cpp

    //listen fd 客户端有新链接请求过来的回调函数
    void accept_callback(event_loop *loop, int fd, void *args)
    {
        tcp_server *server = (tcp_server*)args;
        server->do_accept();
    }
    
    //server的构造函数
    tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
    {
        bzero(&_connaddr, sizeof(_connaddr));
        
        //忽略一些信号 SIGHUP, SIGPIPE
        //SIGPIPE:如果客户端关闭,服务端再次write就会产生
        //SIGHUP:如果terminal关闭,会给当前进程发送该信号
        if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
            fprintf(stderr, "signal ignore SIGHUP\n");
        }
        if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
            fprintf(stderr, "signal ignore SIGPIPE\n");
        }
    
        //1. 创建socket
        _sockfd = socket(AF_INET, SOCK_STREAM /*| SOCK_NONBLOCK*/ | SOCK_CLOEXEC, IPPROTO_TCP);
        if (_sockfd == -1) {
            fprintf(stderr, "tcp_server::socket()\n");
            exit(1);
        }
    
        //2 初始化地址
        struct sockaddr_in server_addr;
        bzero(&server_addr, sizeof(server_addr));
        server_addr.sin_family = AF_INET;
        inet_aton(ip, &server_addr.sin_addr);
        server_addr.sin_port = htons(port);
    
        //2-1可以多次监听,设置REUSE属性
        int op = 1;
        if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &op, sizeof(op)) < 0) {
            fprintf(stderr, "setsocketopt SO_REUSEADDR\n");
        }
    
        //3 绑定端口
        if (bind(_sockfd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
            fprintf(stderr, "bind error\n");
            exit(1);
        }
    
        //4 监听ip端口
        if (listen(_sockfd, 500) == -1) {
            fprintf(stderr, "listen error\n");
            exit(1);
        }
    
        // ============= 新增 ======================
        //5 将_sockfd添加到event_loop中
        _loop = loop;
    
        //6 注册_socket读事件-->accept处理
        _loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
        // ============= 新增 ======================
    }
    
    1. 修改do_accept()方法

    lars_reactor/src/tcp_server.cpp

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <strings.h>
    
    #include <unistd.h>
    #include <signal.h>
    #include <sys/types.h>          /* See NOTES */
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include <errno.h>
    
    #include "tcp_server.h"
    #include "reactor_buf.h"
    
    //临时的收发消息
    struct message{
        char data[m4K];
        char len;
    };
    struct message msg;
    
    void server_rd_callback(event_loop *loop, int fd, void *args);
    void server_wt_callback(event_loop *loop, int fd, void *args);
    
    
    //...省略其他代码
    //...省略其他代码
    
    //server read_callback
    void server_rd_callback(event_loop *loop, int fd, void *args)
    {
        int ret = 0;
    
        struct message *msg = (struct message*)args;
        input_buf ibuf;
    
        ret = ibuf.read_data(fd);
        if (ret == -1) {
            fprintf(stderr, "ibuf read_data error\n");
            //删除事件
            loop->del_io_event(fd);
            
            //对端关闭
            close(fd);
    
            return;
        }
        if (ret == 0) {
            //删除事件
            loop->del_io_event(fd);
            
            //对端关闭
            close(fd);
            return ;
        }
    
        printf("ibuf.length() = %d\n", ibuf.length());
        
        //将读到的数据放在msg中
        msg->len = ibuf.length();
        bzero(msg->data, msg->len);
        memcpy(msg->data, ibuf.data(), msg->len);
    
        ibuf.pop(msg->len);
        ibuf.adjust();
    
        printf("recv data = %s\n", msg->data);
    
        
        //删除读事件,添加写事件
        loop->del_io_event(fd, EPOLLIN);
        loop->add_io_event(fd, server_wt_callback, EPOLLOUT, msg);
    }
    
    //server write_callback
    void server_wt_callback(event_loop *loop, int fd, void *args)
    {
        struct message *msg = (struct message*)args;
        output_buf obuf;
    
        //回显数据
        obuf.send_data(msg->data, msg->len);
        while(obuf.length()) {
            int write_ret = obuf.write2fd(fd);
            if (write_ret == -1) {
                fprintf(stderr, "write connfd error\n");
                return;
            }
            else if(write_ret == 0) {
                //不是错误,表示此时不可写
                break;
            }
        }
    
        //删除写事件,添加读事件
        loop->del_io_event(fd, EPOLLOUT);
        loop->add_io_event(fd, server_rd_callback, EPOLLIN, msg);
    }
    
    //...省略其他代码
    //...省略其他代码
    
    //开始提供创建链接服务
    void tcp_server::do_accept()
    {
        int connfd;    
        while(true) {
            //accept与客户端创建链接
            printf("begin accept\n");
            connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
            if (connfd == -1) {
                if (errno == EINTR) {
                    fprintf(stderr, "accept errno=EINTR\n");
                    continue;
                }
                else if (errno == EMFILE) {
                    //建立链接过多,资源不够
                    fprintf(stderr, "accept errno=EMFILE\n");
                }
                else if (errno == EAGAIN) {
                    fprintf(stderr, "accept errno=EAGAIN\n");
                    break;
                }
                else {
                    fprintf(stderr, "accept error");
                    exit(1);
                }
            }
            else {
                //accept succ!
                // ============= 新增 ======================
                this->_loop->add_io_event(connfd, server_rd_callback, EPOLLIN, &msg);
                break;
                // ============= 新增 ======================
            }
        }
    }
    
    //...省略其他代码
    //...省略其他代码
    

    4.4 完成Lars Reactor V0.3开发

    ​ 我们将lars_reactor/example/lars_reactor_0.2的代码复制一份到 lars_reactor/example/lars_reactor_0.3中。

    lars_reactor/example/lars_reactor_0.3/lars_reactor.cpp

    #include "tcp_server.h"
    
    int main() 
    {
        event_loop loop;
        
        tcp_server server(&loop, "127.0.0.1", 7777);
    
        loop.event_process();
    
        return 0;
    }
    

    编译。

    启动服务器

    $ ./lars_reactor 
    

    分别启动2个客户端

    client1

    $ nc 127.0.0.1 7777
    hello Iam client1
    hello Iam client1  回显
    

    client2

    $ nc 127.0.0.1 7777
    hello Iam client2
    hello Iam client2  回显
    

    服务端打印

    $ ./lars_reactor 
    begin accept
    ibuf.length() = 18
    recv data = hello Iam client1
    
    begin accept
    ibuf.length() = 18
    recv data = hello Iam client2
    

    目前我们已经成功将event_loop机制加入到reactor中了,接下来继续添加功能。


    关于作者:

    作者:Aceld(刘丹冰)

    mail: danbing.at@gmail.com
    github: https://github.com/aceld
    原创书籍gitbook: http://legacy.gitbook.com/@aceld

    原创声明:未经作者允许请勿转载, 如果转载请注明出处

    相关文章

      网友评论

          本文标题:(5)事件触发EventLoop(Reactor部分)【Lars

          本文链接:https://www.haomeiwen.com/subject/emnhuctx.html