美文网首页
NIO源码阅读(2)-EPollSelectorImpl

NIO源码阅读(2)-EPollSelectorImpl

作者: allanYan | 来源:发表于2016-10-08 16:25 被阅读0次

概述

根据NIO源码阅读(1)-SelectorProvider一文的分析,Linux下SelectorProvider的实现类为EpollSelectorProvider,下面具体看看它的实现;

EPollSelectorProvider

public abstract class SelectorProviderImpl
    extends SelectorProvider
{

    public DatagramChannel openDatagramChannel() throws IOException {
        return new DatagramChannelImpl(this);
    }

    public DatagramChannel openDatagramChannel(ProtocolFamily family) throws IOException {
        return new DatagramChannelImpl(this, family);
    }

    public Pipe openPipe() throws IOException {
        return new PipeImpl(this);
    }

    public abstract AbstractSelector openSelector() throws IOException;

    public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
    }

    public SocketChannel openSocketChannel() throws IOException {
        return new SocketChannelImpl(this);
    }
}

public class EPollSelectorProvider
    extends SelectorProviderImpl
{
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}

EPollSelectorProvider实现比较简单,主要实现了openSelector方法,返回EPollSelectorImpl对象;

EPollSelectorImpl

EpollSelectorImpl继承自SelectorImpl,比较重要的几个成员变量如下:

   //epoll的select方法会堵塞线程,那么如何唤醒堵塞线程呢?
   //其实很简单,当有读写事件发生时,select方法会返回;
   //因此EpollSelectorImpl注册了一对管道fd0,fd1,并将fd0注册到selector;  
   //当要中断select方法时,往fd1中写入数字1会导致fd0有可读内容,select方法会返回;
    protected int fd0;
    protected int fd1;
    //Epoll的具体实现,关键方法采用JNI实现
    EPollArrayWrapper pollWrapper;
   //触发事件但还未处理完毕的SelectionKey,比如有可读数据,但未读完;
    protected Set<SelectionKey> selectedKeys;
   //将selectedKeys暴露给外部访问,可以删除元素,但不可添加元素
    private Set<SelectionKey> publicSelectedKeys; 
    //注册到Selector的所有SelectionKey
    protected HashSet<SelectionKey> keys;
    //将keys暴露给外部访问,采用不可变对象,防止被修改
    private Set<SelectionKey> publicKeys;  

此处要注意:在处理selectedKeys()方法返回的SelectionKey集合时,对于处理完毕的SelectionKey,需要手工从集合中清除,否则下次调用selectedKeys()方法时,还会返回该SelectionKey对象;

构造函数

EPollSelectorImpl(SelectorProvider sp) {
        super(sp);
        long pipeFds = IOUtil.makePipe(false);//管道
        fd0 = (int) (pipeFds >>> 32);//读
        fd1 = (int) pipeFds;//写
        pollWrapper = new EPollArrayWrapper();
        pollWrapper.initInterrupt(fd0, fd1);
        fdToKey = new HashMap<Integer,SelectionKeyImpl>();
    }
  1. 首先通过IOUtil.makePipe(false)返回了一个非堵塞的管道(pipe),底层是通过Linux的pipe系统调用实现的;返回两个int常量,分别指向管道的读、写文件描述符;关于管道的作用,前面已经有解释,此处不展开;
  2. 接着定义了EPollArrayWrapper变量,并调用initInterrup方法,将fd0注册到epoll;
  3. 最后定义了一个map类型的变量fdToKey,将channel的文件描述符ID和SelectionKey建立映射关系;

select方法

