美文网首页Redis源码学习笔记
Redis源码学习之事件

Redis源码学习之事件

作者: lixin_karl | 来源:发表于2019-05-17 11:36 被阅读0次

    事件

       Redis服务器是一个事件驱动程序,服务器主要处理时间事件与文件事件。其中,时间事件是指服务器中的一些操作比如(serverCron函数)在给定的时间点执行函数,文件事件就是指服务器通过套接字跟客户端通信,redis中client结构就是套接字的一个封装。

    一、文件事件

       Redis服务器采用了IO多路复用技术,实现高性能的监听。每来一个客户端连接或生成一个aeFileEvent,AE_READABLE表示此事件可读,即客户端发送命令到达服务器。AE_WRITABLE表示此事件可写,即服务器可以发送结果到客户端了。

    二、时间事件

    Redis目前的时间事件就是周期性处理,服务器将所有时间事件放到一个链表中,每次遍历链表,如果达到指定的时间点,那么就去处理事件。

    三、结构与API
    • 结构

      #define AE_NONE 0       /*0 未注册事件 */
      #define AE_READABLE 1   /*01可读. */
      #define AE_WRITABLE 2   /*10 可写. */
      #define AE_BARRIER 4    /*110 对于可写,不要触发事件如果读已经触发了,就是读的时候不要改动*/
      
      #define AE_FILE_EVENTS 1 //文件事件 01
      #define AE_TIME_EVENTS 2 //时间事件 10
      #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) //11
      #define AE_DONT_WAIT 4  //100
      #define AE_CALL_AFTER_SLEEP 8 //1000
      /* 文件事件结构 */
      typedef struct aeFileEvent {
          int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
          aeFileProc *rfileProc;//读函数
          aeFileProc *wfileProc;//写函数
          void *clientData;//客户端数据
      } aeFileEvent;
      /* 时间事件结构 */
      typedef struct aeTimeEvent {
          long long id; /* 时间事件标识. */
          long when_sec; /*秒 上一次被访问的时间*/
          long when_ms; /*微秒*/
          aeTimeProc *timeProc;//处理函数
          aeEventFinalizerProc *finalizerProc;//结束处理函数
          void *clientData;//客户端传入的数据
          struct aeTimeEvent *prev;//前一个
          struct aeTimeEvent *next;//后一个 所以是个链表
      } aeTimeEvent;
      typedef struct aeFiredEvent {
          int fd;//描述符
          int mask;//
      } aeFiredEvent;//就绪事件
      typedef struct aeEventLoop {
          int maxfd;   /*目前注册的最大的描述符*/
          int setsize; /*文件描述符的数组大小*/
          long long timeEventNextId;
          time_t lastTime;     /* Used to detect system clock skew */
          aeFileEvent *events; /*注册的事件组*/
          aeFiredEvent *fired; /*触发事件组*/
          aeTimeEvent *timeEventHead;//时间事件头结点
          int stop;//事件的开关
          void *apidata; /* This is used for polling API specific data */
          aeBeforeSleepProc *beforesleep;//休眠之前的处理函数
          aeBeforeSleepProc *aftersleep;//休眠之后的处理函数
      } aeEventLoop;//事件循环的结构体
      
    • API

    aeEventLoop *aeCreateEventLoop(int setsize);//创建并初始化事件循环结构体
    void aeDeleteEventLoop(aeEventLoop *eventLoop);//析构 事件循环结构体
    void aeStop(aeEventLoop *eventLoop);//停止事件循环
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
            aeFileProc *proc, void *clientData);//创建文件事件
    void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);//删除文件事件
    int aeGetFileEvents(aeEventLoop *eventLoop, int fd);//获得文件事件
    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
            aeTimeProc *proc, void *clientData,
            aeEventFinalizerProc *finalizerProc);//创建时间事件
    int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);//删除时间事件
    int aeProcessEvents(aeEventLoop *eventLoop, int flags);//处理所有事件
    int aeWait(int fd, int mask, long long milliseconds);//等待
    void aeMain(aeEventLoop *eventLoop);//循环等待处理aeProcessEvents函数
    char *aeGetApiName(void);//返回 "select"
    void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);//设置休眠之前的处理函数
    void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);//设置休眠之后的处理函数
    int aeGetSetSize(aeEventLoop *eventLoop);//get setsize
    int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);//reallcoc fileevent set
    
    • 重要API解析

      1. aeMain是redis服务器进程的循环函数
      void aeMain(aeEventLoop *eventLoop) {
          eventLoop->stop = 0;
          while (!eventLoop->stop) {//死循环
              if (eventLoop->beforesleep != NULL)
                  eventLoop->beforesleep(eventLoop);
              aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
          }
      }
      
      1. aeProcessEvents
            /*处理每个正在进行的时间事件,文件事件。没有特殊标志的函数休眠直到一些文件事件被触发,或者下一个时间事件发生
             *flags=0 啥也不做
             *flags=AE_ALL_EVENTS 处理所有事件
             *flags=AE_FILE_EVENTS 处理所有文件事件
             *flags=AE_TIME_EVENTS 处理所有时间事件
             *flags=AE_DONT_WAIT  返回ASAP 直到所有不需要被等待的可能被处理的事件都被处理
             *flags=AE_CALL_AFTER_SLEEP 调用休眠后函数*/
            int aeProcessEvents(aeEventLoop *eventLoop, int flags)
            {
                int processed = 0, numevents;
                /*如果不是时间事件也不是文件事件返回ASAP */
                if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
                //如果maxfd!=-1表示有事件 或者 需要处理时间事件且不处理不等待事件
                if (eventLoop->maxfd != -1 ||
                    ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
                    int j;aeTimeEvent *shortest = NULL;
                    struct timeval tv, *tvp;
                    //如果flags是指需要处理时间事件且不处理不等待事件,寻找时间参数最小的时间事件
                    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
                        shortest = aeSearchNearestTimer(eventLoop);
                    if (shortest) {//只要存在时间事件 就会走到这步
                        long now_sec, now_ms;
                        aeGetTime(&now_sec, &now_ms);//得到现在的时间
                        tvp = &tv;
                        /* 下次事件触发需要多少微秒 */
                        long long ms=(shortest->when_sec - now_sec)*1000 +
                            shortest->when_ms - now_ms;
                        if (ms > 0) {
                            tvp->tv_sec = ms/1000;
                            tvp->tv_usec = (ms % 1000)*1000;
                        } else {
                            tvp->tv_sec = 0;
                            tvp->tv_usec = 0;
                        }
                    } else {
                        if (flags & AE_DONT_WAIT) {
                            tv.tv_sec = tv.tv_usec = 0;
                            tvp = &tv;
                        } else {
                            /* Otherwise we can block */
                            tvp = NULL; /* wait forever */
                        }
                    }
                    numevents = aeApiPoll(eventLoop, tvp);//多路复用API 可读 可写结果都出来了
                    /* 调用休眠之后的回调函数*/
                    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
                        eventLoop->aftersleep(eventLoop);
                    for (j = 0; j < numevents; j++) {
                        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                        int mask = eventLoop->fired[j].mask;
                        int fd = eventLoop->fired[j].fd;
                        int fired = 0; /* 当前fd触发的文件事件的个数 */
                        /*
                         * 设置AE_BARRIER      读-写
                         * 未设置AE_BARRIER    写-读
                         * */
                        int invert = fe->mask & AE_BARRIER;
                        //如果没有设置AE_BARRIER 且可读 处理读事件
                        if (!invert && fe->mask & mask & AE_READABLE) {
                            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                            fired++;
                        }
                        //处理写
                        if (fe->mask & mask & AE_WRITABLE) {
                            if (!fired || fe->wfileProc != fe->rfileProc) {
                                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                                fired++;
                            }
                        }
                        //设置了AE_BARRIER 最后处理读
                        if (invert && fe->mask & mask & AE_READABLE) {
                            if (!fired || fe->wfileProc != fe->rfileProc) {
                                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                                fired++;
                            }
                        }
                        processed++;
                    }
                }
                /* 如果需要处理时间事件*/
                if (flags & AE_TIME_EVENTS)
                    processed += processTimeEvents(eventLoop);//处理函数,返回被处理的时间事件的个数
                return processed; /* return the number of processed file/time events */
            }
    
    3. processTimeEvents
    
            /* 处理时间事件*/
            static int processTimeEvents(aeEventLoop *eventLoop) {
                int processed = 0;
                aeTimeEvent *te;
                long long maxId;
                time_t now = time(NULL);
                if (now < eventLoop->lastTime) {//防止系统时间被往前调了,重新设置时间数据
                    te = eventLoop->timeEventHead;
                    while(te) {
                        te->when_sec = 0;
                        te = te->next;
                    }
                }
                eventLoop->lastTime = now;//最近一次被处理时间
                te = eventLoop->timeEventHead;
                maxId = eventLoop->timeEventNextId-1;
                while(te) { //一个个的处理
                    long now_sec, now_ms;
                    long long id;
                    /* id==-1的话 是要被删除的 */
                    if (te->id == AE_DELETED_EVENT_ID) {
                        aeTimeEvent *next = te->next;
                        if (te->prev)
                            te->prev->next = te->next;
                        else
                            eventLoop->timeEventHead = te->next;
                        if (te->next)
                            te->next->prev = te->prev;
                        if (te->finalizerProc)
                            te->finalizerProc(eventLoop, te->clientData);
                        zfree(te);
                        te = next;
                        continue;
                    }
                    //再这次循环中我们不处理刚触发的时间事件
                    if (te->id > maxId) {
                        te = te->next;
                        continue;
                    }
                    aeGetTime(&now_sec, &now_ms);
                    //现在时间大于预设的时间
                    if (now_sec > te->when_sec ||
                        (now_sec == te->when_sec && now_ms >= te->when_ms))
                    {
                        int retval;
                        id = te->id;
                        retval = te->timeProc(eventLoop, id, te->clientData);//时间处理函数
                        processed++;
                        if (retval != AE_NOMORE) {//周期事件
                            aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                        } else {//定时事件 处理完就删除
                                            te->id = AE_DELETED_EVENT_ID;
                        }
                    }
                    te = te->next;
                }
                return processed;
            }
    

    四、参考

    相关文章

      网友评论

        本文标题:Redis源码学习之事件

        本文链接:https://www.haomeiwen.com/subject/vkpnaqtx.html