美文网首页
Tomcat 写操作原理分析--上篇

Tomcat 写操作原理分析--上篇

作者: 绝尘驹 | 来源:发表于2020-02-03 16:31 被阅读0次

本文主要分析了tomcat response数据时的一个流程梳理,主要解释了

  • 写数据时涉及的buffer 以及copy
  • 写数据时是如何阻塞的

tomcat 响应主要是response,一个response是在接受请求的时候创建的

@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {

    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        //创建response
        response = connector.createResponse();
        response.setCoyoteResponse(res);

Response

response 的数据需要一个存储的地方,是通过CoyoteWriter或者CoyoteOutputStream来代理的,即这两个Coyote类的 outputBuffer,buffer的大小为8k

public Response() {
    this(OutputBuffer.DEFAULT_BUFFER_SIZE);
}

DEFAULT_BUFFER_SIZE 8k

OutputBuffer

outputbuffer由两个buffer组成,分别是ByteBuffer和CharBuffer,

public OutputBuffer(int size) {
    //size 默认为8k
    bb = ByteBuffer.allocate(size);
    clear(bb);
    cb = CharBuffer.allocate(size);
    clear(cb);
}

写的方式

response.getWriter() 返回的是CoyoteWriter

   if (writer == null) {
        writer = new CoyoteWriter(outputBuffer);
   }

response.getOutputStream() 返回的CoyoteOutputStream

 @Override
 public ServletOutputStream getOutputStream()
    throws IOException {

    usingOutputStream = true;
    if (outputStream == null) {
        outputStream = new CoyoteOutputStream(outputBuffer);
    }
    return outputStream;
}

CoyoteWriter 和 CoyoteOutputStream 的数据都是写入到OutputBuffer,通过outputbuffer写出去。

CoyoteWriter 的写核心是write(String s, int off, int len)方法,写到CharBuffer中的,代码如下:

public void write(String s, int off, int len) throws IOException {
    if (suspended) {
        return;
    }
   if (s == null) {
        throw new NullPointerException(sm.getString("outputBuffer.writeNull"));
    }

    int sOff = off;
    int sEnd = off + len;
    while (sOff < sEnd) {
        //cb是CharBuffer,把字符串s写入到charBuffer中。 
        int n = transfer(s, sOff, sEnd - sOff, cb);
        sOff += n;
        if (isFull(cb)) {
            //写满了,刷新charBuffer,把char buffer 写到byte buffer
            flushCharBuffer();
        }
    }

    charsWritten += len;
}

flushCharBuffer 是先把CharBuffer的数据转换为byte写到ByteBuffer,如果ByteBuffer写满了,则把 ByteBuffer通过response的outputBuffer写到socket的writeBuffer,把ByteBuffer写完后,继续把CharBuffer的数据转到ByteBuffer。

Response的outputBuffer 是在创建http11Processor的时候创建的,和上面提到的outputBuffer注意区别下:代码如下:

 outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
response.setOutputBuffer(outputBuffer);

flushCharBuffer 最终是调用outputBuffer的doWrite方法,即Http11OutputBuffer的write方法。

@Override
public int doWrite(ByteBuffer chunk) throws IOException {

    if (!response.isCommitted()) {
        // Send the connector a request for commit. The connector should
        // then validate the headers, send them (using sendHeaders) and
        // set the filters accordingly.
        // 准备响应头,写到Http11OutputBuffer的headerBuffer
        response.action(ActionCode.COMMIT, null);
    }

    if (lastActiveFilter == -1) {
        return outputStreamOutputBuffer.doWrite(chunk);
    } else {
        return activeFilters[lastActiveFilter].doWrite(chunk);
    }
}

准备响应头

响应内容到客户端,先要把header 准备好,header包含响应行,和响应头,同时指定body的格式是否为chunk,并设置commit的标记,同时从headerBuffer copy到socket buffer,在最后flush的时候写到网络上

内容的格式

tomcat 根据响应头是否有content-length来决定是用chunk的方式组织内容还是正常的格式,如果是chunk形式的,响应头会有:

      Transfer-Encoding: chunked

而没有content-length,客户端也根据这个来读响应的body内容
代码如下,位置为Http11Processor的prepareResponse方法:

  long contentLength = response.getContentLengthLong();
    boolean connectionClosePresent = false;
    if (http11 && response.getTrailerFields() != null) {
        // If trailer fields are set, always use chunking
        outputBuffer.addActiveFilter(outputFilters[Constants.CHUNKED_FILTER]);
        contentDelimitation = true;
        headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED);
    } else if (contentLength != -1) {
        //如果指定了content-length,指定写数据的filter为IDENTITY_FILTER
        headers.setValue("Content-Length").setLong(contentLength);
        outputBuffer.addActiveFilter(outputFilters[Constants.IDENTITY_FILTER]);
        contentDelimitation = true;
    } else {
        // If the response code supports an entity body and we're on
        // HTTP 1.1 then we chunk unless we have a Connection: close header
        connectionClosePresent = isConnectionClose(headers);
        if (http11 && entityBody && !connectionClosePresent) {
            //如果是1.1,而且没有主动close,则用chunk格式发送body,对应的fiter为chunk filter
            outputBuffer.addActiveFilter(outputFilters[Constants.CHUNKED_FILTER]);
            contentDelimitation = true;
            headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED);
        } else {
            outputBuffer.addActiveFilter(outputFilters[Constants.IDENTITY_FILTER]);
        }
    }
    //如果需要压缩,则增加压缩的output filter
    if (useCompression) {
         outputBuffer.addActiveFilter(outputFilters[Constants.GZIP_FILTER]);
    }

