美文网首页
实战Linux I/O多路复用:借助epoll,单线程高效管理1

实战Linux I/O多路复用:借助epoll,单线程高效管理1

作者: 拂去尘世尘 | 来源:发表于2024-05-09 06:01 被阅读0次

    [TOC]

    引言

      在应对高并发连接的传统策略中,普遍采取为每个连接配置单独线程或进程的直接方式,管理其I/O操作。此法虽直观易行,但随业务规模扩张,线程资源需求急剧上升。相反,Linux下的I/O多路复用技术,尤其是epoll,展示了一种高效路径:单一线程即可监控成千上万的文件描述符,极大提升了资源使用效率。
      I/O 多路复用的场景有很多,也比较实用。通常用法epoll线程 + 线程/协程池处理并发场景,这里做一个简单的实例使用,以便后续查阅。

    概述

    selectpoll同样能够满足多路复用的需求,在特定场景下各有千秋。不过,当面对需监控大量文件句柄的场景时,epoll凭借其高效的设计和更高的性能表现,成为更为优选的解决方案。其不仅在资源管理和事件处理上展现出明显优势,而且编程接口的灵活性也更为优雅。本文主要聚焦于epoll的实践应用,实例学习其高效而精炼的使用方法。

    epoll常用接口

    epoll的描述man手册已经记录比较详细了,这里列举一下常用的接口:

    1. epoll_create / epoll_create1
    • 原型: int epoll_create(int size) / int epoll_create1(int flags)
    • 功能: 创建一个新的epoll实例,返回一个文件描述符,该描述符代表epoll对象。
    • 参数:
      • size: 接受一个参数 size,在Linux 2.6.8以后这个参数被忽略,但仍要求传递一个大于0的值;
      • flags: 接收一个标志。为0作用与epoll_create相同;为EPOLL_CLOEXEC时,会在execve() 调用后自动关闭 epoll 文件描述符,避免子进程继承。
    • 返回值
      • -1:发生错误,设置errno> 0:epoll文件描述符。
    1. epoll_ctl
    • 原型: int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
    • 功能: 用于控制已经创建好的epoll实例中的文件描述符事件集合。
    • 参数:
      • epfd:epoll_create() 返回的文件描述符。
      • op:操作类型,可以是 EPOLL_CTL_ADD(添加)、EPOLL_CTL_MOD(修改)、EPOLL_CTL_DEL(删除)。
      • fd:要操作的文件描述符。
      • event:一个指向struct epoll_event的指针,定义了关注的事件类型(如 EPOLLIN, EPOLLOUT)及其它数据。
    • 返回值
      • -1:发生错误,设置errno0:成功。
    1. epoll_wait
    • 原型: int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
    • 功能: 阻塞等待直到epoll实例中的一个或多个文件描述符变为就绪状态(可读、可写或出现错误)。
    • 参数:
      • epfd:epoll实例的文件描述符。
      • events:指向struct epoll_event结构体数组的指针,用于存储就绪事件。
      • maxevents:events 数组的最大容量。
      • timeout:等待超时时间,单位为毫秒,-1表示无限等待,0 表示立即返回,正值为等待的最长时间。
    • 返回值:
      • -1:发生错误,设置errno0:超时;>0: 准备好的文件描述符数量。

    应用场景

      在高并发TCP服务场景中,服务端通过部署epoll + 线程/协程池机制,构建高效服务框架。epoll作为核心监听器,统一管理并快速响应来自不同客户端的连接请求,其事件驱动特性确保了对socket就绪状态的即时检测。与此同时,这些请求被异步地分发至线程/协程池中,利用任务队列和工作线程(或轻量级协程)并发执行,提升数据处理能力。

    类图

    EpollEventHandler类图
    • EpollEventHandler (Epoll事件调度器类)
      该类负责注册并管理监听句柄,实时监控Epoll事件,确保对每个就绪连接的快速响应与处理。
    • IEpollEvent (监听接口类)
      此类定义了句柄注册与事件处理的标准操作,使EpollEventHandler能统一管理不同类型的监听对象,实现接口的标准化与句柄处理的灵活性。
    • PSocket (可被监听的Socket实现类)
      继承自IEpollEvent的实现类,封装标准的Socket操作,同时定义针对Epoll事件的响应逻辑,实现Socket交互的统一管理和定制化处理。
    • PUart (可被监听的Uart实现类)
      继承自IEpollEvent的实现类,封装了标准Uart操作,同时定义针对Epoll事件的响应逻辑,实现Uart交互的统一管理和定制化处理。
    • 其他可被监听的实现类
      还可以实现其他可被epoll监听的类型类,通过继承IEpollEvent实现可被EpollEventHandler统一注册,再通过内部EpollEvent实现差异化响应处理。

    源码实现

    编程环境

    ① 编译环境: Linux环境
    ② 语言: C++语言

    接口定义

    • EpollEventHandler
    class EpollEventHandler
    {
    public:
        virtual ~EpollEventHandler();
        static EpollEventHandler* GetInstance();
    
        void AddPoll(IEpollEvent* p);
        void DelPoll(IEpollEvent* p);
        void EpollLoop(bool bRun);
    
    private:
        EpollEventHandler(int size = 0);
    
    private:
        int     mHandle;
        bool    mRun;
        std::map<int, IEpollEvent*> mEpollMap;   // fd, type, IEpollEvent
    };
    

    EpollEventHandler主要封装了epoll接口,集中管理并监听所有IEpollEvent实例。在EpollLoop循环中,阻塞等待并处理各类句柄事件,一旦事件触发,即通过多态调用IEpollEvent的虚函数来EpollEvent执行特定的事件处理逻辑,从而实现差异化的处理需求。

    void EpollEventHandler::EpollLoop(bool bRun)
    {
        struct epoll_event ep[32];
    
        mRun = bRun;
        do {
            if (!mRun) {
                break;
            }
    
            // 无事件时, epoll_wait阻塞, 等待
            int count = epoll_wait(mHandle, ep, sizeof(ep)/sizeof(ep[0]), -1);
            if (count <= 0) {
                continue;
            }
    
            for (int i = 0; i < count; i++) {
                IEpollEvent* p = (IEpollEvent*)ep[i].data.ptr;
                if (p == nullptr) {
                    continue;
                }
    
                // TODO: 丢到线程/协程池响应
                p->EpollEvent(p->GetEpollFd(), p->GetEpollType(), p->GetArgs());
            }
        } while(mRun);
    
        SPR_LOGD("EpollLoop exit\n");
    }
    
    • IEpollEvent
    class IEpollEvent
    {
    public:
        IEpollEvent(int fd, EpollType eType = EPOLL_TYPE_BEGIN, void* arg = nullptr)
            : mEpollFd(fd), mEpollType(eType), mArgs(arg) {};
    
        virtual ~IEpollEvent() = default;
        virtual ssize_t Write(int fd, const std::string& bytes);
        virtual ssize_t Read(int fd, std::string& bytes);
        virtual void*   EpollEvent(int fd, EpollType eType, void* arg) = 0;
    
        int         GetEpollFd()        { return mEpollFd; }
        EpollType   GetEpollType()      { return mEpollType; }
        void*       GetArgs()           { return mArgs; }
    
    protected:
        int         mEpollFd;
        EpollType   mEpollType;
        void*       mArgs;
    };
    

    IEpollEvent主要统一句柄注册与事件处理的标准操作,方便EpollEventHandler统一监听,通过EpollEvent实现差异化响应。

    • PSocket
    class PSocket : public IEpollEvent
    {
    public:
        PSocket(int domain, int type, int protocol,
                   std::function<void(int, void*)> cb, void* arg = nullptr);
    
        PSocket(int sock,
                   std::function<void(int, void*)> cb, void* arg = nullptr);
    
        virtual ~PSocket();
    
        void Close();
        int AsTcpServer(short bindPort, int backlog);
        int AsTcpClient(bool con = false,
                        const std::string& srvAddr = "",
                        short srvPort = 0,
                        int rcvLen = 512 * 1024,
                        int sndLen = 512 * 1024);
    
        int AsUdpServer(short bindPort, int rcvLen = 512 * 1024);
        int AsUdpClient(const std::string& srvAddr, short srvPort, int sndLen = 512 * 1024);
    
        int AsUnixStreamServer(const std::string& serverName, int backlog);
        int AsUnixStreamClient(bool con = false,
                               const std::string& serverName = "",
                               const std::string& clientName = "");
    
        int AsUnixDgramServer(const std::string& serverName);
        int AsUnixDgramClient(const std::string& serverName);
    
        virtual void*   EpollEvent(int fd, EpollType eType, void* arg) override;
    
    private:
        bool            mEnable;
        PSocketType     mSockType;
        std::function<void(int, void*)> mCb;
    };
    
    • PUart
    class PUart : public IEpollEvent
    {
    public:
        PUart(const std::string& devPath,
                std::function<void(int, char *, long, void*)> cb,
                void*   arg     = nullptr,
                speed_t rate    = B115200,
                int     parity  = 0,
                int     stopbit = 1
                );
        virtual ~PUart();
    
    
        void* EpollEvent(int fd, EpollType eType, void* arg) override;
    
        bool  SetupPort(speed_t rate, int parity, int stopbit);
        void  Close();
    
    private:
        std::function<void(int, char *, long, void*)> mCb;
        std::string mDevFile;
    };
    

    测试效果

    • 测试代码
      这里实现一个TCP server的功能,响应多个客户端请求。
    int main(int argc, const char *argv[])
    {
        std::mutex epFdMutex;
        EpollEventHandler *pEpoll = EpollEventHandler::GetInstance();
        auto tcpClient = make_shared<PSocket>(AF_INET, SOCK_STREAM, 0, [&](int sock, void *arg) {
            PSocket* pCliObj = (PSocket*)arg;
            if (pCliObj == nullptr) {
                SPR_LOGE("PSocket is nullptr\n");
                return;
            }
    
            std::string rBuf;
            int rc = pCliObj->Read(sock, rBuf);
            if (rc > 0) {
                SPR_LOGD("# RECV [%d]> %s\n", sock, rBuf.c_str());
            } else {
                pEpoll->DelPoll(pCliObj);
                SPR_LOGD("## CLOSE [%d]\n", sock);
    
                std::lock_guard<std::mutex> lock(epFdMutex);
                pCliObj->Close();
            }
        });
    
        tcpClient->AsTcpClient(true, "127.0.0.1", 8080);
        pEpoll->AddPoll(tcpClient.get());
    
        std::thread wThread([&]{
            while(true) {
                std::lock_guard<std::mutex> lock(epFdMutex);
                tcpClient->Write(tcpClient->GetEpollFd(), "Hello World");
                sleep(1);
            }
        });
    
        pEpoll->EpollLoop(true);
        wThread.join();
        return 0;
    }
    
    • 测试结果
    $ ./sample_tcpserver
      81 EpollEvent D: Add epoll fd 4
      81 EpollEvent D: Add epoll fd 5
      81 EpollEvent D: Add epoll fd 6
      54 TcpServer D: # RECV [6]> I'm Client A
      58 TcpServer D: # SEND [6]> ACK
      54 TcpServer D: # RECV [5]> I'm Client B
      58 TcpServer D: # SEND [5]> ACK
      54 TcpServer D: # RECV [6]> I'm Client A
      58 TcpServer D: # SEND [6]> ACK
      54 TcpServer D: # RECV [5]> I'm Client B
      58 TcpServer D: # SEND [5]> ACK
    

    测试结果上看,sample_tcpserver能够实现一个线程同时监听两个客户端的请求和应答。

    总结

    • 本篇主要操练一下epoll的常规使用,简单做一下封装能够实现epoll监听各个类型的句柄事件。其实epoll还可以监听消息队列、串口等其他文件句柄,深入挖掘一下,能够实现很多优雅的操作。
    • 本实践深受先前一位导师兼朋友所分享代码的启发,其创新性地提出了采用epoll结合协程机制来替代传统多线程架构的方法,让我受益匪浅。
    • epoll的妙用远不止于此,后续的代码会不断挖掘,并集成到个人的开源项目中。

    相关文章

      网友评论

          本文标题:实战Linux I/O多路复用:借助epoll,单线程高效管理1

          本文链接:https://www.haomeiwen.com/subject/znijfjtx.html