美文网首页
Reactor 模式

Reactor 模式

作者: wayyyy | 来源:发表于2021-09-13 00:31 被阅读0次

Reactor 模型就是网络服务器端用来处理高并发网络 IO 请求的一种编程模型,总结起来:

  • 三类事件,即 连接事件,写事件,读事件
  • 三个角色,即 reactor、acceptor、handler
事件类型与角色

Reactor 模型处理的是客户端和服务器端的交互过程,而这三类事件正好对应了客户端和服务器端交互过程中,不同类请求在服务器端引发的待处理事件:

  • 当一个客户端要和服务器端进行交互时,客户端会向服务器端发送连接请求,以建立连接,这就对应了服务器端的一个连接事件。
  • 一旦连接建立后,客户端会给服务器端发送读请求,以便读取数据。服务器端在处理读请求时,需要向客户端写回数据,这对应了服务器端的写事件。
  • 无论客户端给服务器端发送读或写请求,服务器端都需要从客户端读取请求内容,所以在这里,读或写请求的读取就对应了服务器端的读事件。

这三类事件是由谁来处理的呢?那就是对应三个角色了:

  • 连接事件由 acceptor 来处理,负责接收连接;acceptor 在接收连接后,会创建 handler,用于网络连接上对后续读写事件的处理;
  • 读写事件由 handler 处理;
  • 最后,在高并发场景中,连接事件、读写事件会同时发生,所以,我们需要有一个角色专门监听和分配事件,这就是 reactor 角色。当有连接请求时,reactor 将产生的连接事件交由 acceptor 处理;当有读写请求时,reactor 将读写事件交由 handler 处理。
7304940-3d3bb7900292b66b.png
事件驱动框架

事件驱动框架,就是在实现 Reactor 模型时,需要实现的代码整体控制逻辑。简单来说,事件驱动框架包括了两部分:一是事件初始化;二是事件捕获、分发和处理主循环。

事件初始化是在服务器程序启动时就执行的,它的作用主要是创建需要监听的事件类型,以及该类事件对应的 handler。而一旦服务器完成初始化后,事件初始化也就相应完成了,服务器程序就需要进入到事件捕获、分发和处理的主循环中。

7304940-df0cf0ead00658d2.png
Redis 对 Reactor 模型的实现

github 有人将redis中的网络模块代码移植出来,参见:https://github.com/aisk/libae

Redis 为了实现事件驱动框架,同样的定义了:
事件的数据结构
框架主循环函数
事件捕获分发函数
事件注册函数
事件对应的handler 函数

  • 事件的数据结构
    Redis 的事件驱动框架定义了两类事件:IO 事件和时间事件,分别对应了客户端发送的网络请求和 Redis 自身的周期性操作

    #define AE_NONE 0
    #define AE_READABLE 1 
    #define AE_WRITABLE 2 
    #define AE_BARRIER 4  
    
    typedef struct aeFileEvent {
      int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
      aeFileProc *rfileProc;
      aeFileProc *wfileProc;
      void *clientData;
    } aeFileEvent;
    
  • 框架主循环函数

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            …
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
  • 事件捕获与分发

    
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        int processed = 0, numevents;
    
        /* 若没有事件处理,则立刻返回 */
        if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) 
            return 0;
    
        /* 如果有IO事件发生,或者紧急的时间事件发生,则开始处理*/
        if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
         …
        }
    
        /* 检查是否有时间事件,若有,则调用processTimeEvents函数处理 */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
        /* 返回已经处理的文件或时间*/
        return processed; 
    }
    

    主要来看看第二种情况,首先,当该情况发生时,Redis 需要捕获发生的网络事件,并进行相应的处理。在这种情况下,aeApiPoll 函数会被调用,用来捕获事件,如下所示:

     int aeProcessEvents(aeEventLoop *eventLoop, int flags){
         ...
         if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
             ...
             // 调用aeApiPoll函数捕获事件
             numevents = aeApiPoll(eventLoop, tvp);
             ...
          }
          ...
    }
    

    Redis 依赖于操作系统底层提供的 IO 多路复用机制,来实现事件捕获,检查是否有新的连接、读写事件发生。为了适配不同的操作系统,Redis 对不同操作系统实现的网络 IO 多路复用函数,都进行了统一的封装,封装后的代码分别通过以下四个文件中实现:

    • ae_epoll.c,对应 Linux 上的 IO 复用函数 epoll
    • ae_evport.c,对应 Solaris 上的 IO 复用函数 evport
    • ae_kqueue.c,对应 macOS 或 FreeBSD 上的 IO 复用函数 kqueue
    • ae_select.c,对应 Linux(或 Windows)的 IO 复用函数 select

    这里以epoll 为例,在 aeApiPoll 函数中直接调用了 epoll_wait 函数,并将 epoll 返回的事件信息保存起来的逻辑

    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        …
        // 调用epoll_wait获取监听到的事件
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
              tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        if (retval > 0) {
            int j;
            // 获得监听到的事件数量
            numevents = retval;
            // 针对每一个事件,进行处理
            for (j = 0; j < numevents; j++) {
               // 保存事件信息
            }
        }
        return numevents;
    }
    
  • 事件注册
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                          aeFileProc *proc, void *clientData)
    {
        if (fd >= eventLoop->setsize)
        {
            errno = ERANGE;
            return AE_ERR;
        }
        aeFileEvent *fe = &eventLoop->events[fd];
    
        if (aeApiAddEvent(eventLoop, fd, mask) == -1)
            return AE_ERR;
        fe->mask |= mask;
        if (mask & AE_READABLE)
            fe->rfileProc = proc;
        if (mask & AE_WRITABLE)
            fe->wfileProc = proc;
        fe->clientData = clientData;
        if (fd > eventLoop->maxfd)
            eventLoop->maxfd = fd;
    
        return AE_OK;
    }
    
  • 事件处理函数

参考资料

1、《高性能Linux服务器编程》
2、https://github.com/aisk/libae

相关文章

网友评论

      本文标题:Reactor 模式

      本文链接:https://www.haomeiwen.com/subject/viibdltx.html