美文网首页
DPDK 中断处理流程

DPDK 中断处理流程

作者: 分享放大价值 | 来源:发表于2021-07-25 09:59 被阅读0次

    本文整理下之前的学习笔记,基于DPDK17.11版本源码分析。主要分析一下中断处理流程。网卡支持的中断有多种类型,比如收发包,LSC(链路状态变化),mailbox等,但是DPDK使用PMD来收发包,不用处理收发包中断。

    将网卡绑定到igb_uio时会注册uio,生成/dev/uiox字符设备。DPDK初始化时会open /dev/uiox设备,对应到kernel端会申请中断号,并注册中断处理函数。DPDK还会创建中断处理线程,并注册用户态的中断处理函数,注册时将open的fd添加到epoll队列中等待中断发生。中断发生时,首先调用kernel中注册的中断处理函数,此函数主要用来唤醒用户态的中断处理线程,中断处理线程再调用用户态的中断处理函数。

    下面通过代码分析整个过程。

    创建中断处理线程

    eal初始化时,会创建中断处理线程。

    int
    rte_eal_init(int argc, char **argv)
        rte_eal_intr_init
            /* init the global interrupt source head */
            //初始化全局链表 intr_sources,后面会使用 rte_intr_callback_register 注册中断源到此链表
            TAILQ_INIT(&intr_sources);
    
            //创建管道intr_pipe,当主线程通过rte_intr_callback_register注册了中断处理函数时,需要通过pipe通知
            //中断处理线程将fd添加到epoll中等待事件发生
            pipe(intr_pipe.pipefd)
    
            //创建中断处理线程
            rte_ctrl_thread_create(&intr_thread, "eal-intr-thread", NULL, eal_intr_thread_main, NULL);
    
            /* Set thread_name for aid in debugging. */
            //设置线程名字为eal-intr-thread
            snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "eal-intr-thread");
            ret_1 = rte_thread_setname(intr_thread, thread_name);
    

    中断处理线程逻辑

    /**
     * Main loop of the EAL interrupt thread.
     *
     * Every pass builds a fresh epoll instance that contains the read end of
     * intr_pipe plus the fd of each interrupt source that has at least one
     * callback, then hands it to eal_intr_handle_interrupts(). When that call
     * returns, the epoll fd is closed and the wait set is rebuilt from scratch.
     *
     * NOTE(review): the return values of epoll_create() and epoll_ctl() are not
     * checked in this excerpt.
     *
     * @param arg
     *  Unused.
     *
     * @return
     *  Never returns.
     */
    static __attribute__((noreturn)) void *
    eal_intr_thread_main(__rte_unused void *arg)
    {
        /* scratch event descriptor reused for every interrupt source fd */
        struct epoll_event ev;

        /* the outermost loop never exits */
        for (;;) {
            static struct epoll_event pipe_event = {
                .events = EPOLLIN | EPOLLPRI,
            };
            struct rte_intr_source *src;
            unsigned numfds = 0;
            /* create the epoll instance used to wait for fd events */
            int pfd = epoll_create(1);

            /* always watch the read end of intr_pipe */
            pipe_event.data.fd = intr_pipe.readfd;
            epoll_ctl(pfd, EPOLL_CTL_ADD, intr_pipe.readfd, &pipe_event);
            numfds++;

            /* the lock protects the intr_sources list */
            rte_spinlock_lock(&intr_lock);
            /* add the fd of every interrupt source to the epoll set */
            TAILQ_FOREACH(src, &intr_sources, next) {
                if (src->callbacks.tqh_first == NULL)
                    continue; /* skip those with no callbacks */
                /* events we are interested in */
                ev.events = EPOLLIN | EPOLLPRI | EPOLLRDHUP | EPOLLHUP;
                /* fd of the interrupt source represented by src */
                ev.data.fd = src->intr_handle.fd;
                epoll_ctl(pfd, EPOLL_CTL_ADD, src->intr_handle.fd, &ev);

                /* numfds counts the fds added to the epoll set */
                numfds++;
            }
            rte_spinlock_unlock(&intr_lock);

            /* wait for and serve events */
            eal_intr_handle_interrupts(pfd, numfds);

            /**
             * when we return, we need to rebuild the
             * list of fds to monitor: drop this epoll fd,
             * the next pass creates a new one.
             */
            close(pfd);
        }
    }
    

    调用epoll_wait等待事件发生,如果有事件了再调用 eal_intr_process_interrupts 处理发生的事件。

    /**
     * Block on the epoll instance and dispatch whatever becomes ready.
     *
     * Loops on epoll_wait(); each non-empty result is passed to
     * eal_intr_process_interrupts(). Returns when epoll_wait() fails with an
     * error other than EINTR, or when eal_intr_process_interrupts() returns a
     * negative value.
     *
     * @param pfd
     *  epoll file descriptor.
     * @param totalfds
     *  Number of fds registered in pfd; sizes the event array.
     */
    static void
    eal_intr_handle_interrupts(int pfd, unsigned totalfds)
    {
        struct epoll_event events[totalfds];
        int nfds;

        for (;;) {
            /*
             * EAL_INTR_EPOLL_WAIT_FOREVER is (-1): block until an
             * event arrives.
             */
            nfds = epoll_wait(pfd, events, totalfds, EAL_INTR_EPOLL_WAIT_FOREVER);

            /* epoll_wait fail */
            if (nfds < 0) {
                if (errno == EINTR)
                    continue;
                RTE_LOG(ERR, EAL,
                    "epoll_wait returns with fail\n");
                return;
            }
            /* epoll_wait timeout, will never happens here (timeout is -1) */
            else if (nfds == 0)
                continue;

            /* epoll_wait has at least one fd ready to read */
            if (eal_intr_process_interrupts(events, nfds) < 0)
                return;
        }
    }
    

    处理发生事件的fd,调用用户态注册的中断处理函数。

    static int
    eal_intr_process_interrupts(struct epoll_event *events, int nfds)
    {
        bool call = false;
        int n, bytes_read;
        struct rte_intr_source *src;
        struct rte_intr_callback *cb;
        union rte_intr_read_buffer buf;
        struct rte_intr_callback active_cb;
        //循环处理发生的事件
        for (n = 0; n < nfds; n++) {
            /**
             * if the pipe fd is ready to read, return out to
             * rebuild the wait list.
             */
            //如果是intr_pipe.readfd发生事件说明有新的中断添加进来,
            //需要返回-1,在最外层重建 epoll
            if (events[n].data.fd == intr_pipe.readfd){
                int r = read(intr_pipe.readfd, buf.charbuf, sizeof(buf.charbuf));
                return -1;
            }
    
            //检查events[n].data.fd是哪个中断的事件
            rte_spinlock_lock(&intr_lock);
            TAILQ_FOREACH(src, &intr_sources, next)
                if (src->intr_handle.fd ==
                        events[n].data.fd)
                    break;
            if (src == NULL){
                rte_spinlock_unlock(&intr_lock);
                continue;
            }
    
            /* mark this interrupt source as active and release the lock. */
            src->active = 1;
            rte_spinlock_unlock(&intr_lock);
    
            //读取fd
            bytes_read = read(events[n].data.fd, &buf, bytes_read);
            if (bytes_read < 0) {
                if (errno == EINTR || errno == EWOULDBLOCK)
                    continue;
    
                RTE_LOG(ERR, EAL, "Error reading from file "
                    "descriptor %d: %s\n",
                    events[n].data.fd,
                    strerror(errno));
            } else if (bytes_read == 0)
                RTE_LOG(ERR, EAL, "Read nothing from file "
                    "descriptor %d\n", events[n].data.fd);
            else
                //正常的话,设置call为true
                call = true;
    
            /* grab a lock, again to call callbacks and update status. */
            rte_spinlock_lock(&intr_lock);
            
            if (call) {
                //调用 src 上注册的中断处理函数
                /* Finally, call all callbacks. */
                TAILQ_FOREACH(cb, &src->callbacks, next) {
    
                    /* make a copy and unlock. */
                    active_cb = *cb;
                    rte_spinlock_unlock(&intr_lock);
    
                    /* call the actual callback */
                    active_cb.cb_fn(active_cb.cb_arg);
    
                    /*get the lock back. */
                    rte_spinlock_lock(&intr_lock);
                }
            }
    
            /* we done with that interrupt source, release it. */
            src->active = 0;
            rte_spinlock_unlock(&intr_lock);
        }
        
        return 0;
    }
    

    用户态注册中断处理函数
    调用 rte_intr_callback_register 注册中断处理函数到全局链表 intr_sources。

    /**
     * Register a user-space interrupt callback for the fd in intr_handle.
     *
     * The callback is appended to the matching entry of the global list
     * intr_sources; if no entry exists for the fd, a new one is created.
     * Several callbacks may be registered on one fd. When the fd gets its
     * first callback, one byte is written to intr_pipe so that the interrupt
     * thread rebuilds its epoll wait list and starts watching the fd.
     *
     * @param intr_handle
     *  Interrupt handle; its fd identifies the interrupt source.
     * @param cb
     *  Callback function.
     * @param cb_arg
     *  Argument passed to cb.
     *
     * @return
     *  - 0 on success.
     *  - -EINVAL on invalid parameters.
     *  - -ENOMEM when memory cannot be allocated.
     *  - -EPIPE when the interrupt thread cannot be notified.
     */
    int
    rte_intr_callback_register(const struct rte_intr_handle *intr_handle,
                rte_intr_callback_fn cb, void *cb_arg)
    {
        int ret = 0;
        int wake_thread = 0;
        struct rte_intr_source *src;
        struct rte_intr_callback *callback;

        /* first do parameter checking */
        if (intr_handle == NULL || intr_handle->fd < 0 || cb == NULL) {
            RTE_LOG(ERR, EAL,
                "Registering with invalid input parameter\n");
            return -EINVAL;
        }

        /* allocate a new interrupt callback entity holding cb and cb_arg */
        callback = rte_zmalloc("interrupt callback list", sizeof(*callback), 0);
        if (callback == NULL) {
            RTE_LOG(ERR, EAL, "Can not allocate memory\n");
            return -ENOMEM;
        }
        callback->cb_fn = cb;
        callback->cb_arg = cb_arg;

        rte_spinlock_lock(&intr_lock);

        /* check if there is at least one callback registered for the fd */
        TAILQ_FOREACH(src, &intr_sources, next) {
            if (src->intr_handle.fd == intr_handle->fd) {
                /* we had no interrupts for this */
                if (TAILQ_EMPTY(&src->callbacks))
                    wake_thread = 1;

                TAILQ_INSERT_TAIL(&(src->callbacks), callback, next);
                ret = 0;
                break;
            }
        }

        /* no existing callbacks for this - add new source */
        if (src == NULL) {
            src = rte_zmalloc("interrupt source list", sizeof(*src), 0);
            if (src == NULL) {
                RTE_LOG(ERR, EAL, "Can not allocate memory\n");
                rte_free(callback);
                ret = -ENOMEM;
            } else {
                src->intr_handle = *intr_handle;
                TAILQ_INIT(&src->callbacks);
                /* attach the callback to the new source ... */
                TAILQ_INSERT_TAIL(&(src->callbacks), callback, next);
                /* ... and the source to the global list */
                TAILQ_INSERT_TAIL(&intr_sources, src, next);
                wake_thread = 1;
                ret = 0;
            }
        }

        rte_spinlock_unlock(&intr_lock);

        /**
         * check if need to notify the pipe fd waited by epoll_wait to
         * rebuild the wait list.
         */
        if (wake_thread)
            if (write(intr_pipe.writefd, "1", 1) < 0)
                return -EPIPE;

        return ret;
    }
    

    注册中断处理函数,并使能中断

    1. open /dev/uiox 注册kernel中的中断处理函数
    rte_pci_probe -> rte_pci_probe_one_driver -> rte_pci_map_device -> pci_uio_map_resource-> pci_uio_alloc_resource
        //获取网卡的 uio_num
        uio_num = pci_get_uio_dev(dev, dirname, sizeof(dirname), 1);
            /* depending on kernel version, uio can be located in uio/uioX or uio:uioX */
            //到目录/sys/bus/pci/devices/'pci address'/,找到uio目录,
            //获取uio number(网卡绑定到igb_uio驱动后,会创建此目录)
            snprintf(dirname, sizeof(dirname),
                    "%s/" PCI_PRI_FMT "/uio", rte_pci_get_sysfs_path(),
                    loc->domain, loc->bus, loc->devid, loc->function);
        //打开 /dev/uiox 设备,获取fd,并保存到dev->intr_handle.fd
        snprintf(devname, sizeof(devname), "/dev/uio%u", uio_num);
        /* save fd if in primary process */
        //打开 /dev/uiox,调用到kernel中的函数 uio_open
        dev->intr_handle.fd = open(devname, O_RDWR);
    
    //uio_open调用到最后会申请中断,并注册中断处理函数 igbuio_pci_irqhandler
    uio_open -> igbuio_pci_open -> igbuio_pci_enable_interrupts
        //更新pci配置空间中msix capability字段,并申请中断号
        pci_enable_msix(udev->pdev, &msix_entry, 1)
        dev_dbg(&udev->pdev->dev, "using MSI-X");
        udev->info.irq_flags = IRQF_NO_THREAD;
        udev->info.irq = msix_entry.vector;
        udev->mode = RTE_INTR_MODE_MSIX;
    
        //注册中断处理函数
        request_irq(udev->info.irq, igbuio_pci_irqhandler,
              udev->info.irq_flags, udev->info.name, udev);
    
    
    pci配置空间可通过 lspci -s 0000:81:00.0 -vv 查看,
    对于 MSI-X,默认为 Enable-
            Capabilities: [70] MSI-X: Enable- Count=129 Masked-
                    Vector table: BAR=3 offset=00000000
                    PBA: BAR=3 offset=00001000
    经过igbuio_pci_enable_interrupts后,可看到变成了 Enable+,表示使能了pci层的中断
            Capabilities: [70] MSI-X: Enable+ Count=129 Masked-
                    Vector table: BAR=3 offset=00000000
                    PBA: BAR=3 offset=00001000
    
    2. 注册用户态的中断处理函数,并使能中断
    rte_pci_probe -> rte_pci_probe_one_driver -> dr->probe(dr, dev) -> eth_ixgbe_dev_init
        //注册中断处理函数 ixgbe_dev_interrupt_handler
        rte_intr_callback_register(intr_handle, ixgbe_dev_interrupt_handler, eth_dev);
    
        /* enable uio/vfio intr/eventfd mapping */
        rte_intr_enable(intr_handle);
            uio_intr_enable(intr_handle)
                const int value = 1;
                //write会调用到kernel中的 uio_write
                write(intr_handle->fd, &value, sizeof(value))
                    //调用uio字符设备驱动中的 uio_write
                    //调用igb_uio驱动中的 igbuio_pci_irqcontrol
                    idev->info->irqcontrol(idev->info, irq_on)
                    //清除中断 mask
                    igbuio_mask_irq(pdev, udev->mode, irq_state);
    
        /* enable support intr */
        ixgbe_enable_intr(eth_dev);
            struct ixgbe_interrupt *intr =
                IXGBE_DEV_PRIVATE_TO_INTR(dev->data->dev_private);
            struct ixgbe_hw *hw =
                IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);
            //使能网卡层中断,intr->mask 指定了中断类型
            IXGBE_WRITE_REG(hw, IXGBE_EIMS, intr->mask);
            IXGBE_WRITE_FLUSH(hw);
    

    发生中断时的处理

    前面调用rte_intr_callback_register注册回调后,中断处理线程已经将 /dev/uiox 的 fd 添加到了 epoll 监听队列;epoll 在 kernel 中会调用 uio_poll 检查并等待事件到来。

    /*
     * poll handler of the UIO character device.
     *
     * Returns -EIO when the device has no irq. Otherwise hands the device
     * wait queue (idev->wait) to poll_wait() and reports POLLIN | POLLRDNORM
     * when the device event counter differs from the value this listener
     * last recorded; 0 means nothing is pending for this listener.
     */
    static unsigned int uio_poll(struct file *filep, poll_table *wait)
    {
        struct uio_listener *lsn = filep->private_data;
        struct uio_device *uiodev = lsn->dev;
        unsigned int mask = 0;

        if (!uiodev->info->irq)
            return -EIO;

        /*
         * Register on the device wait queue so the caller (here: the
         * interrupt thread sitting in epoll) can be woken on an event.
         */
        poll_wait(filep, &uiodev->wait, wait);

        /*
         * The two counters are equal while no interrupt is pending; per
         * the article the interrupt path increments idev->event, so a
         * mismatch means there is an event to read.
         */
        if (atomic_read(&uiodev->event) != lsn->event_count)
            mask = POLLIN | POLLRDNORM;

        return mask;
    }
    

    在kernel端,注册了真正的中断处理函数 igbuio_pci_irqhandler,当中断发生时,调用uio_event_notify唤醒调用epoll的进程,即中断处理线程。

    /*
     * Kernel-side interrupt handler, called with the rte_uio_pci_dev as dev_id.
     * It calls uio_event_notify() on the device's uio_info and returns
     * IRQ_HANDLED.
     *
     * NOTE(review): the extra-indented lines after the uio_event_notify() call
     * appear to be the article's inline expansion of that function for
     * illustration, not part of this handler; as written they would execute in
     * addition to the call. Confirm against the driver source.
     */
    static irqreturn_t
    igbuio_pci_irqhandler(int irq, void *dev_id)
    {
        struct rte_uio_pci_dev *udev = (struct rte_uio_pci_dev *)dev_id;
        struct uio_info *info = &udev->info;
    
        uio_event_notify(info);
            struct uio_device *idev = info->uio_dev;
            /* increment the event counter */
            atomic_inc(&idev->event);
            /* wake up the waiters on the wait queue, e.g. the interrupt thread */
            wake_up_interruptible(&idev->wait);
            kill_fasync(&idev->async_queue, SIGIO, POLL_IN);
        /* Message signal mode, no share IRQ and automasked */
        return IRQ_HANDLED;
    }
    

    被唤醒的进程(即中断处理线程)再次调用 uio_poll 获取发生的事件,此时会返回 POLLIN | POLLRDNORM,然后再调用系统调用read读取数据,此时肯定是有事件的,所以不会再次阻塞等待。

    /*
     * read handler of the UIO character device.
     *
     * Waits until the device event counter differs from the value recorded
     * for this listener, then copies the current counter to the user buffer.
     *
     * Returns:
     *   count         on success (count must equal sizeof(s32))
     *   -EIO          the device has no irq
     *   -EINVAL       count != sizeof(s32)
     *   -EFAULT       copy_to_user() failed
     *   -EAGAIN       no event pending and the file is O_NONBLOCK
     *   -ERESTARTSYS  a signal is pending while waiting
     */
    static ssize_t uio_read(struct file *filep, char __user *buf, size_t count, loff_t *ppos)
    {
        struct uio_listener *listener = filep->private_data;
        struct uio_device *idev = listener->dev;
        DECLARE_WAITQUEUE(wait, current);
        ssize_t retval;
        s32 event_count;
    
        if (!idev->info->irq)
            return -EIO;
    
        if (count != sizeof(s32))
            return -EINVAL;
    
        add_wait_queue(&idev->wait, &wait);
    
        do {
            /* mark the task interruptible BEFORE testing the condition */
            set_current_state(TASK_INTERRUPTIBLE);
    
            /*
             * Compare idev->event with listener->event_count again.
             * If they differ an interrupt event has occurred: copy the
             * current event counter (event_count, count bytes) to the
             * buffer supplied by the user-space read(), and store it in
             * listener->event_count so the next interrupt can be detected.
             */
            event_count = atomic_read(&idev->event);
            if (event_count != listener->event_count) {
                if (copy_to_user(buf, &event_count, count))
                    retval = -EFAULT;
                else {
                    listener->event_count = event_count;
                    retval = count;
                }
                break;
            }
    
            /* nothing pending: non-blocking readers return immediately */
            if (filep->f_flags & O_NONBLOCK) {
                retval = -EAGAIN;
                break;
            }
    
            if (signal_pending(current)) {
                retval = -ERESTARTSYS;
                break;
            }
            /* give up the CPU until woken, then re-check the counter */
            schedule();
        } while (1);
    
        __set_current_state(TASK_RUNNING);
        remove_wait_queue(&idev->wait, &wait);
    
        return retval;
    }
    

    用户态中断处理函数

    中断处理线程正确读取到事件后,会调用用户态注册的中断处理函数,比如 ixgbe_dev_interrupt_handler。

    /**
     * User-space interrupt callback of the ixgbe PMD.
     *
     * First collects the interrupt status for the port, then acts on it.
     *
     * @param param
     *  The rte_eth_dev the callback was registered with.
     */
    static void
    ixgbe_dev_interrupt_handler(void *param)
    {
        struct rte_eth_dev *eth_dev = param;

        /*
         * Step 1: find out which kind of interrupt fired (e.g. mailbox or
         * LSC, link status change; packet rx/tx does not use interrupts
         * in DPDK according to the article).
         */
        ixgbe_dev_interrupt_get_status(eth_dev);

        /* Step 2: handle it according to its kind. */
        ixgbe_dev_interrupt_action(eth_dev, eth_dev->intr_handle);
    }
    
    /*
     * It reads the EICR register and records in intr->flags which events need
     * handling (IXGBE_FLAG_NEED_LINK_UPDATE for IXGBE_EICR_LSC,
     * IXGBE_FLAG_MAILBOX for IXGBE_EICR_MAILBOX). Interrupts are disabled via
     * ixgbe_disable_intr() before the register is read. Part of the body is
     * elided ("...") in this excerpt.
     *
     * @param dev
     *  Pointer to struct rte_eth_dev.
     *
     * @return
     *  - On success, zero.
     *  - On failure, a negative value.
     */
    static int
    ixgbe_dev_interrupt_get_status(struct rte_eth_dev *dev)
    {
        uint32_t eicr;
        /* per the article, hw refers to the BAR0 mapping; registers are accessed by offset */
        struct ixgbe_hw *hw = IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);
        struct ixgbe_interrupt *intr =
            IXGBE_DEV_PRIVATE_TO_INTR(dev->data->dev_private);
    
        /* clear all cause mask */
        ixgbe_disable_intr(hw);
    
        /* IXGBE_EICR records which kinds of interrupt have occurred */
        /* read-on-clear nic registers here */
        eicr = IXGBE_READ_REG(hw, IXGBE_EICR);
        PMD_DRV_LOG(DEBUG, "eicr %x", eicr);
    
        intr->flags = 0;
    
        /* did an LSC (link status change) event occur? */
        /* set flag for async link update */
        if (eicr & IXGBE_EICR_LSC)
            intr->flags |= IXGBE_FLAG_NEED_LINK_UPDATE;
    
        /* did a mailbox event occur? (mailbox is the PF <-> VF communication mechanism) */
        if (eicr & IXGBE_EICR_MAILBOX)
            intr->flags |= IXGBE_FLAG_MAILBOX;
    
        ...
    
        return 0;
    }
    
    /*
     * It executes link_update after knowing an interrupt occurred.
     *
     * @param dev
     *  Pointer to struct rte_eth_dev.
     *
     * @return
     *  - On success, zero.
     *  - On failure, a negative value.
     */
    static int
    ixgbe_dev_interrupt_action(struct rte_eth_dev *dev,
                   struct rte_intr_handle *intr_handle)
    {
        struct ixgbe_interrupt *intr =
            IXGBE_DEV_PRIVATE_TO_INTR(dev->data->dev_private);
        int64_t timeout;
        struct rte_eth_link link;
        struct ixgbe_hw *hw =
            IXGBE_DEV_PRIVATE_TO_HW(dev->data->dev_private);
    
        PMD_DRV_LOG(DEBUG, "intr action type %d", intr->flags);
    
        //处理 mailbox 事件
        if (intr->flags & IXGBE_FLAG_MAILBOX) {
            ixgbe_pf_mbx_process(dev);
            intr->flags &= ~IXGBE_FLAG_MAILBOX;
        }
    
        //处理 link 变化事件
        if (intr->flags & IXGBE_FLAG_NEED_LINK_UPDATE) {
            /* get the link status before link update, for predicting later */
            memset(&link, 0, sizeof(link));
            rte_ixgbe_dev_atomic_read_link_status(dev, &link);
    
            ixgbe_dev_link_update(dev, 0);
    
            /* likely to up */
            if (!link.link_status)
                /* handle it 1 sec later, wait it being stable */
                timeout = IXGBE_LINK_UP_CHECK_TIMEOUT;
            /* likely to down */
            else
                /* handle it 4 sec later, wait it being stable */
                timeout = IXGBE_LINK_DOWN_CHECK_TIMEOUT;
    
            ixgbe_dev_link_status_print(dev);
            if (rte_eal_alarm_set(timeout * 1000,
                          ixgbe_dev_interrupt_delayed_handler, (void *)dev) < 0)
                PMD_DRV_LOG(ERR, "Error setting alarm");
            else {
                /* remember original mask */
                intr->mask_original = intr->mask;
                /* only disable lsc interrupt */
                intr->mask &= ~IXGBE_EIMS_LSC;
            }
        }
    
        PMD_DRV_LOG(DEBUG, "enable intr immediately");
        ixgbe_enable_intr(dev);
        rte_intr_enable(intr_handle);
    
        return 0;
    }
    

    相关文章

      网友评论

          本文标题:DPDK 中断处理流程

          本文链接:https://www.haomeiwen.com/subject/fagumltx.html