美文网首页
kqueue in libevent

kqueue in libevent

作者: 混世太保 | 来源:发表于2020-03-30 21:36 被阅读0次

    [TOC]
    kqueue 是 FreeBSD 上的一种的多路复用机制。它是针对传统的 select/poll 处理大量的文件描述符性能较低效而开发出来的。注册一批描述符到 kqueue 以后,当其中的描述符状态发生变化时,kqueue 将一次性通知应用程序哪些描述符可读、可写或出错了。

    kqueue知识点说明

    kqueue体系只有三样东西:struct kevent结构体,EV_SET宏以及kevent函数。即 kqueue()、kevent() 两个系统调用和 struct kevent 结构。

    kevent

    struct kevent { 
       uintptr_t ident;    /* 事件 ID */ 
       short   filter;    /* 事件过滤器 */ 
        /*
        添加到kqueue中:EV_ADD, 从kqueue中删除:EV_DELETE, 这两种是主要的行为
        一次性事件:EV_ONESHOT, 此事件是或操作, 指定了该事件, kevent()返回后, 事件会从kqueue中删除
        更新事件: EV_CLEAR,此事件是或操作, 手册上的解释是,当事件通知给用户后,事件的状态会被重置。可以用在类似      于epoll的ET模式,也可以用在描述符有时会出错的情况。
        其他事件: EOF事件:EV_EOF, 错误事件:EV_ERROR(返回值)
        */
       u_short  flags;    /* 行为标识 */ 
        
       u_int   fflags;    /* 过滤器标识值 */ 
       intptr_t data;     /* 过滤器数据 */ 
       void   *udata;    /* 应用透传数据 */ 
     }; 
    在一个 kqueue 中,{ident, filter} 确定一个唯一的事件。
    

    kevent函数

    /* kevent 提供向内核注册反注册事件和返回就绪事件或错误事件: 
    kq: kqueue 的文件描述符。 
    changelist: 要注册 / 反注册的事件数组; 
    nchanges: changelist 的元素个数。 
    eventlist: 满足条件的通知事件数组; 
    nevents: eventlist 的元素个数。 
    timeout: 等待事件到来时的超时时间,0,立刻返回;NULL,一直等待;有一个具体值,等待 timespec 时间值。 返回值:可用事件的个数。*/
    int kevent(int kq, const struct kevent *changelist, int nchanges, 
            struct kevent *eventlist, int nevents, 
            const struct timespec *timeout);
    
    kqueue的参考文档

    APUE:KQUEUE / FreeBSD

    IBM kqueue

    kqueue.c

    struct kqop {
        struct kevent *changes;//有事件发生的队列
        int nchanges;//发生事情的个数。这样可以遍历事件队列
        struct kevent *events;//监控队列
        int nevents;//监控队列长度
        int kq;// kq队列
    } kqop;
    

    所有的方法

    void *kq_init   (void);
    int kq_add  (void *, struct event *);
    int kq_del  (void *, struct event *);
    int kq_recalc   (void *, int);
    int kq_dispatch (void *, struct timeval *); 
    

    将该类注册为eventop的一个实例。

    struct eventop kqops = {
        "kqueue",
        kq_init,
        kq_add,
        kq_del,
        kq_recalc,
        kq_dispatch
    };
    

    kq_init

    1. 先检查环节变量,是否设置为kqueue可用。
    2. 初始化一个kqueue的队列。
    3. 初始化kevent的事件队列。
    void *
    kq_init(void)
    {
        int kq;
    
        /* Disable kqueue when this environment variable is set */
        if (getenv("EVENT_NOKQUEUE"))
            return (NULL);
    
        memset(&kqop, 0, sizeof(kqop));
    
        /* Initalize the kernel queue */
        
        if ((kq = kqueue()) == -1) {
            log_error("kqueue");
            return (NULL);
        }
    
        kqop.kq = kq;
    
        /* Initalize fields */
        kqop.changes = malloc(NEVENT * sizeof(struct kevent));
        if (kqop.changes == NULL)
            return (NULL);
        kqop.events = malloc(NEVENT * sizeof(struct kevent));
        if (kqop.events == NULL) {
            free (kqop.changes);
            return (NULL);
        }
        kqop.nevents = NEVENT;
    
        return (&kqop);
    }
    

    kq_recal

    这个类,不像select。发生一次事件后,需要重新注册事件。所以直接返回0。

    int
    kq_recalc(void *arg, int max)
    {
        return (0);
    }
    

    kq_add

    int
    kq_add(void *arg, struct event *ev)
    {
        struct kqop *kqop = arg;//获取this指针。
        struct kevent kev;//待添加事件
    
        //检查是否为信号事件。
        if (ev->ev_events & EV_SIGNAL) {
            int nsignal = EVENT_SIGNAL(ev);//获取句柄
    
            memset(&kev, 0, sizeof(kev));
            kev.ident = nsignal;//identifier for this event
            kev.filter = EVFILT_SIGNAL;// 有很多,EVFILT_READ,EVFILT_WRITE,EVFILT_TIMER等
            kev.flags = EV_ADD;//指定事件操作类型,比如EV_ADD,EV_ENABLE, EV_DELETE等 
            if (!(ev->ev_events & EV_PERSIST))
                kev.flags |= EV_ONESHOT;
            kev.udata = ev;//将ev当做参数了。
            
            if (kq_insert(kqop, &kev) == -1)
                return (-1);
            //注册信号行为。该信号函数是个空事件。可能留着以后用吧。
            if (signal(nsignal, kq_sighandler) == SIG_ERR)
                return (-1);
    
            ev->ev_flags |= EVLIST_X_KQINKERNEL;//给libevent的event的ev_flags添加
            return (0);
        }
    
        if (ev->ev_events & EV_READ) {
            memset(&kev, 0, sizeof(kev));
            kev.ident = ev->ev_fd;
            kev.filter = EVFILT_READ;
            kev.flags = EV_ADD;
            if (!(ev->ev_events & EV_PERSIST))
                kev.flags |= EV_ONESHOT;
            kev.udata = ev;
            
            if (kq_insert(kqop, &kev) == -1)
                return (-1);
    
            ev->ev_flags |= EVLIST_X_KQINKERNEL;
        }
    
        if (ev->ev_events & EV_WRITE) {
            memset(&kev, 0, sizeof(kev));
            kev.ident = ev->ev_fd;
            kev.filter = EVFILT_WRITE;
            kev.flags = EV_ADD;
            if (!(ev->ev_events & EV_PERSIST))
                kev.flags |= EV_ONESHOT;
            kev.udata = ev;
            
            if (kq_insert(kqop, &kev) == -1)
                return (-1);
    
            ev->ev_flags |= EVLIST_X_KQINKERNEL;
        }
    
        return (0);
    }
    

    kq_insert

    int
    kq_insert(struct kqop *kqop, struct kevent *kev)
    {
        int nevents = kqop->nevents;
    
        if (kqop->nchanges == nevents) {
            struct kevent *newchange;
            struct kevent *newresult;
            //有事件过来,翻倍增加。
            nevents *= 2;
    
            newchange = realloc(kqop->changes,
                        nevents * sizeof(struct kevent));
            if (newchange == NULL) {
                log_error(__FUNCTION__": malloc");
                return (-1);
            }
            kqop->changes = newchange;
            //申请两个kevent。kqueue。一个kevent队列用来注册。一个kevent用来报告那些事件发生了。
            /* 觉得还是奇怪。不担心申请到同一片吗?*/
            newresult = realloc(kqop->changes,
                        nevents * sizeof(struct kevent));
    
            /*
             * If we fail, we don't have to worry about freeing,
             * the next realloc will pick it up.
             */
            if (newresult == NULL) {
                log_error(__FUNCTION__": malloc");
                return (-1);
            }
            kqop->events = newchange;
    
            kqop->nevents = nevents;
        }
        /* 压数据到最后*/
        memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent));
    
        LOG_DBG((LOG_MISC, 70, __FUNCTION__": fd %d %s%s",
             kev->ident, 
             kev->filter == EVFILT_READ ? "EVFILT_READ" : "EVFILT_WRITE",
             kev->flags == EV_DELETE ? " (del)" : ""));
    
        return (0);
    }
    

    kq_del

    int
    kq_del(void *arg, struct event *ev)
    {
        struct kqop *kqop = arg;
        struct kevent kev;
        //检查是不是kq的事件
        if (!(ev->ev_flags & EVLIST_X_KQINKERNEL))
            return (0);
    
        if (ev->ev_events & EV_SIGNAL) {
            int nsignal = EVENT_SIGNAL(ev);
    
            memset(&kev, 0, sizeof(kev));
            kev.ident = (int)signal;
            kev.filter = EVFILT_SIGNAL;
            kev.flags = EV_DELETE;//删除事件。
            //删除事件,也需要传入kqueue,等待内核删除。
            if (kq_insert(kqop, &kev) == -1)
                return (-1);
            //删除信号事件
            if (signal(nsignal, SIG_DFL) == SIG_ERR)
                return (-1);
            //将信号位置0.
            ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
            return (0);
        }
    
        if (ev->ev_events & EV_READ) {
            memset(&kev, 0, sizeof(kev));
            kev.ident = ev->ev_fd;
            kev.filter = EVFILT_READ;
            kev.flags = EV_DELETE;
            
            if (kq_insert(kqop, &kev) == -1)
                return (-1);
    
            ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
        }
    
        if (ev->ev_events & EV_WRITE) {
            memset(&kev, 0, sizeof(kev));
            kev.ident = ev->ev_fd;
            kev.filter = EVFILT_WRITE;
            kev.flags = EV_DELETE;
            
            if (kq_insert(kqop, &kev) == -1)
                return (-1);
    
            ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
        }
    
        return (0);
    }
    
    

    kq_dispatch

    最重要的事件处理函数。

    int
    kq_dispatch(void *arg, struct timeval *tv)
    {
        struct kqop *kqop = arg;
        struct kevent *changes = kqop->changes;
        struct kevent *events = kqop->events;
        struct event *ev;
        struct timespec ts;
        int i, res;
    
        TIMEVAL_TO_TIMESPEC(tv, &ts);//将毫秒级的时间,转为纳秒级
    
        res = kevent(kqop->kq, changes, kqop->nchanges,
                 events, kqop->nevents, &ts);
        kqop->nchanges = 0;//这边把事件数目置0了。 todo
        if (res == -1) {
            if (errno != EINTR) {
                log_error("kevent");
                return (-1);
            }
    
            return (0);
        }
    
        LOG_DBG((LOG_MISC, 80, __FUNCTION__": kevent reports %d", res));
    
        //循环处理每个事件。根据flags来判断是发生了什么事件。
        for (i = 0; i < res; i++) {
            int which = 0;
    
            if (events[i].flags & EV_ERROR) {
                /* 
                 * Error messages that can happen, when a delete fails.
                 *   EBADF happens when the file discriptor has been
                 *   closed,
                 *   ENOENT when the file discriptor was closed and
                 *   then reopened.
                 * An error is also indicated when a callback deletes
                 * an event we are still processing.  In that case
                 * the data field is set to ENOENT.
                 */
                if (events[i].data == EBADF ||
                    events[i].data == ENOENT)
                    continue;
                return (-1);
            }
            //前面add的时候,这边传的是ev
            ev = events[i].udata;
            //如果是EV_READ  EV_WRITE   EV_SIGNAL事件,就会跳过。
            if (events[i].filter == EVFILT_READ) {
                which |= EV_READ;
            } else if (events[i].filter == EVFILT_WRITE) {
                which |= EV_WRITE;
            } else if (events[i].filter == EVFILT_SIGNAL) {
                which |= EV_SIGNAL;
            } else
                events[i].filter = 0;
    
            if (!which)
                continue;
            /*只处理time事件和EV_PERSIST持久事件。
            将该事件添加到活动队列中。非信号事件只触发一次。
            并将该事件的ev_flags置为EVLIST_ACTIVE
            */ 
            event_active(ev, which,
                ev->ev_events & EV_SIGNAL ? events[i].data : 1);
        }
    
        for (i = 0; i < res; i++) {
            /* XXX */
            int ncalls, evres;
    
            if (events[i].flags & EV_ERROR || events[i].filter == NULL)
                continue;
    
            ev = events[i].udata;
            if (ev->ev_events & EV_PERSIST)
                continue;
    
            ncalls = 0;
            if (ev->ev_flags & EVLIST_ACTIVE) {
                ncalls = ev->ev_ncalls;
                evres = ev->ev_res;
            }
            ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
            //非持久事件就删掉了。
            event_del(ev);
            /* 
            激活事件
            */ 
            if (ncalls)
                event_active(ev, evres, ncalls);
        }
    
        return (0);
    }
    

    收获

    1. 看完了整体的代码。感觉还是很神奇的。由不懂到懂经历了好长的时间。
    2. 代码上面没有感觉到什么特别之处。

    相关文章

      网友评论

          本文标题:kqueue in libevent

          本文链接:https://www.haomeiwen.com/subject/xolbuhtx.html