Selector.select方法最终调用的是doSelect方法:

 protected int doSelect(long timeout)
        throws IOException
    {
        if (closed)
            throw new ClosedSelectorException();
        //处理待取消的SelectionKey(调用SelectionKey.cancel()方法取消)
        processDeregisterQueue();
        try {
           //见java.nio.channels.spi.AbstractSelector类,
           //通过blockedOn方法将前线程的blocker变量设为Interruptible类实例,
           //当线程中断时,会通过Interruptible类的interrupt间接调用wakeup方法,
           //使得pollWrapper.poll方法返回
            begin();
            //调用epoll底层实现
            pollWrapper.poll(timeout);
        } finally {
           //见java.nio.channels.spi.AbstractSelector类,将当前线程的blocker变量设为null
            end();
        }
       //处理待取消的SelectionKey(调用SelectionKey.cancel()方法取消)
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
           //清除epoll事件
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();//将中断标志设为false
                IOUtil.drain(fd0);//将fd0的数据全部读完
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }
//AbstractSelectionKey
public final void cancel() {
        synchronized (this) {
            if (valid) {
                valid = false;
                ((AbstractSelector)selector()).cancel(this);
            }
        }
    }
  1. 如果selector已经关闭,抛出异常;
  2. 调用processDeregisterQueue方法,将cancel的selectionKey从selector中删除,底层会调用epoll_ctl方法移除被epoll所监听的文件描述符;
  3. begin和end方法主要是为了处理线程中断,将线程的中断转化为Selector的wakeup方法,避免线程堵塞在IO操作上;

从上面的源码可以知道,SelectionKey.cancel方法会将valid标志设为false,但不会立即将SelectionKey删除,而是在下一次select方法调用时进行处理;

register方法

register方法是在父类SelectorImpl中定义的:

 protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment){
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);
    synchronized (publicKeys) {
        implRegister(k);
    }
    //k.interestOps(ops)=>channel.translateAndSetInterestOps(ops, this)方法将ops转换为事件=>EPollSelectorImpl.putEventOps>=pollWrapper.setInterest(ch.getFDVal(), ops)
    //绕来绕去,最终的逻辑还是在EPollArrayWrapper中
    k.interestOps(ops);
    return k;
}

protected void implRegister(SelectionKeyImpl ski) {
    if (closed)
        throw new ClosedSelectorException();
    SelChImpl ch = ski.channel;
    int fd = Integer.valueOf(ch.getFDVal());
    fdToKey.put(fd, ski);
    pollWrapper.add(fd);
    keys.add(ski);
}

//SocketChannelImpl.java
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
    int newOps = 0;
    if ((ops & SelectionKey.OP_READ) != 0)
        newOps |= PollArrayWrapper.POLLIN;//读事件
    if ((ops & SelectionKey.OP_WRITE) != 0)
        newOps |= PollArrayWrapper.POLLOUT;//写事件
    if ((ops & SelectionKey.OP_CONNECT) != 0)
        newOps |= PollArrayWrapper.POLLCONN;//连接事件
    sk.selector.putEventOps(sk, newOps);
}

//EPollArrayWrapper.java
void setInterest(int fd, int mask) {
    synchronized (updateLock) {
        int oldCapacity = updateDescriptors.length;
        if (updateCount == oldCapacity) {//数组满了,需要扩容
            int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;//容量增加64
            int[] newDescriptors = new int[newCapacity];//新建数组
            System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);//将老数组元素挪到新数组
            updateDescriptors = newDescriptors;
        }
        updateDescriptors[updateCount++] = fd;//记录发生变化的fd

        //
        byte b = (byte)mask;
        assert (b == mask) && (b != KILLED);
        setUpdateEvents(fd, b, false);//记录fd发生变化的事件
    }
}

register方法将channel的文件描述符ID和SelectionKey添加到fdToKey,并调用pollWrapper.add方法和ke.interestOps(ops)方法添加更新事件,但直到调用pollWrapper的poll方法,才会真正进行处理,将channel注册到epoll;

EPollArrayWrapper

EPollArrayWrapper封装了底层的Epoll操作,Linux的Epoll接口定义请参考/usr/include/sys/epoll.h;

初始化

由于JDK是基于Linux kernel 2.4进行构建的,因此此处函数名称并不是写死的,而是采用dlsym系统调用从进程加载的so库中进行查找后,返回函数指针实现的;

