美文网首页
NIO selector原理浅析

NIO selector原理浅析

作者: zhangshijie | 来源:发表于2017-06-09 21:52 被阅读0次

    无阻塞io是使用单线程或者只使用少量的多线程,每个连接共用一个线程,当处于等待(没有事件)的时候线程资源可以释放出来处理别的请求,通过事件驱动模型当有accept/read/write等事件发生后通知(唤醒)主线程分配资源来处理相关事件。java.nio.channels.Selector就是在该模型中事件的观察者,可以将多个SocketChannel的事件注册到一个Selector上,当没有事件发生时Selector处于阻塞状态,当SocketChannel有accept/read/write等事件发生时唤醒Selector。

    这个Selector是使用了单线程模型,主要用来描述事件驱动模型,要优化性能需要一个好的线程模型来使用,目前比较好的nio框架有Netty,apache的mina等。线程模型这块后面再分享,这里重点研究Selector的阻塞和唤醒原理。

    先看一段简单的Selector使用的代码

    selector = Selector.open();

    ServerSocketChannel ssc = ServerSocketChannel.open();

    ssc.configureBlocking(false);

    ssc.socket().bind(new InetSocketAddress(port));

    ssc.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {

    // select()阻塞,等待有事件发生唤醒

    int selected = selector.select();

    if (selected > 0) {

    Iterator selectedKeys = selector.selectedKeys().iterator();

    while (selectedKeys.hasNext()) {

    SelectionKey key = selectedKeys.next();

    if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {

    // 处理 accept 事件

    } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {

    // 处理 read 事件

    } else if ((key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {

    // 处理 write 事件

    }

    selectedKeys.remove();

    }

    }

    }

    代码中关键的几个点在:

    Selector.open();

    selector.select();

    阻塞后唤醒可以通过注册在selector上的socket有事件发生 或者 selector.select(timeOut)超时 或者 selector.wakeup()主动唤醒;

    整个阻塞和唤醒的过程涉及到的点非常多,先上一张梳理出的整体图,再进入源码会比较容易理解

    现在通过openjdk中的源码来解析上图中的每一个环节:

    1. Selector.open()

    Selector.java

    -----

    public static Selector open() throws IOException {

    return SelectorProvider.provider().openSelector();

    }

    先看看SelectorProvider.provider()做了什么:

    SelectorProvider.java

    -----

    public static SelectorProvider provider() {

    synchronized (lock) {

    if (provider != null)

    return provider;

    return (SelectorProvider)AccessController

    .doPrivileged(new PrivilegedAction() {

    public Object run() {

    if (loadProviderFromProperty())

    return provider;

    if (loadProviderAsService())

    return provider;

    provider = sun.nio.ch.DefaultSelectorProvider.create();

    return provider;

    }

    });

    }

    }

    其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;

    这里主要以windows的实现来梳理整个流程,拿到provider后来看openSelector()中的实现

    WindowsSelectorProvider.java

    ----

    public AbstractSelector openSelector() throws IOException {

    return new WindowsSelectorImpl(this);

    }

    WindowsSelectorImpl.java

    ----

    WindowsSelectorImpl(SelectorProvider sp) throws IOException {

    super(sp);

    pollWrapper = new PollArrayWrapper(INIT_CAP);

    wakeupPipe = Pipe.open();

    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate

    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();

    (sink.sc).socket().setTcpNoDelay(true);

    wakeupSinkFd = ((SelChImpl)sink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

    }

    这段代码中做了如下几个事情

    Pipe.open()打开一个管道(打开管道的实现后面再看);拿到wakeupSourceFd和wakeupSinkFd两个文件描述符;把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里;

    那么为什么需要一个管道,这个管道是怎么实现的?接下来看Pipe.open()做了什么

    Pipe.java

    ----

    public static Pipe open() throws IOException {

    return SelectorProvider.provider().openPipe();

    }

    同样,SelectorProvider.provider()也是获取操作系统相关的实现

    SelectorProvider.java

    ----

    public Pipe openPipe() throws IOException {

    return new PipeImpl(this);

    }

    这里还是看windows下的实现

    PipeImpl.java

    ----

    PipeImpl(final SelectorProvider sp) throws IOException {

    try {

    AccessController.doPrivileged(new Initializer(sp));

    } catch (PrivilegedActionException x) {

    throw (IOException)x.getCause();

    }

    }

    创建了一个PipeImpl对象, AccessController.doPrivileged调用后紧接着会执行initializer的run方法

    PipeImpl.Initializer

    -----

    public Object run() throws IOException {

    ServerSocketChannel ssc = null;

    SocketChannel sc1 = null;

    SocketChannel sc2 = null;

    try {

    // loopback address

    InetAddress lb = InetAddress.getByName("127.0.0.1");

    assert(lb.isLoopbackAddress());

    // bind ServerSocketChannel to a port on the loopback address

    ssc = ServerSocketChannel.open();

    ssc.socket().bind(new InetSocketAddress(lb, 0));

    // Establish connection (assumes connections are eagerly

    // accepted)

    InetSocketAddress sa

    = new InetSocketAddress(lb, ssc.socket().getLocalPort());

    sc1 = SocketChannel.open(sa);

    ByteBuffer bb = ByteBuffer.allocate(8);

    long secret = rnd.nextLong();

    bb.putLong(secret).flip();

    sc1.write(bb);

    // Get a connection and verify it is legitimate

    for (;;) {

    sc2 = ssc.accept();

    bb.clear();

    sc2.read(bb);

    bb.rewind();

    if (bb.getLong() == secret)

    break;

    sc2.close();

    }

    // Create source and sink channels

    source = new SourceChannelImpl(sp, sc1);

    sink = new SinkChannelImpl(sp, sc2);

    } catch (IOException e) {

    try {

    if (sc1 != null)

    sc1.close();

    if (sc2 != null)

    sc2.close();

    } catch (IOException e2) { }

    IOException x = new IOException("Unable to establish"

    + " loopback connection");

    x.initCause(e);

    throw x;

    } finally {

    try {

    if (ssc != null)

    ssc.close();

    } catch (IOException e2) { }

    }

    return null;

    }

    这里即为上图中最下面那部分创建pipe的过程,windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。

    source端由前面提到的WindowsSelectorImpl放到了pollWrapper中(pollWrapper.addWakeupSocket(wakeupSourceFd, 0))

    PollArrayWrapper.java

    ----

    private AllocatedNativeObject pollArray; // The fd array

    // Adds Windows wakeup socket at a given index.

    void addWakeupSocket(int fdVal, int index) {

    putDescriptor(index, fdVal);

    putEventOps(index, POLLIN);

    }

    // Access methods for fd structures

    void putDescriptor(int i, int fd) {

    pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);

    }

    void putEventOps(int i, int event) {

    pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);

    }

    这里将source的POLLIN事件标识为感兴趣的,当sink端有数据写入时,source对应的文件描述符wakeupSourceFd就会处于就绪状态

    Java代码  收藏代码

    AllocatedNativeObject.java

    ----

    class AllocatedNativeObject extends NativeObject

    AllocatedNativeObject(int size, boolean pageAligned) {

    super(size, pageAligned);

    }

    NativeObject.java

    ----

    protected NativeObject(int size, boolean pageAligned) {

    if (!pageAligned) {

    this.allocationAddress = unsafe.allocateMemory(size);

    this.address = this.allocationAddress;

    } else {

    int ps = pageSize();

    long a = unsafe.allocateMemory(size + ps);

    this.allocationAddress = a;

    this.address = a + ps - (a & (ps - 1));

    }

    }

    从以上可以看到pollArray是通过unsafe.allocateMemory(size + ps)分配的一块系统内存

    到这里完成了Selector.open(),主要完成建立Pipe,并把pipe的wakeupSourceFd放入pollArray中,这个pollArray是Selector的枢纽。这里是以Windows的实现来看,在windows下通过两个链接的socketChannel实现了Pipe,linux下则是直接使用系统的pipe。

    2. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    AbstractSelectableChannel.java --> register() --> SelectorImpl.java

    ----

    protected final SelectionKey register(AbstractSelectableChannel ch,int ops,Object attachment)

    {

    if (!(ch instanceof SelChImpl))

    throw new IllegalSelectorException();

    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);

    k.attach(attachment);

    synchronized (publicKeys) {

    implRegister(k);

    }

    k.interestOps(ops);

    return k;

    }

    关键是implRegister(k);

    WindowsSelectorImpl.java

    ----

    protected void implRegister(SelectionKeyImpl ski) {

    growIfNeeded();

    channelArray[totalChannels] = ski;

    ski.setIndex(totalChannels);

    fdMap.put(ski);

    keys.add(ski);

    pollWrapper.addEntry(totalChannels, ski);

    totalChannels++;

    }

    PollArrayWrapper.java

    ----

    void addEntry(int index, SelectionKeyImpl ski) {

    putDescriptor(index, ski.channel.getFDVal());

    }

    这里把socketChannel的文件描述符放到pollArray中。

    3. selector.select();

    SelectorImpl.java

    ----

    public int select(long timeout) throws IOException

    {

    if (timeout < 0)

    throw new IllegalArgumentException("Negative timeout");

    return lockAndDoSelect((timeout == 0) ? -1 : timeout);

    }

    private int lockAndDoSelect(long timeout) throws IOException {

    synchronized (this) {if (!isOpen())              throw new ClosedSelectorException();          synchronized (publicKeys) {              synchronized (publicSelectedKeys) {                  return doSelect(timeout);              }          }      }  }

    其中的doSelector又回到我们的Windows实现:

    WindowsSelectorImpl.java

    ----

    protected int doSelect(long timeout) throws IOException {

    if (channelArray == null)

    throw new ClosedSelectorException();

    this.timeout = timeout; // set selector timeout

    processDeregisterQueue();

    if (interruptTriggered) {

    resetWakeupSocket();

    return 0;

    }

    // Calculate number of helper threads needed for poll. If necessary

    // threads are created here and start waiting on startLock

    adjustThreadsCount();

    finishLock.reset(); // reset finishLock

    // Wakeup helper threads, waiting on startLock, so they start polling.

    // Redundant threads will exit here after wakeup.

    startLock.startThreads();

    // do polling in the main thread. Main thread is responsible for

    // first MAX_SELECTABLE_FDS entries in pollArray.

    try {

    begin();

    try {

    subSelector.poll();

    } catch (IOException e) {

    finishLock.setException(e); // Save this exception

    }

    // Main thread is out of poll(). Wakeup others and wait for them

    if (threads.size() > 0)

    finishLock.waitForHelperThreads();

    } finally {

    end();

    }

    // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.

    finishLock.checkForException();

    processDeregisterQueue();

    int updated = updateSelectedKeys();

    // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.

    resetWakeupSocket();

    return updated;

    }

    private int poll() throws IOException{ // poll for the main thread

    return poll0(pollWrapper.pollArrayAddress,

    Math.min(totalChannels, MAX_SELECTABLE_FDS),

    readFds, writeFds, exceptFds, timeout);

    }

    private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);

    其他的都是一些准备工作,关键是subSelector.poll(),最后调用了native的poll0,并把pollWrapper.pollArrayAddress作为参数传给poll0,那么poll0对pollArray做了什么:

    WindowsSelectorImpl.c

    ----

    Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,

    jlong pollAddress, jint numfds,

    jintArray returnReadFds, jintArray returnWriteFds,

    jintArray returnExceptFds, jlong timeout)

    {

    // 代码.... 此处省略一万字

    /* Call select */

    if ((result = select(0 , &readfds, &writefds, &exceptfds, tv)) == SOCKET_ERROR) {

    // 代码.... 此处省略一万字

    for (i = 0; i < numfds; i++) {

    // 代码.... 此处省略一万字

    }

    }

    }

    代码已经忘得差不多了,但这里可以看到实现思路是调用c的select方法,这里的select对应于内核中的sys_select调用,sys_select首先将第二三四个参数指向的fd_set拷贝到内核,然后对每个被SET的描述符调用进行poll,并记录在临时结果中(fdset),如果有事件发生,select会将临时结果写到用户空间并返回;当轮询一遍后没有任何事件发生时,如果指定了超时时间,则select会睡眠到超时,睡眠结束后再进行一次轮询,并将临时结果写到用户空间,然后返回。

    这里的select就是轮询pollArray中的FD,看有没有事件发生,如果有事件发生收集所有发生事件的FD,退出阻塞。

    关于select系统调用参考了《select、poll、epoll的比较》这篇文章,同时看到nio的select在不同平台上的实现不同,在linux上通过epoll可以不用轮询,在第一次调用后,事件信息就会与对应的epoll描述符关联起来,待的描述符上注册回调函数,当事件发生时,回调函数负责把发生的事件存储在就绪事件链表中,最后写到用户空间。

    到这里已经比较清楚了,退出阻塞的方式有:regist在selector上的socketChannel处于就绪状态(放在pollArray中的socketChannel的FD就绪) 或者 第1节中放在pollArray中的wakeupSourceFd就绪。前者(socketChannel)就绪唤醒应证了文章开始的阻塞->事件驱动->唤醒的过程,后者(wakeupSourceFd)就是下面要看的主动wakeup。

    4. selector.wakeup()

    WindowsSelectorImpl.java

    ----

    public Selector wakeup() {

    synchronized (interruptLock) {

    if (!interruptTriggered) {

    setWakeupSocket();

    interruptTriggered = true;

    }

    }

    return this;

    }

    // Sets Windows wakeup socket to a signaled state.

    private void setWakeupSocket() {

    setWakeupSocket0(wakeupSinkFd);

    }

    private native void setWakeupSocket0(int wakeupSinkFd);

    native实现摘要:

    WindowsSelectorImpl.c

    ----

    Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,

    jint scoutFd)

    {

    /* Write one byte into the pipe */

    send(scoutFd, (char*)&POLLIN, 1, 0);

    }

    这里完成了向最开始建立的pipe的sink端写入了一个字节,source文件描述符就会处于就绪状态,poll方法会返回,从而导致select方法返回。(原来自己建立一个socket链着自己另外一个socket就是为了干这事)

    相关文章

      网友评论

          本文标题:NIO selector原理浅析

          本文链接:https://www.haomeiwen.com/subject/zjofqxtx.html