美文网首页
Tomcat NIO2 网络模型原理分

Tomcat NIO2 网络模型原理分

作者: 绝尘驹 | 来源:发表于2020-10-05 18:21 被阅读0次

tomcat NIO2是基于java jdk nio2实现的,想要弄明白tomcat的实现,我们必须要理解jdk nio2的实现原理

异步IO

异步IO对相对同步IO来说的,我们平时用的到无论是阻塞io还是非阻塞io,比如select,pool,epoll,读写io等都是同步io,应用在知道读事件后,是我们的用户线程真正去读io数据,即从内核态的缓冲区copy到用户态的,这个copy动作是用户态线程做的,而且需要等这个copy动作完成才能继续往下执行。

而异步IO是这个读io,即copy的动作是内核帮你完成的,用户线程接收到的事件是copy完成事件,即数据已经帮你copy好了在你指定的缓冲buffer里,你直接拿来用就可以了。

linux 的异步io只支持对直接io,即direct_io,就是读写都不经过操作系统的高速缓存,这类一般都是数据库使用的,比如mysql的数据页缓存是用的direct_io,节省了内核的缓存。

如果是非direct_io 要想在linux实现异步,目前都是通过用户态的线程池来模拟的,比如jdk提供的异步io就是通过用户态的线程池和epoll来实现的,需要注意的是具体的读写操作还是由用户态的线程来完成。

JDK NIO2

jdk nio2 linux下是通过epoll来模拟异步io的,对应的实现是LinuxAsynchronousChannelProvider

Tomcat NIO2

NIO2 是相对http1 nio说的,nio2是基于jdk nio2版本实现的网络io模型

tomcat nio2 是tomcat7之后支持的,但不是默认的网络模型,通过connector配置
protocol="org.apache.coyote.http11.Http11Nio2Protocol" 来指定使用nio2网络模型

tomcat nio2 的网络初始化主要是NIO2Endpoint2,初始化如下:

public void bind() throws Exception {

        // Create worker collection
        // Tomcat 建议nio2的 protocol不用tomcat提供的executor,需要独立创建一个。
        if (getExecutor() == null) {
            createExecutor();
        }
        //jdk nio2 需要的线程池,返回的是一个EPollPort的实现
        if (getExecutor() instanceof ExecutorService) {
            threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
        }
        // AsynchronousChannelGroup needs exclusive access to its executor service
        if (!internalExecutor) {
            log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
        }

        serverSock = AsynchronousServerSocketChannel.open(threadGroup);
        socketProperties.setProperties(serverSock);
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.bind(addr, getAcceptCount());

        // Initialize SSL if needed
        initialiseSsl();
    }

上面的代码核心是启动一个epoll的线程,来监听epoll并处理事件,如果是io事件,则执行完io操作后,提交一个任务到线程池异步化,当前线程继续处理其他的事件。

这里我们可以看出,默认是一个线程来处理所有的io事件的,包含accept链接,io读写,优秀的程序员一看这里存在瓶颈,也不符合reactor模型的思想哈,正是这个原因,jdk提供来一个参数:

sun.nio.ch.internalThreadPoolSize

来指定这个线程数,默认是1,多个线程时,只会由一个线程阻塞在epollwait上,其他的线程阻塞在内部的queue上。

说完了上面的epoll io 事件的原理,下面就是创建server socket
上面AsynchronousServerSocketChannel.open创建一个异步的UnixAsynchronousServerSocketChannelImpl,并配置为非阻塞的,核心代码如下:

UnixAsynchronousServerSocketChannelImpl(Port port)
    throws IOException
{
    super(port);

    try {
        //配置为非阻塞,不像nio1
        IOUtil.configureBlocking(fd, false);
    } catch (IOException x) {
        nd.close(fd);  // prevent leak
        throw x;
    }
    this.port = port;
    this.fdVal = IOUtil.fdVal(fd);

    // add mapping from file descriptor to this channel
    port.register(fdVal, this);
}

NIO2 Acceptor

nio2 的acceptor 不是一个独立的线程,是通过上面创建的线程池来运行的,只是一个特效的task,代码如下:

