美文网首页
Java NIO之EPollSelectorImpl

Java NIO之EPollSelectorImpl

作者: 听一首老歌 | 来源:发表于2018-12-03 22:44 被阅读0次

    本文简述JDK1.7的NIO在linux平台上的实现,对java NIO的一些核心概念如Selector,Channel,Buffer等,不会做过多解释,这些请参考JDK的文档。JDK 1.7 NIO Selector在linux平台上的实现类是sun.nio.ch.EPollSelectorImpl,这个类通过linux下的epoll系列系统调用实现NIO,因此在介绍这个类的实现之前,先介绍一下linux的epoll。epoll是poll/select系统调用的一个改进版本,能以更高的性能实现IO事件的检测和分发(主要归功于epoll的事件回调机制,下文详述),主要包含以下3个系统调用:

    #include 

    int epoll_create(int size);

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

    int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

    上述函数中,epoll_create函数负责创建一个检测IO事件的epoll实例,size参数用于“暗示”操作系统事件队列的长度,在linux-2.6.32内核中,此参数被忽略。epoll_ctl函数用于管理文件描述符的事件集,使用此函数可以注册、修改、删除一个或多个事件。epoll_wait负责检测事件,这三个函数的详细描,请参阅epoll的man文档。

    Java类sun.nio.ch.EPollSelectorImpl主要的功能都委托给sun.nio.ch. EPollArrayWrapper实现 (下文所引java代码反编译自linux版jdk_1.7.0_17/lib/rt.jar):

    package sun.nio.ch;

    class EPollArrayWrapper{

    private native int epollCreate();

    private native void epollCtl(int paramInt1, int paramInt2, int paramInt3, int paramInt4);

    private native int epollWait(long paramLong1, int paramInt1, long paramLong2, int paramInt2) throws IOException;

    }

    EPollArrayWrapper的三个native方法的实现代码可参阅openjdk7/jdk/src/solaris/native/sun/nio/ch/ EPollArrayWrapper.c,可看到这三个native方法正是对上述epoll系列系统调用的包装。(其他jdk的实现代码会有所不同,但归根结底都是对epoll系列系统调用的包装)。

    EPollSelectorImpl. implRegister方法(Selector.register方法的具体实现),通过调用epoll_ctl向epoll实例中注册事件:

    protected void implRegister(SelectionKeyImpl paramSelectionKeyImpl) {

    if (this.closed)

    throw new ClosedSelectorException();

    SelChImpl localSelChImpl = paramSelectionKeyImpl.channel;

    this.fdToKey.put(Integer.valueOf(localSelChImpl.getFDVal()), paramSelectionKeyImpl);

    this.pollWrapper.add(localSelChImpl);

    this.keys.add(paramSelectionKeyImpl);

    }

    上述方法中,除了向epoll实例注册事件外,还将注册的文件描述符(fd)与SelectionKey的对应关系添加到fdToKey中,这个map维护了文件描述符与SelectionKey的映射。每当向Selector中注册一个Channel时,向此map中添加一条记录,而当Channel.close、SelectionKey.cancel方法调用时,则从fdToKey中移除与Channel的fd相关联的SelectionKey,具体代码在EPollSelectorImpl.implDereg方法中:

    protected void implDereg(SelectionKeyImpl paramSelectionKeyImpl) throws IOException {

    assert (paramSelectionKeyImpl.getIndex() >= 0);

    SelChImpl localSelChImpl = paramSelectionKeyImpl.channel;

    int i = localSelChImpl.getFDVal();

    this.fdToKey.remove(Integer.valueOf(i));

    this.pollWrapper.release(localSelChImpl);

    paramSelectionKeyImpl.setIndex(-1);

    this.keys.remove(paramSelectionKeyImpl);

    this.selectedKeys.remove(paramSelectionKeyImpl);

    deregister(paramSelectionKeyImpl);

    SelectableChannel localSelectableChannel = paramSelectionKeyImpl.channel();

    if ((!localSelectableChannel.isOpen()) && (!localSelectableChannel.isRegistered()))

    ((SelChImpl)localSelectableChannel).kill();

    }

    EPollSelectorImpl. doSelect(Selector.select方法的实现),则通过调用epoll_wait实现事件检测:

    protected int doSelect(long paramLong)

    throws IOException

    {

    if (this.closed)

    throw new ClosedSelectorException();

    processDeregisterQueue();

    try {

    begin();

    this.pollWrapper.poll(paramLong);

    } finally {

    end();

    }

    processDeregisterQueue();

    int i = updateSelectedKeys();

    if (this.pollWrapper.interrupted())

    {

    this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);

    synchronized (this.interruptLock) {

    this.pollWrapper.clearInterrupted();

    IOUtil.drain(this.fd0);

    this.interruptTriggered = false;

    }

    }

    return i;

    }

    此方法的主要流程概括如下:

    1.通过epoll_wait调用(this.pollWrapper.poll)获取已就绪的文件描述符集合

    2.通过fdToKey查找文件描述符对应的SelectionKey,并更新之,更新SelectionKey的具体代码在EPollSelectorImpl .updateSelectedKeys中:

    private int updateSelectedKeys()

    {

    int i = this.pollWrapper.updated;

    int j = 0;

    for (int k = 0; k < i; k++) { int m = this.pollWrapper.getDescriptor(k); SelectionKeyImpl localSelectionKeyImpl = (SelectionKeyImpl)this.fdToKey.get(Integer.valueOf(m)); if (localSelectionKeyImpl != null) { int n = this.pollWrapper.getEventOps(k); if (this.selectedKeys.contains(localSelectionKeyImpl)) { if (localSelectionKeyImpl.channel.translateAndSetReadyOps(n, localSelectionKeyImpl)) j++; } else { localSelectionKeyImpl.channel.translateAndSetReadyOps(n, localSelectionKeyImpl); if ((localSelectionKeyImpl.nioReadyOps() & localSelectionKeyImpl.nioInterestOps()) != 0) { this.selectedKeys.add(localSelectionKeyImpl); j++; } } } } return j; }

    关于fdToKey,有几个问题:

    一、为何fdToKey会变得非常大?由上述代码可知,fdToKey变得非常大的可能原因有2个:

    1.注册到Selector上的Channel非常多,例如一个长连接服务器可能要同时维持数十万条连接;

    2.过期或失效的Channel没有及时关闭,因而对应的记录会一直留在fdToKey中,时间久了就会越积越多;

    二、为何fdToKey总是串行读取?fdToKey中记录的读取,是在select方法中进行的,而select方法一般而言总是单线程调用(Selector不是线程安全的)。

    三、tcp发包堆积对导致fdToKey变大吗?一般而言不会,因为fdToKey只负责管理注册到Selector上的channel,与数据传输过程无关。当然,如果tcp发包堆积导致IO框架的空闲连接检测机制失效,无法及时检测并关闭空闲的连接,则有可能导致fdToKey变大。

    下面聊一聊epoll系统调用的具体实现,它的实现代码在(linux-2.6.32.65)fs/eventpoll.c中(下文所引内核代码,由于较长,所以只贴出主流程,省略了错误处理及一些相对次要的细节如参数检查、并发控制等),先看epoll_create 系统调用的实现:

    fs/eventpoll.c

    SYSCALL_DEFINE1(epoll_create, int, size)

    {

    if (size <= 0) return -EINVAL; return sys_epoll_create1(0); } 

    SYSCALL_DEFINE1是一个宏,用于定义有一个参数的系统调用函数,上述宏展开后即成为:

    int sys_epoll_create(int size)

    这就是epoll_create系统调用的入口。至于为何要用宏而不是直接声明,主要是因为系统调用的参数个数、传参方式都有严格限制,最多六个参数, SYSCALL_DEFINE2 -SYSCALL_DEFINE6分别用来定义有2-6个参数的系统调用。由上述代码可知,epoll_create函数最终调用sys_epoll_create1实现具体功能,同时也可以看出size参数被忽略了。sys_epoll_create1的主要代码如下(省略了错误处理及一些次要的细节如参数检查等):

    fs/eventpoll.c

    SYSCALL_DEFINE1(epoll_create1, int, flags)

    {

    int error, fd;

    struct eventpoll *ep = NULL;

    struct file *file;

    error = ep_alloc(&ep);

    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,

    O_RDWR | (flags & O_CLOEXEC));

    fd_install(fd, file);

    ep->file = file;

    return fd;

    }

    上述代码主要是分配一个struct eventpoll实例,并分配与此实例相关联的文件描述符,后续的epoll_ctl,epoll_wait调用通过此文件描述符引用此实例。struct eventpoll的结构如下:

    fs/eventpoll.c

    struct eventpoll {

    spinlock_t lock;

    struct mutex mtx;

    wait_queue_head_t wq;

    wait_queue_head_t poll_wait;

    struct list_head rdllist;

    struct rb_root rbr;

    struct epitem *ovflist;

    struct user_struct *user;

    struct file *file;

    int visited;

    struct list_head visited_list_link;

    }

    上述数据结构的关键部分是:

    1.一个等待队列wq,epoll正是通过此等待队列实现的事件回调

    2.一个就绪列表rdllist,此列表以双链表的形式保存了已就绪的文件描述符

    3.一个红黑树rbr,用于保存已注册过的文件描述符,若重复注册相同的文件描述符,则会返回错误

    等待队列是epoll系统调用的核心机制(不只是epoll,linux下事件的通知、回调等机制大都依赖于等待队列),在讲述epoll_ctl,epoll_wait的实现之前,先来看看等待队列。等待队列可以使一组进程/线程在等待某个事件时睡眠,当等待的事件发生时,内核会唤醒睡眠的进程/线程。注意,下文并不区分进程和线程,在linux下,进程和线程在调度这个意义下(调度就是指linux的进程调度,包括进程的切换、睡眠、唤醒等)并无差别。此机制可以类比java.lang.Object类的wait和notify/notifyAll方法,其中wait方法使线程睡眠,notify/notifyAll方法则唤醒睡眠的一个或全部线程。等待队列主要涉及两个数据结构:

    include/linux/wait.h

    struct __wait_queue_head {

    spinlock_t lock;

    list_head task_list;

    };

    struct __wait_queue {

    unsigned int flags;

    #define WQ_FLAG_EXCLUSIVE0x01

    void *private;

    wait_queue_func_t func;

    struct list_head task_list;

    };

    struct __wait_queue_head是队头结构,task_list 保存了添加到此队列上的元素,struct list_head是标准的linux双链表, 定义如下:

    include/linux/list.h

    struct list_head {

    struct list_head *next, *prev;

    };

    注意,此结构既可以表示双链表的表头,也可以表示一个链表元素,且next,prev这两个指针可以指向任意数据结构。

    struct __wait_queue是等待队列的元素结构,成员func是等待的进程被唤醒时执行的回调函数,其定义如下:

    include/linux/wait.h

    typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int flags, void *key);

    struct __wait_queue的成员task_list是一个链表元素用于将此结构放置到struct __wait_queue_head中(这和此结构的task_list成员含义是不同的,此成员的含义为双链表的头),private成员一般指向等待进程的task_struct实例(该结构成员很多,在此就不贴出了,只需要知道linux下每个进程都对应一个task_struct 实例)。

    在使用上,等待队列主要涉及以下函数(或者宏):

    include/linux/wait.h

    __add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait);

    #define wait_event(wq, condition)

    #define wake_up_xxx(x,…)

    __add_wait_queue用于将一个进程添加到等待队列,wait_event是一个宏,它用于等待一个事件,当事件未发生时使等待的进程休眠,wake_up_xxx是一系列的宏,包括wake_up,wake_up_all,wake_up_locked,wake_up_interruptible等,负责唤醒休眠在某个事件上的一个或一组进程。关于等待队列的具体实现细节,由于牵涉较广(涉及到进程调度、中断处理等),这里不再详述,可以将add_wait_queue,wait_event类比java.lang.Object的wait方法,而wake_up则可以类比java.lang.Object的notify/notifyAll方法。

    介绍完等待队列后,就可以进一步研究epoll_ctl的实现了,其代码实现中核心的部分是:

    fs/eventpoll.c

    SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,

    struct epoll_event __user *, event)

    {

    if (!tfile->f_op || !tfile->f_op->poll)

    goto error_tgt_fput;

    switch (op) {

    case EPOLL_CTL_ADD:

    error=ep_insert(ep, &epds, tfile, fd);

    break;

    case EPOLL_CTL_DEL:

    error=ep_remove(ep, epi);

    break;

    case EPOLL_CTL_MOD:

    error = ep_modify(ep, epi, &epds);

    break;

    }

    return error;

    }

    什么样的文件描述符可以注册?从那个if判断可以看出,只有文件描述符对应的文件实现了poll方法的才可以,一般而言,字符设备的文件都实现了此方法,网络相关的套接字也实现了此方法,而块设备文件例如ext2/ext3/ext4文件系统文件,都没有实现此方法。实现了poll方法的文件,对应于java NIO的java.nio.channels.SelectableChannel,这也是为何只有 SelectableChannel 才能注册到Selector上的原因。ep_insert,ep_remove,ep_modify分别对应事件的注册、删除、修改,我们以ep_insert为例,看一下事件注册的过程,其关键代码如下:

    fs/eventpoll.c

    static int ep_insert(struct eventpoll *ep, struct epoll_event *event,

    struct file *tfile, int fd)

    {

    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

    revents = tfile->f_op->poll(tfile, &epq.pt);

    ep_rbtree_insert(ep, epi);

    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {

    list_add_tail(&epi->rdllink, &ep->rdllist);;

    wake_up_locked(&ep->wq);

    }

    }

    上述代码的主要做的事是:

    1.绑定等待队列的回调函数ep_ptable_queue_proc

    2.调用对应文件的实例的poll方法,此方法的具体实现差别非常大,但绝大多数都会调用wait_event相关方法,在没有事件发生时,使进程睡眠,例如socket对应的实现(代码在net/ipv4/af_inet.c的tcp_poll方法,在此不再详述);

    3.若注册的事件已经发生,则将已就绪的文件描述符插入到eventpoll实例的就绪列表(list_add_tail(&epi->rdllink, &ep->rdllist);),并唤醒睡眠的进程(wake_up_locked(&ep->wq))

    第1步绑定的回调函数ep_ptable_queue_proc,会在等待的事件发生时执行,其主要功能是将就绪的文件描述符插入到eventpoll实例的就绪列表(具体是通过ep_ptable_queue_proc绑定的另一个回调函数ep_poll_callback实现的):

    fs/eventpoll.c

    static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key){

    if (!ep_is_linked(&epi->rdllink))

    list_add_tail(&epi->rdllink, &ep->rdllist);

    }

    最后看epoll_wait的实现,有了就绪队列,epoll_wait的实现就比较简单了,只需检查就绪队列是否为空,若为空,则在必要时睡眠或等待:

    fs/eventpoll.c

    SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,

    int, maxevents, int, timeout)

    {

    int error;

    struct file *file;

    struct eventpoll *ep;

    file = fget(epfd);

    ep = file->private_data;

    error = ep_poll(ep, events, maxevents, timeout);

    return error;

    }

    此函数最终调用ep_poll完成其主要功能:

    static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,

    int maxevents, long timeout)

    {

    retry:

    if (list_empty(&ep->rdllist)) {

    init_waitqueue_entry(&wait, current);

    wait.flags |= WQ_FLAG_EXCLUSIVE;

    __add_wait_queue(&ep->wq, &wait);

    for (;;) {

    set_current_state(TASK_INTERRUPTIBLE);

    if (!list_empty(&ep->rdllist) || !jtimeout)

    break;

    if (signal_pending(current)) {

    res = -EINTR;

    break;

    }

    spin_unlock_irqrestore(&ep->lock, flags);

    jtimeout = schedule_timeout(jtimeout);

    spin_lock_irqsave(&ep->lock, flags);

    }

    __remove_wait_queue(&ep->wq, &wait);

    set_current_state(TASK_RUNNING);

    }

    eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;

    spin_unlock_irqrestore(&ep->lock, flags);

    if (!res && eavail &&

    !(res = ep_send_events(ep, events, maxevents)) && jtimeout)

    goto retry;

    return res;

    }

    上述代码主要是检查就绪队列是否为空,若为空时,则根据超时设置判断是否需要睡眠(__add_wait_queue)或等待(jtimeout = schedule_timeout(jtimeout);)。

    综上所述,epoll系统调用通过等待队列,其事件检测(epoll_wait系统调用)的时间复杂度为O(n),其中n是“活跃”的文件描述符总数,所谓的活跃,是指在该文件描述符上有频繁的读写操作,而对比poll或select系统调用(其实现代码在fs/select.c中),其时间复杂度也是O(n),但这个n却是注册的文件描述符的总数。因此,当活跃的文件描述符占总的文件描述符比例较小时,例如,在长连接服务器的场景中,虽然同时可能需要维持数十万条长连接,但其中只有少数的连接是活跃的,使用epoll就比较合适。

    转:Java NIO之EPollSelectorImpl详解 – hellojavacases微信公众号网站

    相关文章

      网友评论

          本文标题:Java NIO之EPollSelectorImpl

          本文链接:https://www.haomeiwen.com/subject/azfpcqtx.html