生成响应行和响应头

    // Build the response header
    try {
         //生成响应行的内容
        outputBuffer.sendStatus();
        //生成响应头的内容
        int size = headers.size();
        for (int i = 0; i < size; i++) {
            outputBuffer.sendHeader(headers.getName(i), headers.getValue(i));
        }
        //生成响应头的结束标志:换行和回车符
        outputBuffer.endHeaders();
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // If something goes wrong, reset the header buffer so the error
        // response can be written instead.
        outputBuffer.resetHeaderBuffer();
        throw t;
    }
    //把headerBuffer的数据copy到socket的writerBuffer,最终flush的时候发送到网络上
    outputBuffer.commit();

outputBuffer就是Http11OutputBuffer,这里就是把响应行和header写到Http11OutputBuffer的headerBuffer,最后通过commit 把 headerBuffer的数据copy到该链接的socket的writeBuffer,commit的代码如下:

protected void commit() throws IOException {
    response.setCommitted(true);

    if (headerBuffer.position() > 0) {
        // Sending the response header buffer
        headerBuffer.flip();
        try {
            SocketWrapperBase<?> socketWrapper = this.socketWrapper;
            if (socketWrapper != null) {
                //通过socket开始写,写body也是通过这个方法写
                //默认是阻塞写,如果是aio,则是异步写
                socketWrapper.write(isBlocking(), headerBuffer);
            } else {
                throw new CloseNowException(sm.getString("iob.failedwrite"));
            }
        } finally {
            headerBuffer.position(0).limit(headerBuffer.capacity());
        }
    }
}