@Override
protected void startAcceptorThread() {
    // Instead of starting a real acceptor thread, this will instead call
    // an asynchronous accept operation
    if (acceptor == null) {
        acceptor = new Nio2Acceptor(this);
        acceptor.setThreadName(getName() + "-Acceptor");
    }
    acceptor.state = AcceptorState.RUNNING;
    //直接执行acceptor任务,就是管他有没有链接过来,先accept下再说,万一有链接来了呢,不就省了一次注册事件的系统调用不。
    getExecutor().execute(acceptor);
}

上面说的特殊是指Acceptor实现了异步IO事件的CompletionHandler,来接收accept是否完成的事件,我们先看下Nio2Acceptor的run()方法实现:

    @Override
    public void run() {
        // The initial accept will be called in a separate utility thread
        if (!isPaused()) {
            //if we have reached max connections, wait
            try {
                //这里是还是计算链接的个数是否达到配置的阀值,是否要阻塞
                countUpOrAwaitConnection();
            } catch (InterruptedException e) {
                // Ignore
            }
            if (!isPaused()) {
                
                // Note: as a special behavior, the completion handler for accept is
                // always called in a separate thread.
                //这里的意思是accept操作的线程和回调的线程是两个独立的线程,无论是通过一次线程池主动accept成功回调还是通过epoll线程执行accept的回调,都是异步的。
                serverSock.accept(null, this);
            } else {
                state = AcceptorState.PAUSED;
            }
        } else {
            state = AcceptorState.PAUSED;
        }
    }

熟悉tomcat nio1的同学,肯定就知道重写了原理acceptor的run方法,这里只有accept了,没有接收完新建立的链接去读数据的代码,因为是异步io的模式,接受新来的链接也是通过异步模拟的,会通过线程池异步回调这个Nio2Acceptor的completed方法,如果接受成功的话,这个后面分析,这里先分析accept 链接的过程。

serverSock.accept(null, this)内部的核心代码在UnixAsynchronousServerSocketChannelImpl的implAccept方法,如下:

Future<AsynchronousSocketChannel> implAccept(Object att,
    CompletionHandler<AsynchronousSocketChannel,Object> handler)
{
    // complete immediately if channel is closed
    if (!isOpen()) {
        Throwable e = new ClosedChannelException();
        if (handler == null) {
            return CompletedFuture.withFailure(e);
        } else {
            Invoker.invoke(this, handler, att, null, e);
            return null;
        }
    }
    if (localAddress == null)
        throw new NotYetBoundException();

    // cancel was invoked with pending accept so connection may have been
    // dropped.
    if (isAcceptKilled())
        throw new RuntimeException("Accept not allowed due cancellation");

    // check and set flag to prevent concurrent accepting
    // 防止并行accept
    if (!accepting.compareAndSet(false, true))
        throw new AcceptPendingException();

    // attempt accept
    FileDescriptor newfd = new FileDescriptor();
    InetSocketAddress[] isaa = new InetSocketAddress[1];
    Throwable exc = null;
    try {
        begin();
        //accept链接
        int n = accept(this.fd, newfd, isaa);
        if (n == IOStatus.UNAVAILABLE) {
            // 这个时候不一定有新链接过来,所有需要处理没有链接的情况
            // need calling context when there is security manager as
            // permission check may be done in a different thread without
            // any application call frames on the stack
            PendingFuture<AsynchronousSocketChannel,Object> result = null;
            synchronized (updateLock) {
                if (handler == null) {
                    this.acceptHandler = null;
                    result = new PendingFuture<AsynchronousSocketChannel,Object>(this);
                    this.acceptFuture = result;
                } else {
                    this.acceptHandler = handler;
                    this.acceptAttachment = att;
                }
                this.acceptAcc = (System.getSecurityManager() == null) ?
                    null : AccessController.getContext();
                this.acceptPending = true;
            }

            // register for connections
            port.startPoll(fdVal, Net.POLLIN);
            return result;
        }
    } catch (Throwable x) {
        // accept failed
        if (x instanceof ClosedChannelException)
            x = new AsynchronousCloseException();
        exc = x;
    } finally {
        end();
    }

    AsynchronousSocketChannel child = null;
    if (exc == null) {
        // connection accepted immediately
        try {
            child = finishAccept(newfd, isaa[0], null);
        } catch (Throwable x) {
            exc = x;
        }
    }

    // re-enable accepting before invoking handler
    enableAccept();

    if (handler == null) {
        return CompletedFuture.withResult(child, exc);
    } else {
        //异步回调handler的事件,handler就是我们的acceptor
        Invoker.invokeIndirectly(this, handler, att, child, exc);
        return null;
    }
}

