[TOC]
kqueue 是 FreeBSD 上的一种的多路复用机制。它是针对传统的 select/poll 处理大量的文件描述符性能较低效而开发出来的。注册一批描述符到 kqueue 以后,当其中的描述符状态发生变化时,kqueue 将一次性通知应用程序哪些描述符可读、可写或出错了。
kqueue知识点说明
kqueue体系只有三样东西:struct kevent结构体,EV_SET宏以及kevent函数。即 kqueue()、kevent() 两个系统调用和 struct kevent 结构。
kevent
struct kevent {
uintptr_t ident; /* 事件 ID */
short filter; /* 事件过滤器 */
/*
添加到kqueue中:EV_ADD, 从kqueue中删除:EV_DELETE, 这两种是主要的行为
一次性事件:EV_ONESHOT, 此事件是或操作, 指定了该事件, kevent()返回后, 事件会从kqueue中删除
更新事件: EV_CLEAR,此事件是或操作, 手册上的解释是,当事件通知给用户后,事件的状态会被重置。可以用在类似 于epoll的ET模式,也可以用在描述符有时会出错的情况。
其他事件: EOF事件:EV_EOF, 错误事件:EV_ERROR(返回值)
*/
u_short flags; /* 行为标识 */
u_int fflags; /* 过滤器标识值 */
intptr_t data; /* 过滤器数据 */
void *udata; /* 应用透传数据 */
};
在一个 kqueue 中,{ident, filter} 确定一个唯一的事件。
kevent函数
/* kevent 提供向内核注册反注册事件和返回就绪事件或错误事件:
kq: kqueue 的文件描述符。
changelist: 要注册 / 反注册的事件数组;
nchanges: changelist 的元素个数。
eventlist: 满足条件的通知事件数组;
nevents: eventlist 的元素个数。
timeout: 等待事件到来时的超时时间,0,立刻返回;NULL,一直等待;有一个具体值,等待 timespec 时间值。 返回值:可用事件的个数。*/
int kevent(int kq, const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);
kqueue的参考文档
kqueue.c
struct kqop {
struct kevent *changes;//有事件发生的队列
int nchanges;//发生事情的个数。这样可以遍历事件队列
struct kevent *events;//监控队列
int nevents;//监控队列长度
int kq;// kq队列
} kqop;
所有的方法
void *kq_init (void);
int kq_add (void *, struct event *);
int kq_del (void *, struct event *);
int kq_recalc (void *, int);
int kq_dispatch (void *, struct timeval *);
将该类注册为eventop的一个实例。
struct eventop kqops = {
"kqueue",
kq_init,
kq_add,
kq_del,
kq_recalc,
kq_dispatch
};
kq_init
- 先检查环节变量,是否设置为kqueue可用。
- 初始化一个kqueue的队列。
- 初始化kevent的事件队列。
void *
kq_init(void)
{
int kq;
/* Disable kqueue when this environment variable is set */
if (getenv("EVENT_NOKQUEUE"))
return (NULL);
memset(&kqop, 0, sizeof(kqop));
/* Initalize the kernel queue */
if ((kq = kqueue()) == -1) {
log_error("kqueue");
return (NULL);
}
kqop.kq = kq;
/* Initalize fields */
kqop.changes = malloc(NEVENT * sizeof(struct kevent));
if (kqop.changes == NULL)
return (NULL);
kqop.events = malloc(NEVENT * sizeof(struct kevent));
if (kqop.events == NULL) {
free (kqop.changes);
return (NULL);
}
kqop.nevents = NEVENT;
return (&kqop);
}
kq_recal
这个类,不像select。发生一次事件后,需要重新注册事件。所以直接返回0。
int
kq_recalc(void *arg, int max)
{
return (0);
}
kq_add
int
kq_add(void *arg, struct event *ev)
{
struct kqop *kqop = arg;//获取this指针。
struct kevent kev;//待添加事件
//检查是否为信号事件。
if (ev->ev_events & EV_SIGNAL) {
int nsignal = EVENT_SIGNAL(ev);//获取句柄
memset(&kev, 0, sizeof(kev));
kev.ident = nsignal;//identifier for this event
kev.filter = EVFILT_SIGNAL;// 有很多,EVFILT_READ,EVFILT_WRITE,EVFILT_TIMER等
kev.flags = EV_ADD;//指定事件操作类型,比如EV_ADD,EV_ENABLE, EV_DELETE等
if (!(ev->ev_events & EV_PERSIST))
kev.flags |= EV_ONESHOT;
kev.udata = ev;//将ev当做参数了。
if (kq_insert(kqop, &kev) == -1)
return (-1);
//注册信号行为。该信号函数是个空事件。可能留着以后用吧。
if (signal(nsignal, kq_sighandler) == SIG_ERR)
return (-1);
ev->ev_flags |= EVLIST_X_KQINKERNEL;//给libevent的event的ev_flags添加
return (0);
}
if (ev->ev_events & EV_READ) {
memset(&kev, 0, sizeof(kev));
kev.ident = ev->ev_fd;
kev.filter = EVFILT_READ;
kev.flags = EV_ADD;
if (!(ev->ev_events & EV_PERSIST))
kev.flags |= EV_ONESHOT;
kev.udata = ev;
if (kq_insert(kqop, &kev) == -1)
return (-1);
ev->ev_flags |= EVLIST_X_KQINKERNEL;
}
if (ev->ev_events & EV_WRITE) {
memset(&kev, 0, sizeof(kev));
kev.ident = ev->ev_fd;
kev.filter = EVFILT_WRITE;
kev.flags = EV_ADD;
if (!(ev->ev_events & EV_PERSIST))
kev.flags |= EV_ONESHOT;
kev.udata = ev;
if (kq_insert(kqop, &kev) == -1)
return (-1);
ev->ev_flags |= EVLIST_X_KQINKERNEL;
}
return (0);
}
kq_insert
int
kq_insert(struct kqop *kqop, struct kevent *kev)
{
int nevents = kqop->nevents;
if (kqop->nchanges == nevents) {
struct kevent *newchange;
struct kevent *newresult;
//有事件过来,翻倍增加。
nevents *= 2;
newchange = realloc(kqop->changes,
nevents * sizeof(struct kevent));
if (newchange == NULL) {
log_error(__FUNCTION__": malloc");
return (-1);
}
kqop->changes = newchange;
//申请两个kevent。kqueue。一个kevent队列用来注册。一个kevent用来报告那些事件发生了。
/* 觉得还是奇怪。不担心申请到同一片吗?*/
newresult = realloc(kqop->changes,
nevents * sizeof(struct kevent));
/*
* If we fail, we don't have to worry about freeing,
* the next realloc will pick it up.
*/
if (newresult == NULL) {
log_error(__FUNCTION__": malloc");
return (-1);
}
kqop->events = newchange;
kqop->nevents = nevents;
}
/* 压数据到最后*/
memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent));
LOG_DBG((LOG_MISC, 70, __FUNCTION__": fd %d %s%s",
kev->ident,
kev->filter == EVFILT_READ ? "EVFILT_READ" : "EVFILT_WRITE",
kev->flags == EV_DELETE ? " (del)" : ""));
return (0);
}
kq_del
int
kq_del(void *arg, struct event *ev)
{
struct kqop *kqop = arg;
struct kevent kev;
//检查是不是kq的事件
if (!(ev->ev_flags & EVLIST_X_KQINKERNEL))
return (0);
if (ev->ev_events & EV_SIGNAL) {
int nsignal = EVENT_SIGNAL(ev);
memset(&kev, 0, sizeof(kev));
kev.ident = (int)signal;
kev.filter = EVFILT_SIGNAL;
kev.flags = EV_DELETE;//删除事件。
//删除事件,也需要传入kqueue,等待内核删除。
if (kq_insert(kqop, &kev) == -1)
return (-1);
//删除信号事件
if (signal(nsignal, SIG_DFL) == SIG_ERR)
return (-1);
//将信号位置0.
ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
return (0);
}
if (ev->ev_events & EV_READ) {
memset(&kev, 0, sizeof(kev));
kev.ident = ev->ev_fd;
kev.filter = EVFILT_READ;
kev.flags = EV_DELETE;
if (kq_insert(kqop, &kev) == -1)
return (-1);
ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
}
if (ev->ev_events & EV_WRITE) {
memset(&kev, 0, sizeof(kev));
kev.ident = ev->ev_fd;
kev.filter = EVFILT_WRITE;
kev.flags = EV_DELETE;
if (kq_insert(kqop, &kev) == -1)
return (-1);
ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
}
return (0);
}
kq_dispatch
最重要的事件处理函数。
int
kq_dispatch(void *arg, struct timeval *tv)
{
struct kqop *kqop = arg;
struct kevent *changes = kqop->changes;
struct kevent *events = kqop->events;
struct event *ev;
struct timespec ts;
int i, res;
TIMEVAL_TO_TIMESPEC(tv, &ts);//将毫秒级的时间,转为纳秒级
res = kevent(kqop->kq, changes, kqop->nchanges,
events, kqop->nevents, &ts);
kqop->nchanges = 0;//这边把事件数目置0了。 todo
if (res == -1) {
if (errno != EINTR) {
log_error("kevent");
return (-1);
}
return (0);
}
LOG_DBG((LOG_MISC, 80, __FUNCTION__": kevent reports %d", res));
//循环处理每个事件。根据flags来判断是发生了什么事件。
for (i = 0; i < res; i++) {
int which = 0;
if (events[i].flags & EV_ERROR) {
/*
* Error messages that can happen, when a delete fails.
* EBADF happens when the file discriptor has been
* closed,
* ENOENT when the file discriptor was closed and
* then reopened.
* An error is also indicated when a callback deletes
* an event we are still processing. In that case
* the data field is set to ENOENT.
*/
if (events[i].data == EBADF ||
events[i].data == ENOENT)
continue;
return (-1);
}
//前面add的时候,这边传的是ev
ev = events[i].udata;
//如果是EV_READ EV_WRITE EV_SIGNAL事件,就会跳过。
if (events[i].filter == EVFILT_READ) {
which |= EV_READ;
} else if (events[i].filter == EVFILT_WRITE) {
which |= EV_WRITE;
} else if (events[i].filter == EVFILT_SIGNAL) {
which |= EV_SIGNAL;
} else
events[i].filter = 0;
if (!which)
continue;
/*只处理time事件和EV_PERSIST持久事件。
将该事件添加到活动队列中。非信号事件只触发一次。
并将该事件的ev_flags置为EVLIST_ACTIVE
*/
event_active(ev, which,
ev->ev_events & EV_SIGNAL ? events[i].data : 1);
}
for (i = 0; i < res; i++) {
/* XXX */
int ncalls, evres;
if (events[i].flags & EV_ERROR || events[i].filter == NULL)
continue;
ev = events[i].udata;
if (ev->ev_events & EV_PERSIST)
continue;
ncalls = 0;
if (ev->ev_flags & EVLIST_ACTIVE) {
ncalls = ev->ev_ncalls;
evres = ev->ev_res;
}
ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
//非持久事件就删掉了。
event_del(ev);
/*
激活事件
*/
if (ncalls)
event_active(ev, evres, ncalls);
}
return (0);
}
收获
- 看完了整体的代码。感觉还是很神奇的。由不懂到懂经历了好长的时间。
- 代码上面没有感觉到什么特别之处。
网友评论