阻塞写是调用SocketWrapperBase的writeBlocking方法,下面的写body时也是会调用到该方法:

 protected void writeBlocking(ByteBuffer from) throws IOException {
 
    if (socketBufferHandler.isWriteBufferEmpty()) {
        //header部分数据时,socket的write buffer是空的,是直接写
        // Socket write buffer is empty. Write the provided buffer directly
        // to the network.
        // TODO Shouldn't smaller writes be buffered anyway?
        //其实这里并不一定直接发,还会检查发送的数据大小和socket write buffer大小比较,
        //如果小于还是先写到socket write buffer,这样减少系统调用,保证一次写系统调用尽量写足够多的数据,如果大于则先把from的write buffer的大小的数据写到os层,剩余的写到write buffer
       writeBlockingDirect(from);
    } else {
        //写body部分时,是找这里,因为socket的writeBuffer有header的数据
        // Socket write buffer has some data.
        socketBufferHandler.configureWriteBufferForWrite();
        // Put as much data as possible into the write buffer
        transfer(from, socketBufferHandler.getWriteBuffer());
        // If the buffer is now full, write it to the network and then write
        // the remaining data directly to the network.
        if (!socketBufferHandler.isWriteBufferWritable()) {
            //如果socket的writeBuffer写满了,则把数据写到os层
            doWrite(true);
            writeBlockingDirect(from);
        }
    }
}

writeBlockingDirect 最终的核心代码如下:

    NioSocketWrapper att = (NioSocketWrapper) key.attachment();
    int written = 0;
    boolean timedout = false;
    int keycount = 1; //assume we can write
    long time = System.currentTimeMillis(); //start the timeout timer
    try {
        while (!timedout && buf.hasRemaining()) {
            if (keycount > 0) { //only write if we were registered for a write
                //开始写,这里buf是socket的write buffer,默认是堆buffer
                //这里需要做一次copy,先到堆外,再到写缓冲区
                int cnt = socket.write(buf); //write the data
                if (cnt == -1) {
                    throw new EOFException();
                }
                written += cnt;
                if (cnt > 0) {
                    //如果cnt大于0,说明能写,不管写了多少,都继续写,直到不能写,
                    //返回0,则到下面的逻辑,说明不能写了,则阻塞,直到可以写
                    time = System.currentTimeMillis(); //reset our timeout timer
                    continue; //we successfully wrote, try again without a selector
                }
            }
            try {
                if (att.getWriteLatch() == null || att.getWriteLatch().getCount() == 0) {
                    att.startWriteLatch(1);
                }
                //注册一个写事件到NioBlockingSelector上
                poller.add(att, SelectionKey.OP_WRITE, reference);
                att.awaitWriteLatch(AbstractEndpoint.toTimeout(writeTimeout), TimeUnit.MILLISECONDS);
            } catch (InterruptedException ignore) {
                // Ignore
            }
            if (att.getWriteLatch() != null && att.getWriteLatch().getCount() > 0) {
                //we got interrupted, but we haven't received notification from the poller.
                //是虚假唤醒,因为latch的count还大于0,重新设置keycount为0,循环后继续阻塞
                keycount = 0;
            } else {
                //latch countdown has happened
                //可以写,
                keycount = 1;
                att.resetWriteLatch();
            }
            //检查是否超时,写超时时间默认为connection timeout
            if (writeTimeout > 0 && (keycount == 0)) {
                timedout = (System.currentTimeMillis() - time) >= writeTimeout;
            }
        }

        if (timedout) {
            throw new SocketTimeoutException();
        }

通过上面的代码可以看出,tomcat阻塞写和阻塞读是一样的原理,通过
NioSocketWrapper的writeLatch来阻塞写,阻塞的是tomcat的catalina的worker线程,tomcat有一个专门的NioBlockingSelector ,来专门处理读和写的阻塞。

写了这么多,body的写还没有涉及,以及body对应的output filter对body的处理,还没有写,打算单独再开一篇

总结

写了这么多,是梳理了tomcat响应内容时的一个流程,以写阻塞的实现原理,更重要的是tomcat响应内容时涉及的buffer 很多,buffer和buffer直接就是copy,下面通过一张图说明下整个buffer的关系

tomcat-response-buffer.png

通过上面的图,可以清新的看出涉及到的buffer,以及buffer之间的copy关系,一共涉及3次copy,如果tomcat的write buffer是堆外buffer,则是2次

相关文章

网友评论

      本文标题:Tomcat 写操作原理分析--上篇

      本文链接:https://www.haomeiwen.com/subject/srkkwctx.html