上面这么大一段代码,主要干了三件事情:

  • 检查是否存在并行accept,即多个线程accept,只能同时一个线程accept。
  • 没有链接时,需要在epoll上重新注册读事件,本次accept任务就算结束了,待链接来时会重新去accept。
  • 有链接时,完成链接的建立,同时异步回调accept的complete方法,通过接受链接完成,进入后续的处理。

建链完成 Complete

上面完成了新链接的accept操作后,就通过异步任务的方式即当前线程创建一个task提交到线程池,回调Nio2Acceptor对应的complete方法,complete方法的代码如下:

public void completed(AsynchronousSocketChannel socket,
            Void attachment) {
        // Successful accept, reset the error delay
        errorDelay = 0;
        // Continue processing the socket on the current thread
        // Configure the socket
        if (isRunning() && !isPaused()) {
            //没有链接限制时,当前回调的线程继续去accept,如果还有链接,则同样是通过另个线程来执行回调处理。
            if (getMaxConnections() == -1) {
                serverSock.accept(null, this);
            } else {
                // Accept again on a new thread since countUpOrAwaitConnection may block
                // 有限制的hua
                getExecutor().execute(this);
            }
            //为读链接上的数据做准备,并开始解析协议内容。 
            if (!setSocketOptions(socket)) {
                closeSocket(socket);
            }
        } else {
            if (isRunning()) {
                state = AcceptorState.PAUSED;
            }
            destroySocket(socket);
        }
    }

completed 方法首先判断是否有链接限制,如果没有,则直接继续accept,如果有链接,则会做完accept后,继续处理当前链接,因为accept完成后,同样提交一个任务异步处理接受的链接。

如果是🈶️限制的,默认是有限制的,则是提交一个任务,因为如果链接数达到限制了会阻塞住执行任务的线程,我们不能阻塞住处理当前链接的线程,否则就出现客户端发起了请求等不到结果。

还有一点需要注意的是accept事件的回调一定是异步的,这个和读写IO事件不一样,下面可以看到。

setSocketOptions 方法里面会为创建的channel创建一个Nio2SocketWrapper,该wrapper有一个内部readCompletionHandler和writeCompletionHandler,对应一个链接上读和写的异步io操作的回调处理者,setSocketOptions方法如下:

protected boolean setSocketOptions(AsynchronousSocketChannel socket) {
    Nio2SocketWrapper socketWrapper = null;
    try {
        // Allocate channel and wrapper
        Nio2Channel channel = null;
        if (nioChannels != null) {
            channel = nioChannels.pop();
        }
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNio2Channel(bufhandler, this);
            } else {
                channel = new Nio2Channel(bufhandler);
            }
        }
        // 异步读写wrapper
        Nio2SocketWrapper newWrapper = new Nio2SocketWrapper(channel, this);
        channel.reset(socket, newWrapper);
        connections.put(socket, newWrapper);
        socketWrapper = newWrapper;

        // Set socket properties
        socketProperties.setProperties(socket);

        socketWrapper.setReadTimeout(getConnectionTimeout());
        socketWrapper.setWriteTimeout(getConnectionTimeout());
        socketWrapper.setKeepAliveLeft(Nio2Endpoint.this.getMaxKeepAliveRequests());
        // Continue processing on the same thread as the acceptor is async
        // 后面对http协议解析,并处理请求和同步io是一样的,false 参数表示用当前线程继续执行,不异步,和nio 一不一样。
        return processSocket(socketWrapper, SocketEvent.OPEN_READ, false);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.socketOptionsError"), t);
        if (socketWrapper == null) {
            destroySocket(socket);
        }
    }
    // Tell to close the socket if needed
    return false;
}

上面processSocket会开始解析http协议,解析前需要把整个包的内容读出来,读完后会回调这个readCompletionHandler的complete方法,complete的代码如下:

       @Override
       public void completed(Integer nBytes, ByteBuffer attachment) {
                if (log.isDebugEnabled()) {
                    log.debug("Socket: [" + Nio2SocketWrapper.this + "], Interest: [" + readInterest + "]");
                }
                readNotify = false;
                synchronized (readCompletionHandler) {
                    if (nBytes.intValue() < 0) {
                        failed(new EOFException(), attachment);
                    } else {
                        //没有读到数据时,readInterest为true,而且不是内部调,则需要通知process去 开始处理这个请求
                        if (readInterest && !Nio2Endpoint.isInline()) {
                            readNotify = true;
                        } else {
                            // Release here since there will be no
                            // notify/dispatch to do the release.
                            readPending.release();
                        }
                        readInterest = false;
                    }
                }
                if (readNotify) {
                   //读到数据,继续执行解析http协议以及后面的处理。
                  getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
                }
        }