epoll_create_func = (epoll_create_t) dlsym(RTLD_DEFAULT, "epoll_create");
epoll_ctl_func    = (epoll_ctl_t)    dlsym(RTLD_DEFAULT, "epoll_ctl");
epoll_wait_func   = (epoll_wait_t)   dlsym(RTLD_DEFAULT, "epoll_wait");

epoll_create

  1. 函数定义

    int epoll_create(int size);
    
  2. 函数描述
    打开epoll文件描述符,如果为-1,则说明出错了;从Linux 2.6.8开始,int参数被忽略,但必须传入大于0的数字。

  3. JNI实现:

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    int epfd = (*epoll_create_func)(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}

epoll_ctl

  1. 函数定义

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    typedef union epoll_data {
                    void *ptr;
                    int fd;
                    __uint32_t u32;
                    __uint64_t u64;
                } epoll_data_t;
    
    struct epoll_event {
                    __uint32_t events;      /* Epoll events */
                    epoll_data_t data;      /* User data variable */
                };
    
  2. 函数说明
    增加或者移除被epoll所监听的文件描述符;

  • op选项可以为下面三者之一:

    • EPOLL_CTL_ADD(1),为fd注册event事件
    • EPOLL_CTL_MOD(3),对fd的event事件进行修改
    • EPOLL_CTL_DEL(2),删除fd已注册的event事件
  • events可以是以下几个宏的集合:

    • EPOLLIN= 0x001:表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    • EPOLLPRI=0x002:表示对应的文件描述符有紧急的数据可读;
    • EPOLLOUT=0x004:表示对应的文件描述符可以写;
    • EPOLLERR=0x008:表示对应的文件描述符发生错误;
    • EPOLLHUP=0x010:表示对应的文件描述符被挂断;
    • EPOLLET=1<<31: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(LevelTriggered)来说的;
    • EPOLLONESHOT=1<<30:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里;
  1. JNI实现
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    RESTARTABLE((*epoll_ctl_func)(epfd, (int)opcode, (int)fd, &event), res);

      if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }

上面的RESTARTABLE实际上是个宏,主要是为了处理EINTR错误,此时需要重试;另外从上面的源码可以知道JDK并没有实现边缘触发,关于边缘触发和水平触发的差异简单列举如下,边缘触发的性能更高,但编程难度也更高,netty就重新实现了Epoll机制,采用边缘触发方式;另外像nginx等也采用的是边缘触发:

  • LT是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。

  • ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核会通知你一次,并且除非你做了某些操作导致那个文件描述符不再为就绪状态了,否则不会再次发送通知。

可以看到,本来内核在被DMA中断,捕获到IO设备来数据后,只需要查找这个数据属于哪个文件描述符,进而通知线程里等待的函数即可,但是,LT要求内核在通知阶段还要继续再扫描一次刚才所建立的内核fd和io对应的那个数组,因为应用程序可能没有真正去读上次通知有数据后的那些fd,这种沟通方式效率是很低下的,只是方便编程而已;

epoll_wait

  1. 函数定义:

    int epoll_wait(int epfd, struct epoll_event * events,
                    int maxevents, int timeout);
    
  2. 函数描述:

    用来等待发生在监听描述符上的事件。它会一直阻塞直到事件发生,另外注意的是,由于epoll_wait同步等待,有可能被信号中断,此时需要重新进行读或写操作;

  3. JNI实现

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        RESTARTABLE((*epoll_wait_func)(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

RESTARTABLE是宏的名字,它和iepoll都是为了处理EINTR错误而对epoll_wait进行了封装;

如果进程在一个慢系统调用(slow system call)中阻塞时,当捕获到某个信号且相应信号处理函数返回时,这个系统调用被中断,调用返回错误,设置errno为EINTR

慢系统调用(slow system call):此术语适用于那些可能永远阻塞的系统调用。永远阻塞的系统调用是指调用有可能永远无法返回,多数网络支持函数都属于这一类。如:若没有客户连接到服务器上,那么服务器的accept调用就不会返回

相关文章

网友评论

      本文标题:NIO源码阅读(2)-EPollSelectorImpl

      本文链接:https://www.haomeiwen.com/subject/aiiuyttx.html