Analysis of the poll and epoll Kernel Source Code on Linux

Author: linux大本营 | Published 2020-08-24 17:06

    How to use poll and epoll should need no further introduction. When there are many fds, epoll is more efficient than poll. Let's walk through the kernel source to see exactly why.

    Analyzing poll

    The poll system call:

    int poll(struct pollfd *fds, nfds_t nfds, int timeout);

    The corresponding implementation is:

    [fs/select.c -->sys_poll]

    asmlinkage long sys_poll(struct pollfd __user *ufds, unsigned int nfds, long timeout)
    {
        struct poll_wqueues table;
        int fdcount, err;
        unsigned int i;
        struct poll_list *head;
        struct poll_list *walk;

        /* Do a sanity check on nfds ... */
        /* The nfds supplied by the user may not exceed the maximum number of
           fds a struct file can support (256 by default). */
        if (nfds > current->files->max_fdset && nfds > OPEN_MAX)
            return -EINVAL;

        if (timeout) {
            /* Careful about overflow in the intermediate values */
            if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ)
                timeout = (unsigned long)(timeout*HZ+999)/1000+1;
            else /* Negative or overflow */
                timeout = MAX_SCHEDULE_TIMEOUT;
        }

        poll_initwait(&table);

    The call to poll_initwait is the key step here. As the name suggests, it initializes the variable table; note that table is a crucial variable throughout the execution of poll. A struct poll_table, meanwhile, contains nothing but a function pointer:

    [include/linux/poll.h]

    /*
     * structures and helpers for f_op->poll implementations
     */
    typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *,
                                    struct poll_table_struct *);

    typedef struct poll_table_struct {
        poll_queue_proc qproc;
    } poll_table;

    Now let's see what poll_initwait actually does:

    [fs/select.c]

    void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p);

    void poll_initwait(struct poll_wqueues *pwq)
    {
        pwq->pt.qproc = __pollwait;  /* "expanded" here for readability; the source
                                        uses init_poll_funcptr(&pwq->pt, __pollwait) */
        pwq->error = 0;
        pwq->table = NULL;
    }


    Clearly, the main job of poll_initwait is to set the callback inside table's poll_table member to __pollwait. This __pollwait is used not only by the poll system call but also by select; in other words, it is the kernel's "official" callback for this kind of asynchronous wait. epoll, however, does not use it; it registers a different callback to achieve its efficiency, but that comes later. Let's postpone the details of __pollwait and continue with sys_poll:

    [fs/select.c -->sys_poll]

    head = NULL;
    walk = NULL;
    i = nfds;
    err = -ENOMEM;
    while (i != 0) {
        struct poll_list *pp;
        pp = kmalloc(sizeof(struct poll_list) +
                     sizeof(struct pollfd) *
                     (i > POLLFD_PER_PAGE ? POLLFD_PER_PAGE : i),
                     GFP_KERNEL);
        if (pp == NULL)
            goto out_fds;
        pp->next = NULL;
        pp->len = (i > POLLFD_PER_PAGE ? POLLFD_PER_PAGE : i);
        if (head == NULL)
            head = pp;
        else
            walk->next = pp;
        walk = pp;
        if (copy_from_user(pp->entries, ufds + nfds-i,
                           sizeof(struct pollfd) * pp->len)) {
            err = -EFAULT;
            goto out_fds;
        }
        i -= pp->len;
    }
    fdcount = do_poll(nfds, head, &table, timeout);

    This chunk of code builds a linked list in which each node occupies one page (usually 4 KB). Each node is managed through a pointer to struct poll_list, and the many struct pollfd entries are reached through the entries member of struct poll_list. The loop above copies the user-space struct pollfd array into those entries. A typical program only polls a handful of fds, so the list usually needs just one node, i.e. a single page. But when the user passes in a large number of fds, poll must copy every struct pollfd into the kernel on every call, so parameter copying and page allocation become one performance bottleneck of the poll system call.
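
    For reference, struct poll_list and POLLFD_PER_PAGE look roughly like this in 2.6-era fs/select.c (a reconstruction for orientation, so treat the exact definitions as an approximation):

    struct poll_list {
        struct poll_list *next;    /* next page-sized chunk */
        int len;                   /* number of pollfd entries in this chunk */
        struct pollfd entries[0];  /* flexible array holding the copied pollfds */
    };

    #define POLLFD_PER_PAGE \
        ((PAGE_SIZE - sizeof(struct poll_list)) / sizeof(struct pollfd))

    Now let's follow the last statement, do_poll: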

    [fs/select.c-->sys_poll()-->do_poll()]

    static void do_pollfd(unsigned int num, struct pollfd *fdpage,
                          poll_table **pwait, int *count)
    {
        int i;

        for (i = 0; i < num; i++) {
            int fd;
            unsigned int mask;
            struct pollfd *fdp;

            mask = 0;
            fdp = fdpage + i;
            fd = fdp->fd;
            if (fd >= 0) {
                struct file *file = fget(fd);
                mask = POLLNVAL;
                if (file != NULL) {
                    mask = DEFAULT_POLLMASK;
                    if (file->f_op && file->f_op->poll)
                        mask = file->f_op->poll(file, *pwait);
                    mask &= fdp->events | POLLERR | POLLHUP;
                    fput(file);
                }
                if (mask) {
                    *pwait = NULL;
                    (*count)++;
                }
            }
            fdp->revents = mask;
        }
    }

    static int do_poll(unsigned int nfds, struct poll_list *list,
                       struct poll_wqueues *wait, long timeout)
    {
        int count = 0;
        poll_table *pt = &wait->pt;

        if (!timeout)
            pt = NULL;

        for (;;) {
            struct poll_list *walk;

            set_current_state(TASK_INTERRUPTIBLE);
            walk = list;
            while (walk != NULL) {
                do_pollfd(walk->len, walk->entries, &pt, &count);
                walk = walk->next;
            }
            pt = NULL;
            if (count || !timeout || signal_pending(current))
                break;
            count = wait->error;
            if (count)
                break;
            timeout = schedule_timeout(timeout); /* suspend current and let other
                                                    processes run; come back to
                                                    current when the timeout expires */
        }
        __set_current_state(TASK_RUNNING);
        return count;
    }

    Note set_current_state and signal_pending: together they guarantee that when the program is suspended inside poll, a signal can make it leave the poll call quickly, which would not be possible if the task slept in the uninterruptible state.
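
    The sleeping idiom used here is the standard kernel pattern; distilled into a minimal sketch (the name condition is a placeholder, not from the article):

    for (;;) {
        set_current_state(TASK_INTERRUPTIBLE);   /* mark ourselves sleepable before checking */
        if (condition || signal_pending(current))
            break;                               /* the event happened or a signal arrived */
        timeout = schedule_timeout(timeout);     /* really sleep; a wake_up() or the timer
                                                    brings us back here */
    }
    __set_current_state(TASK_RUNNING);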

    Looking at do_poll as a whole, it mostly waits inside the loop, breaking out only when count becomes positive, and count is driven by do_pollfd. Note this piece of code:

    while (walk != NULL) {
        do_pollfd(walk->len, walk->entries, &pt, &count);
        walk = walk->next;
    }

    When the user passes in many fds (say 1000), do_pollfd is called many times on every iteration; this is the other source of poll's efficiency bottleneck. do_pollfd simply calls, for each fd passed in, that fd's own poll method. Simplified, the call looks like this:

    struct file *file = fget(fd);
    file->f_op->poll(file, &(table->pt));

    If the fd is a socket, do_pollfd ends up calling the poll implemented by the network stack; if the fd is an open file on an ext3 filesystem, it ends up in the poll implemented by the ext3 driver. In short, file->f_op->poll is implemented by the device driver. So what does a driver's poll usually look like? The standard implementation is: call poll_wait with the device's own wait queue as an argument (a device normally has its own wait queue; a device without one could not support asynchronous operation at all), which in turn invokes the callback stored in struct poll_table. As a representative driver, here is the socket code for TCP:

    [net/ipv4/tcp.c-->tcp_poll]

    unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
    {
        unsigned int mask;
        struct sock *sk = sock->sk;
        struct tcp_opt *tp = tcp_sk(sk);

        poll_wait(file, sk->sk_sleep, wait);

    That is all the code we need to look at; the rest just checks state and builds the return mask. The core of tcp_poll is poll_wait, and poll_wait simply invokes the callback stored in the struct poll_table. For the poll system call that callback is __pollwait, so tcp_poll here can be understood as essentially one statement:

    __pollwait(file, sk->sk_sleep, wait);

    This also shows that every socket carries its own wait queue, sk_sleep, so the "device wait queue" mentioned above is not a single global queue. Now let's look at the implementation of __pollwait:

    [fs/select.c-->__pollwait()]

    void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
    {
        struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
        struct poll_table_page *table = p->table;

        if (!table || POLL_TABLE_FULL(table)) {
            struct poll_table_page *new_table;

            new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
            if (!new_table) {
                p->error = -ENOMEM;
                __set_current_state(TASK_RUNNING);
                return;
            }
            new_table->entry = new_table->entries;
            new_table->next = table;
            p->table = new_table;
            table = new_table;
        }

        /* Add a new entry */
        {
            struct poll_table_entry *entry = table->entry;
            table->entry = entry + 1;
            get_file(filp);
            entry->filp = filp;
            entry->wait_address = wait_address;
            init_waitqueue_entry(&entry->wait, current);
            add_wait_queue(wait_address, &entry->wait);
        }
    }

    __pollwait builds the bookkeeping structure sketched just below (one __pollwait call, that is, one device poll, creates exactly one poll_table_entry) and, through the wait member of struct poll_table_entry, hangs current on the device's wait queue. That wait queue is wait_address, which for tcp_poll is sk->sk_sleep. We can now recap how the poll system call works: register the callback __pollwait, initialize the table variable (of type struct poll_wqueues), copy in the user's struct pollfd array (mainly the fds), and then call the poll of every fd in turn (hanging current on each fd's device wait queue). When a device receives a message (network device) or finishes filling in file data (block device), it wakes up the processes on its wait queue, and with them current. What current does after waking, on its way out of sys_poll, is comparatively simple and won't be traced line by line here.
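
    For orientation (the article's figure is not reproduced), here is roughly what those structures look like in 2.6-era fs/select.c and include/linux/poll.h, reconstructed from the code above, so field order and details may differ slightly from any particular kernel version:

    struct poll_table_entry {
        struct file *filp;                 /* pinned file (get_file in __pollwait) */
        wait_queue_t wait;                 /* entry that links current onto the device queue */
        wait_queue_head_t *wait_address;   /* the device wait queue it was added to */
    };

    struct poll_table_page {
        struct poll_table_page *next;      /* next page of entries */
        struct poll_table_entry *entry;    /* first free slot in this page */
        struct poll_table_entry entries[0];
    };

    struct poll_wqueues {
        poll_table pt;                     /* holds the qproc callback (__pollwait) */
        struct poll_table_page *table;     /* pages of poll_table_entry */
        int error;
    };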

    epoll

    The analysis above exposed poll's two bottlenecks; the question now is how to fix them. First, copying a thousand fds into the kernel on every poll call is wasteful; why doesn't the kernel simply remember the fds it has already been given? That is exactly what epoll does, and its API says so: fds are not passed in at epoll_wait time, they are handed to the kernel once via epoll_ctl and then "waited on" together, which eliminates the needless repeated copies. Second, at epoll_wait time current is not queued onto every fd's device wait queue in turn; instead, a callback runs when a device wait queue is woken (which of course requires a "wakeup callback" mechanism), placing the fd that produced the event onto a list, and epoll_wait simply returns the fds on that list.
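
    In user space that API difference looks roughly like this (a minimal sketch, not from the article; sockfd and handle_io() are placeholders, error handling omitted, requires <sys/epoll.h>):

    int epfd = epoll_create(1);                  /* the size hint is ignored on modern kernels */

    struct epoll_event ev = { .events = EPOLLIN, .data.fd = sockfd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); /* the fd is copied into the kernel once */

    struct epoll_event ready[64];
    for (;;) {
        int n = epoll_wait(epfd, ready, 64, -1); /* only ready fds come back */
        for (int i = 0; i < n; i++)
            handle_io(ready[i].data.fd);         /* handle_io() is a placeholder */
    }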

    Analyzing epoll

    epoll is implemented as a module, so let's start with the module entry point, eventpoll_init:

    [fs/eventpoll.c-->eventpoll_init()]

    static int __init eventpoll_init(void)
    {
        int error;

        init_MUTEX(&epsem);

        /* Initialize the structure used to perform safe poll wait head wake ups */
        ep_poll_safewake_init(&psw);

        /* Allocates slab cache used to allocate "struct epitem" items */
        epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
                                      0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC,
                                      NULL, NULL);

        /* Allocates slab cache used to allocate "struct eppoll_entry" */
        pwq_cache = kmem_cache_create("eventpoll_pwq",
                                      sizeof(struct eppoll_entry), 0,
                                      EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL);

        /*
         * Register the virtual file system that will be the source of inodes
         * for the eventpoll files
         */
        error = register_filesystem(&eventpoll_fs_type);
        if (error)
            goto epanic;

        /* Mount the above commented virtual file system */
        eventpoll_mnt = kern_mount(&eventpoll_fs_type);
        error = PTR_ERR(eventpoll_mnt);
        if (IS_ERR(eventpoll_mnt))
            goto epanic;

        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: successfully initialized.\n",
                     current));

        return 0;

    epanic:
        panic("eventpoll_init() failed\n");
    }

    Interestingly, the module registers a new filesystem at init time, called "eventpollfs" (declared in the eventpoll_fs_type structure), and then mounts it. It also creates two kernel caches (in kernel programming, when you need to allocate small objects frequently, you should create a kmem_cache as a "memory pool"), one for struct epitem and one for struct eppoll_entry. If you ever need to implement a new filesystem, this code is a good reference.
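
    As an aside, the "memory pool" idiom mentioned above looks roughly like this with the 2.6-era slab API (illustrative only; struct my_item and the cache name are made up):

    static kmem_cache_t *my_cache;

    /* one-time setup: a cache sized for struct my_item */
    my_cache = kmem_cache_create("my_item_cache", sizeof(struct my_item),
                                 0, SLAB_HWCACHE_ALIGN, NULL, NULL);

    /* fast-path allocation and release, instead of kmalloc/kfree */
    struct my_item *it = kmem_cache_alloc(my_cache, GFP_KERNEL);
    /* ... use it ... */
    kmem_cache_free(my_cache, it);

    Back to the main thread: why does epoll_create return a new fd? Because it creates a new file inside this "eventpollfs" filesystem! As follows: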

    [fs/eventpoll.c-->sys_epoll_create()]

    asmlinkage long sys_epoll_create(int size)
    {
        int error, fd;
        struct inode *inode;
        struct file *file;

        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d)\n",
                     current, size));

        /* Sanity check on the size parameter */
        error = -EINVAL;
        if (size <= 0)
            goto eexit_1;

        /*
         * Creates all the items needed to setup an eventpoll file. That is,
         * a file structure, and inode and a free file descriptor.
         */
        error = ep_getfd(&fd, &inode, &file);
        if (error)
            goto eexit_1;

        /* Setup the file internal data structure ( "struct eventpoll" ) */
        error = ep_file_init(file);
        if (error)
            goto eexit_2;

    The function is simple. Despite the "get" in its name, on the first epoll_create call ep_getfd actually creates a new inode, a new file and a new fd. ep_file_init then creates a struct eventpoll and stores it in file->private_data; note that this private_data will be used again later. At this point you may ask: why didn't the epoll developers just keep one giant in-kernel map of the epoll handles users create and return a pointer from epoll_create? That would seem more direct. But look around: how many Linux system calls return pointers? Almost none! (Note that malloc is not a system call; the brk that malloc calls is.) As the most distinguished heir of Unix, Linux follows one of Unix's great ideas: everything is a file. Input and output are files, sockets are files; "everything is a file" means programs using the OS can stay very simple, because everything reduces to file operations. (Unix never fully achieved this; Plan 9 came closer.) Using a filesystem has another advantage: epoll_create returns an fd rather than a wretched raw pointer. If a pointer is wrong there is virtually no way to tell, whereas an fd can be validated through current->files->fd_array[]. With epoll_create covered, on to epoll_ctl; the purely defensive code is omitted:

    [fs/eventpoll.c-->sys_epoll_ctl()]

    asmlinkage long
    sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
    {
        int error;
        struct file *file, *tfile;
        struct eventpoll *ep;
        struct epitem *epi;
        struct epoll_event epds;

        ....

        epi = ep_find(ep, tfile, fd);

        error = -EINVAL;
        switch (op) {
        case EPOLL_CTL_ADD:
            if (!epi) {
                epds.events |= POLLERR | POLLHUP;
                error = ep_insert(ep, &epds, tfile, fd);
            } else
                error = -EEXIST;
            break;
        case EPOLL_CTL_DEL:
            if (epi)
                error = ep_remove(ep, epi);
            else
                error = -ENOENT;
            break;
        case EPOLL_CTL_MOD:
            if (epi) {
                epds.events |= POLLERR | POLLHUP;
                error = ep_modify(ep, epi, &epds);
            } else
                error = -ENOENT;
            break;
        }

    So it first does ep_find in some large structure (never mind which one for now). If a struct epitem is found and the user's operation is ADD, return -EEXIST; if it is DEL, ep_remove it. If no struct epitem is found and the operation is ADD, ep_insert creates and inserts one. Very straightforward. So what is this "large structure"? From the way ep_find is called, the ep parameter should point to it, and ep = file->private_data tells us it is exactly the struct eventpoll created back in epoll_create. Looking further into ep_find, it works on the rbr member of struct eventpoll (a struct rb_root): the root of a red-black tree whose nodes are struct epitem. Now it is clear: a newly created epoll file carries a struct eventpoll, that structure carries a red-black tree, and that tree is where the fds passed to epoll_ctl are stored!
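
    Putting together the fields used by the quoted code, struct eventpoll and struct epitem look roughly like this in 2.6-era fs/eventpoll.c (a reconstruction for orientation, not a verbatim copy):

    struct eventpoll {
        rwlock_t lock;                 /* protects rdllist and the wait queues */
        struct rw_semaphore sem;       /* serializes against epoll_ctl/transfer */
        wait_queue_head_t wq;          /* tasks sleeping in epoll_wait (ep_poll) */
        wait_queue_head_t poll_wait;   /* used when the epoll fd itself is polled */
        struct list_head rdllist;      /* ready list: epitems with pending events */
        struct rb_root rbr;            /* red-black tree of all registered epitems */
    };

    struct epitem {
        struct rb_node rbn;            /* node in ep->rbr */
        struct list_head rdllink;      /* link into ep->rdllist when ready */
        struct epoll_filefd ffd;       /* the target file * and fd */
        struct eventpoll *ep;          /* owning eventpoll */
        struct list_head pwqlist;      /* eppoll_entry hooks on device wait queues */
        int nwait;                     /* number of hooks attached */
        struct list_head fllink;       /* link into the target file's f_ep_links */
        struct list_head txlink;       /* link into the transfer list (txlist) */
        unsigned int revents;          /* events returned for the current transfer */
        atomic_t usecnt;
        struct epoll_event event;      /* the events/data the user registered */
    };

    With the data structures clear, we can look at the core: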

    [fs/eventpoll.c-->sys_epoll_wait()]

    asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events,
                                   int maxevents, int timeout)
    {
        int error;
        struct file *file;
        struct eventpoll *ep;

        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d)\n",
                     current, epfd, events, maxevents, timeout));

        /* The maximum number of event must be greater than zero */
        if (maxevents <= 0)
            return -EINVAL;

        /* Verify that the area passed by the user is writeable */
        if ((error = verify_area(VERIFY_WRITE, events,
                                 maxevents * sizeof(struct epoll_event))))
            goto eexit_1;

        /* Get the "struct file *" for the eventpoll file */
        error = -EBADF;
        file = fget(epfd);
        if (!file)
            goto eexit_1;

        /*
         * We have to check that the file structure underneath the fd
         * the user passed to us _is_ an eventpoll file.
         */
        error = -EINVAL;
        if (!IS_FILE_EPOLL(file))
            goto eexit_2;

        /*
         * At this point it is safe to assume that the "private_data" contains
         * our own data structure.
         */
        ep = file->private_data;

        /* Time to fish for events ... */
        error = ep_poll(ep, events, maxevents, timeout);

    eexit_2:
        fput(file);
    eexit_1:
        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d) = %d\n",
                     current, epfd, events, maxevents, timeout, error));

        return error;
    }

    The same trick again: fetch the struct eventpoll from file->private_data, then call ep_poll:

    [fs/eventpoll.c-->sys_epoll_wait()->ep_poll()]

    static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                       int maxevents, long timeout)
    {
        int res, eavail;
        unsigned long flags;
        long jtimeout;
        wait_queue_t wait;

        /*
         * Calculate the timeout by checking for the "infinite" value ( -1 )
         * and the overflow condition. The passed timeout is in milliseconds,
         * that why (t * HZ) / 1000.
         */
        jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
                   MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

    retry:
        write_lock_irqsave(&ep->lock, flags);

        res = 0;
        if (list_empty(&ep->rdllist)) {
            /*
             * We don't have any available event to return to the caller.
             * We need to sleep here, and we will be wake up by
             * ep_poll_callback() when events will become available.
             */
            init_waitqueue_entry(&wait, current);
            add_wait_queue(&ep->wq, &wait);

            for (;;) {
                /*
                 * We don't want to sleep if the ep_poll_callback() sends us
                 * a wakeup in between. That's why we set the task state
                 * to TASK_INTERRUPTIBLE before doing the checks.
                 */
                set_current_state(TASK_INTERRUPTIBLE);
                if (!list_empty(&ep->rdllist) || !jtimeout)
                    break;
                if (signal_pending(current)) {
                    res = -EINTR;
                    break;
                }

                write_unlock_irqrestore(&ep->lock, flags);
                jtimeout = schedule_timeout(jtimeout);
                write_lock_irqsave(&ep->lock, flags);
            }
            remove_wait_queue(&ep->wq, &wait);

            set_current_state(TASK_RUNNING);
        }

    Another big loop, but a much better one than poll's, because on closer inspection it does nothing at all except sleep and check whether ep->rdllist is empty! Doing nothing is of course efficient, but then who makes ep->rdllist non-empty? The answer is the callback installed by ep_insert:

    [fs/eventpoll.c-->sys_epoll_ctl()-->ep_insert()]

    static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
                         struct file *tfile, int fd)
    {
        int error, revents, pwake = 0;
        unsigned long flags;
        struct epitem *epi;
        struct ep_pqueue epq;

        error = -ENOMEM;
        if (!(epi = EPI_MEM_ALLOC()))
            goto eexit_1;

        /* Item initialization follow here ... */
        EP_RB_INITNODE(&epi->rbn);
        INIT_LIST_HEAD(&epi->rdllink);
        INIT_LIST_HEAD(&epi->fllink);
        INIT_LIST_HEAD(&epi->txlink);
        INIT_LIST_HEAD(&epi->pwqlist);
        epi->ep = ep;
        EP_SET_FFD(&epi->ffd, tfile, fd);
        epi->event = *event;
        atomic_set(&epi->usecnt, 1);
        epi->nwait = 0;

        /* Initialize the poll table using the queue callback */
        epq.epi = epi;
        init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

        /*
         * Attach the item to the poll hooks and get current event bits.
         * We can safely use the file* here because its usage count has
         * been increased by the caller of this function.
         */
        revents = tfile->f_op->poll(tfile, &epq.pt);

    Note the line init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); it essentially does epq.pt.qproc = ep_ptable_queue_proc. Immediately afterwards, tfile->f_op->poll(tfile, &epq.pt) calls the poll method of the monitored file (which epoll calls the "target file"); that poll calls poll_wait (remember poll_wait? every poll-capable driver calls it), and poll_wait finally invokes ep_ptable_queue_proc. This call chain is a little hard to follow because it is not a direct, language-level call. ep_insert also links the struct epitem into the f_ep_links list inside struct file, to make lookup convenient; the fllink member of struct epitem exists for exactly that purpose.
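
    The indirection can be summarized in two tiny pieces (reconstructed for clarity; the kernel's definitions are essentially this):

    /* include/linux/poll.h: install the queueing callback into a poll_table */
    static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
    {
        pt->qproc = qproc;
    }

    /* ...so the chain triggered from ep_insert is:
     *   tfile->f_op->poll(tfile, &epq.pt)
     *     -> the driver calls poll_wait(tfile, whead, &epq.pt)
     *       -> poll_wait calls pt->qproc(tfile, whead, pt)
     *         -> which here is ep_ptable_queue_proc()
     */

    Here is ep_ptable_queue_proc itself: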

    [fs/eventpoll.c-->ep_ptable_queue_proc()]

    static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                                     poll_table *pt)
    {
        struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt);
        struct eppoll_entry *pwq;

        if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) {
            init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
            pwq->whead = whead;
            pwq->base = epi;
            add_wait_queue(whead, &pwq->wait);
            list_add_tail(&pwq->llink, &epi->pwqlist);
            epi->nwait++;
        } else {
            /* We have to signal that an error occurred */
            epi->nwait = -1;
        }
    }

    This is the most important thing ep_insert does: create a struct eppoll_entry, set its wakeup callback to ep_poll_callback, and add it to the device's wait queue (the whead here is exactly the per-device wait queue discussed in the previous section). Only with this in place will ep_poll_callback run when the device becomes ready and wakes its wait queue. On every poll call, the kernel has to hang current on the wait queue of every fd's device; with thousands of fds that repeated "hanging" is expensive. epoll_wait does none of that: epoll attaches to each fd's device wait queue only once, at epoll_ctl time (that single attachment is unavoidable), registering a callback rather than the process itself, in effect telling each fd "call this when you're ready". When a device has an event, the callback puts the fd onto rdllist, and each epoll_wait merely harvests rdllist. By cleverly using callbacks, epoll implements a far more efficient event-driven model. We can already guess what ep_poll_callback does: it must take the epitem on the red-black tree (representing an fd) that received the event and put it onto ep->rdllist, so that when epoll_wait returns, rdllist holds exactly the ready fds!

    [fs/eventpoll.c-->ep_poll_callback()]

    static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
    {
        int pwake = 0;
        unsigned long flags;
        struct epitem *epi = EP_ITEM_FROM_WAIT(wait);
        struct eventpoll *ep = epi->ep;

        DNPRINTK(3, (KERN_INFO "[%p] eventpoll: poll_callback(%p) epi=%p ep=%p\n",
                     current, epi->file, epi, ep));

        write_lock_irqsave(&ep->lock, flags);

        /*
         * If the event mask does not contain any poll(2) event, we consider the
         * descriptor to be disabled. This condition is likely the effect of the
         * EPOLLONESHOT bit that disables the descriptor when an event is received,
         * until the next EPOLL_CTL_MOD will be issued.
         */
        if (!(epi->event.events & ~EP_PRIVATE_BITS))
            goto is_disabled;

        /* If this file is already in the ready list we exit soon */
        if (EP_IS_LINKED(&epi->rdllink))
            goto is_linked;

        list_add_tail(&epi->rdllink, &ep->rdllist);

    is_linked:
        /*
         * Wake up ( if active ) both the eventpoll wait list and the ->poll()
         * wait list.
         */
        if (waitqueue_active(&ep->wq))
            wake_up(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;

    is_disabled:
        write_unlock_irqrestore(&ep->lock, flags);

        /* We have to call this outside the lock */
        if (pwake)
            ep_poll_safewake(&psw, &ep->poll_wait);

        return 1;
    }

    The only truly important line is list_add_tail(&epi->rdllink, &ep->rdllist); it puts the struct epitem onto the rdllist of the struct eventpoll. At this point we can sketch epoll's core data structures:
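
    The article's diagram is not reproduced; as a rough substitute, the relationships between the structures we have met can be sketched like this (illustrative comment, not kernel code):

    /*
     * epoll fd --(file->private_data)--> struct eventpoll
     *     |-- rbr: red-black tree of struct epitem, one per registered fd
     *     |       each epitem -> pwqlist -> struct eppoll_entry, whose wait
     *     |       entry sits on the target device's wait queue with
     *     |       ep_poll_callback as its wakeup function
     *     |-- rdllist: epitems whose device has fired ep_poll_callback
     *     `-- wq: tasks sleeping in epoll_wait, woken by ep_poll_callback
     */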

    EPOLLET: unique to epoll

    EPOLLET is a flag unique to the epoll system calls; ET stands for Edge Trigger, and its exact semantics and uses are easy to look up. With EPOLLET, repeated events stop pestering the program's logic, so it is widely used. So how does EPOLLET work? epoll hangs a callback on every fd; when the fd's device has news, the callback puts the fd onto the rdllist, so epoll_wait only needs to check rdllist to know which fds have events.
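
    As a reminder of how EPOLLET is used from user space (a minimal sketch, not from the article; epfd, connfd and buf are assumed to exist, the fd must be non-blocking, and <sys/epoll.h>, <unistd.h> and <errno.h> are required):

    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET;        /* edge-triggered: notify only on new readiness */
    ev.data.fd = connfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

    /* after an EPOLLIN notification, drain the socket completely, otherwise
     * the data left behind will not produce another edge */
    for (;;) {
        ssize_t n = read(connfd, buf, sizeof(buf));
        if (n > 0)
            continue;
        if (n < 0 && errno == EAGAIN)
            break;                        /* fully drained; wait for the next edge */
        break;                            /* n == 0 (peer closed) or a real error */
    }

    So where is EPOLLET handled in the kernel? Let's look at the last few lines of ep_poll: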

    [fs/eventpoll.c->ep_poll()]

        /*
         * Try to transfer events to user space. In case we get 0 events and
         * there's still timeout left over, we go trying again in search of
         * more luck.
         */
        if (!res && eavail &&
            !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
            goto retry;

        return res;
    }

    Copying the fds on rdllist out to user space is the job of ep_events_transfer:

    [fs/eventpoll.c->ep_events_transfer()]

    static int ep_events_transfer(struct eventpoll *ep,
                                  struct epoll_event __user *events, int maxevents)
    {
        int eventcnt = 0;
        struct list_head txlist;

        INIT_LIST_HEAD(&txlist);

        /*
         * We need to lock this because we could be hit by
         * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL).
         */
        down_read(&ep->sem);

        /* Collect/extract ready items */
        if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
            /* Build result set in userspace */
            eventcnt = ep_send_events(ep, &txlist, events);

            /* Reinject ready items into the ready list */
            ep_reinject_items(ep, &txlist);
        }

        up_read(&ep->sem);

        return eventcnt;
    }

    There is little code here: ep_collect_ready_items moves the fds on rdllist over to txlist (leaving rdllist empty), then ep_send_events copies the fds on txlist out to user space, and finally ep_reinject_items puts a subset of those fds back from txlist onto rdllist so they can be found on rdllist again next time. The implementation of ep_send_events:

    [fs/eventpoll.c->ep_send_events()]

    static int ep_send_events(struct eventpoll *ep, struct list_head *txlist,
                              struct epoll_event __user *events)
    {
        int eventcnt = 0;
        unsigned int revents;
        struct list_head *lnk;
        struct epitem *epi;

        /*
         * We can loop without lock because this is a task private list.
         * The test done during the collection loop will guarantee us that
         * another task will not try to collect this file. Also, items
         * cannot vanish during the loop because we are holding "sem".
         */
        list_for_each(lnk, txlist) {
            epi = list_entry(lnk, struct epitem, txlink);

            /*
             * Get the ready file event set. We can safely use the file
             * because we are holding the "sem" in read and this will
             * guarantee that both the file and the item will not vanish.
             */
            revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);

            /*
             * Set the return event set for the current file descriptor.
             * Note that only the task that was successfully able to link
             * the item to its "txlist" will write this field.
             */
            epi->revents = revents & epi->event.events;

            if (epi->revents) {
                if (__put_user(epi->revents,
                               &events[eventcnt].events) ||
                    __put_user(epi->event.data,
                               &events[eventcnt].data))
                    return -EFAULT;
                if (epi->event.events & EPOLLONESHOT)
                    epi->event.events &= EP_PRIVATE_BITS;
                eventcnt++;
            }
        }

        return eventcnt;
    }

    The copying itself is unremarkable, but note the line revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL); this poll call is sly: it passes NULL as the second argument. To see why, first look at how a device driver usually implements poll:

    static unsigned int scull_p_poll(struct file *filp, poll_table *wait)
    {
        struct scull_pipe *dev = filp->private_data;
        unsigned int mask = 0;

        /*
         * The buffer is circular; it is considered full
         * if "wp" is right behind "rp" and empty if the
         * two are equal.
         */
        down(&dev->sem);
        poll_wait(filp, &dev->inq, wait);
        poll_wait(filp, &dev->outq, wait);
        if (dev->rp != dev->wp)
            mask |= POLLIN | POLLRDNORM;    /* readable */
        if (spacefree(dev))
            mask |= POLLOUT | POLLWRNORM;   /* writable */
        up(&dev->sem);

        return mask;
    }

    This snippet is taken from Linux Device Drivers, 3rd Edition, an absolute classic. The device first hangs current on its inq and outq wait queues (the "hanging" is done by the callback pointer carried in wait), then waits for the device to wake it; after waking, the event bits are available in mask (that return value is what carries the event mask). So what does poll_wait do when wait is NULL?

    [include/linux/poll.h->poll_wait]

    static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address,
                                 poll_table *p)
    {
        if (p && wait_address)
            p->qproc(filp, wait_address, p);
    }

    If the poll_table is NULL, it does nothing at all. Going back to ep_send_events, that poll call with a NULL table therefore means "I don't want to sleep, I only want the current event mask", and the mask obtained this way is then copied out to user space. Once ep_send_events is done, it is ep_reinject_items' turn:

    [fs/eventpoll.c->ep_reinject_items]

    static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
    {
        int ricnt = 0, pwake = 0;
        unsigned long flags;
        struct epitem *epi;

        write_lock_irqsave(&ep->lock, flags);

        while (!list_empty(txlist)) {
            epi = list_entry(txlist->next, struct epitem, txlink);

            /* Unlink the current item from the transfer list */
            EP_LIST_DEL(&epi->txlink);

            /*
             * If the item is no more linked to the interest set, we don't
             * have to push it inside the ready list because the following
             * ep_release_epitem() is going to drop it. Also, if the current
             * item is set to have an Edge Triggered behaviour, we don't have
             * to push it back either.
             */
            if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) &&
                (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
                list_add_tail(&epi->rdllink, &ep->rdllist);
                ricnt++;
            }
        }

        if (ricnt) {
            /*
             * Wake up ( if active ) both the eventpoll wait list and the ->poll()
             * wait list.
             */
            if (waitqueue_active(&ep->wq))
                wake_up(&ep->wq);
            if (waitqueue_active(&ep->poll_wait))
                pwake++;
        }

        write_unlock_irqrestore(&ep->lock, flags);

        /* We have to call this outside the lock */
        if (pwake)
            ep_poll_safewake(&psw, &ep->poll_wait);
    }

    ep_reinject_items puts some of the fds on txlist back onto rdllist. Which ones? Look at the condition if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) && (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)): only fds that are not flagged EPOLLET and whose returned events are still among the events the user asked for get put back onto rdllist, so the next epoll_wait will of course hand them to the user again. An example: take a socket that has only been connected, with no data sent or received yet; its poll mask always contains POLLOUT (see the driver example above), so every epoll_wait call returns a POLLOUT event (rather annoying), because its fd keeps getting put back onto rdllist. Now suppose someone writes so much data into the socket that it clogs up (no longer writable); then the (epi->revents & epi->event.events) part of the condition no longer holds (no more POLLOUT), the fd is not put back onto rdllist, and epoll_wait stops reporting POLLOUT to the user. Finally, give that socket EPOLLET, connect, and again send and receive nothing: this time the !(epi->event.events & EPOLLET) part fails, so epoll_wait delivers exactly one POLLOUT notification (the fd never returns to rdllist), and subsequent epoll_wait calls report no further events at all.
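
    To see that difference from user space (a sketch, not from the article; epfd and sockfd are assumed to exist, with sockfd a connected, non-blocking socket already registered with epoll_ctl):

    struct epoll_event ev = { .data.fd = sockfd };

    /* Level-triggered: as long as the socket's send buffer has room (POLLOUT),
     * every epoll_wait call reports EPOLLOUT again. */
    ev.events = EPOLLOUT;
    epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);

    /* Edge-triggered: EPOLLOUT is reported once, and again only after the send
     * buffer fills up and then becomes writable again. */
    ev.events = EPOLLOUT | EPOLLET;
    epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);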