这是正常的流程,不知道大家有没有注意到,nio2是链接accept后,直接去读,并不是先注册一个读事件,等内核发现可读时再回调通知去真正的读,这是nio2模型的一个优化,即少一次epoll注册事件的系统调用,所有就存在有可能,这次是读不到数据的,读不到的时,而且有超时时间的话,会提交一个读超时的定时任务到超时线程池,同时向epoll注册读事件,等超时时间到后,检查是否读到,没有就直接超时,回调对应的failed方法。

这里还有一个需要注意的地方,就是我们从接受一个新的链接完成时是异步的回调Nio2Acceptor的complete方法的,然后一直由这个线程执行这个链接上的读操作,会涉及到io操作。因为要把链接上的数据从内核态读到用户态,这个过程也是当前线程执行的,读完以后回调呢,是异步的还是继续由当前线程执行做直接回调,答案是直接回调,还是有当前线程执行,原因是jdk nio2 的异步回调时会给执行回调任务的线程绑定一个group,就是serversocket,代码如下:

private Runnable bindToGroup(final Runnable task) {
    final AsynchronousChannelGroupImpl thisGroup = this;
    return new Runnable() {
        public void run() {
            //该当前线程的上下文即threadlocal帮定group
            Invoker.bindToGroup(thisGroup);
            task.run();
        }
    };
}

Invoker.bindToGroup(thisGroup)的代码如下:

static void bindToGroup(AsynchronousChannelGroupImpl group) {
    myGroupAndInvokeCount.set(new GroupAndInvokeCount(group));
}

上面绑定这个group是干啥用的呢,就是线程池的任务在执行io操作完成时,回调对应的handler的complete方法时,是有当前线程执行还是通过异步回调就是根据这个判断的,部分核心代码如下:

Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;
    boolean invokeDirect = false;
    boolean attemptRead = false;
    if (!disableSynchronousRead) {
        if (handler == null) {
            attemptRead = true;
        } else {
            //myGroupAndInvokeCount 是当前线程绑定的group的
            myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
            invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
            // okay to attempt read with user thread pool
            attemptRead = invokeDirect || !port.isFixedThreadPool();
        }
    }

disableSynchronousRead 默认是false,这个系统属性的作用等下再讲,下面判断是否直接调用的判断是:

static boolean mayInvokeDirect(GroupAndInvokeCount myGroupAndInvokeCount,
                               AsynchronousChannelGroupImpl group)
{
    if ((myGroupAndInvokeCount != null) &&
        (myGroupAndInvokeCount.group() == group) &&
        (myGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
    {
        return true;
    }
    return false;
}

这里就是根据前面绑定的myGroupAndInvokeCount来判断的,除了绑定了线程上下文group,一个监听端口对应一个全局的group,还有一个条件是回调的次数限制即maxHandlerInvokeCount,默认是16次,通过sun.nio.ch.maxCompletionHandlersOnStack 指定。意思就是防止回调次数过多,线程栈溢出。

好了上面提到的系统属性disableSynchronousRead,这个是啥意思呢,就是如果为true的,那线程池的线程都不执行具体的io读写操作,都由epoll线程来执行,这样的就是读要向epoll注册一次事件,由epoll线程来回调。

写了这么多,写还没有提及,不过和读差不多,有时间单独起一偏,文章不能写太长,下面上一章图,对nio2 的线程模型做个总结:

tomcat-nio2-thread-mod.png

上了图后,再提一点,分析了NIO2,那到底是用默认的NIO线程模型还是用NIO2异步的线程模型,建议用NIO2,NIO2的优势主要为:

  • 注册读事件调用次数减少,系统调用开销很大需要用户态到内核态切换。
  • 直接基于epoll实现,可以配置多个epoll thread,就能支持同时大量链接的请求,默认的nio模型是一个poll线程。

相关文章

网友评论

      本文标题:Tomcat NIO2 网络模型原理分

      本文链接:https://www.haomeiwen.com/subject/gpuxjktx.html