本文主要分析了tomcat response数据时的一个流程梳理,主要解释了
- 写数据时涉及的buffer 以及copy
- 写数据时是如何阻塞的
tomcat 响应主要是response,一个response是在接受请求的时候创建的
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
//创建response
response = connector.createResponse();
response.setCoyoteResponse(res);
Response
response 的数据需要一个存储的地方,是通过CoyoteWriter或者CoyoteOutputStream来代理的,即这两个Coyote类的 outputBuffer,buffer的大小为8k
public Response() {
this(OutputBuffer.DEFAULT_BUFFER_SIZE);
}
DEFAULT_BUFFER_SIZE 8k
OutputBuffer
outputbuffer由两个buffer组成,分别是ByteBuffer和CharBuffer,
public OutputBuffer(int size) {
//size 默认为8k
bb = ByteBuffer.allocate(size);
clear(bb);
cb = CharBuffer.allocate(size);
clear(cb);
}
写的方式
response.getWriter() 返回的是CoyoteWriter
if (writer == null) {
writer = new CoyoteWriter(outputBuffer);
}
response.getOutputStream() 返回的CoyoteOutputStream
@Override
public ServletOutputStream getOutputStream()
throws IOException {
usingOutputStream = true;
if (outputStream == null) {
outputStream = new CoyoteOutputStream(outputBuffer);
}
return outputStream;
}
CoyoteWriter 和 CoyoteOutputStream 的数据都是写入到OutputBuffer,通过outputbuffer写出去。
CoyoteWriter 的写核心是write(String s, int off, int len)方法,写到CharBuffer中的,代码如下:
public void write(String s, int off, int len) throws IOException {
if (suspended) {
return;
}
if (s == null) {
throw new NullPointerException(sm.getString("outputBuffer.writeNull"));
}
int sOff = off;
int sEnd = off + len;
while (sOff < sEnd) {
//cb是CharBuffer,把字符串s写入到charBuffer中。
int n = transfer(s, sOff, sEnd - sOff, cb);
sOff += n;
if (isFull(cb)) {
//写满了,刷新charBuffer,把char buffer 写到byte buffer
flushCharBuffer();
}
}
charsWritten += len;
}
flushCharBuffer 是先把CharBuffer的数据转换为byte写到ByteBuffer,如果ByteBuffer写满了,则把 ByteBuffer通过response的outputBuffer写到socket的writeBuffer,把ByteBuffer写完后,继续把CharBuffer的数据转到ByteBuffer。
Response的outputBuffer 是在创建http11Processor的时候创建的,和上面提到的outputBuffer注意区别下:代码如下:
outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
response.setOutputBuffer(outputBuffer);
flushCharBuffer 最终是调用outputBuffer的doWrite方法,即Http11OutputBuffer的write方法。
@Override
public int doWrite(ByteBuffer chunk) throws IOException {
if (!response.isCommitted()) {
// Send the connector a request for commit. The connector should
// then validate the headers, send them (using sendHeaders) and
// set the filters accordingly.
// 准备响应头,写到Http11OutputBuffer的headerBuffer
response.action(ActionCode.COMMIT, null);
}
if (lastActiveFilter == -1) {
return outputStreamOutputBuffer.doWrite(chunk);
} else {
return activeFilters[lastActiveFilter].doWrite(chunk);
}
}
准备响应头
响应内容到客户端,先要把header 准备好,header包含响应行,和响应头,同时指定body的格式是否为chunk,并设置commit的标记,同时从headerBuffer copy到socket buffer,在最后flush的时候写到网络上
内容的格式
tomcat 根据响应头是否有content-length来决定是用chunk的方式组织内容还是正常的格式,如果是chunk形式的,响应头会有:
Transfer-Encoding: chunked
而没有content-length,客户端也根据这个来读响应的body内容
代码如下,位置为Http11Processor的prepareResponse方法:
long contentLength = response.getContentLengthLong();
boolean connectionClosePresent = false;
if (http11 && response.getTrailerFields() != null) {
// If trailer fields are set, always use chunking
outputBuffer.addActiveFilter(outputFilters[Constants.CHUNKED_FILTER]);
contentDelimitation = true;
headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED);
} else if (contentLength != -1) {
//如果指定了content-length,指定写数据的filter为IDENTITY_FILTER
headers.setValue("Content-Length").setLong(contentLength);
outputBuffer.addActiveFilter(outputFilters[Constants.IDENTITY_FILTER]);
contentDelimitation = true;
} else {
// If the response code supports an entity body and we're on
// HTTP 1.1 then we chunk unless we have a Connection: close header
connectionClosePresent = isConnectionClose(headers);
if (http11 && entityBody && !connectionClosePresent) {
//如果是1.1,而且没有主动close,则用chunk格式发送body,对应的fiter为chunk filter
outputBuffer.addActiveFilter(outputFilters[Constants.CHUNKED_FILTER]);
contentDelimitation = true;
headers.addValue(Constants.TRANSFERENCODING).setString(Constants.CHUNKED);
} else {
outputBuffer.addActiveFilter(outputFilters[Constants.IDENTITY_FILTER]);
}
}
//如果需要压缩,则增加压缩的output filter
if (useCompression) {
outputBuffer.addActiveFilter(outputFilters[Constants.GZIP_FILTER]);
}
生成响应行和响应头
// Build the response header
try {
//生成响应行的内容
outputBuffer.sendStatus();
//生成响应头的内容
int size = headers.size();
for (int i = 0; i < size; i++) {
outputBuffer.sendHeader(headers.getName(i), headers.getValue(i));
}
//生成响应头的结束标志:换行和回车符
outputBuffer.endHeaders();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// If something goes wrong, reset the header buffer so the error
// response can be written instead.
outputBuffer.resetHeaderBuffer();
throw t;
}
//把headerBuffer的数据copy到socket的writerBuffer,最终flush的时候发送到网络上
outputBuffer.commit();
outputBuffer就是Http11OutputBuffer,这里就是把响应行和header写到Http11OutputBuffer的headerBuffer,最后通过commit 把 headerBuffer的数据copy到该链接的socket的writeBuffer,commit的代码如下:
protected void commit() throws IOException {
response.setCommitted(true);
if (headerBuffer.position() > 0) {
// Sending the response header buffer
headerBuffer.flip();
try {
SocketWrapperBase<?> socketWrapper = this.socketWrapper;
if (socketWrapper != null) {
//通过socket开始写,写body也是通过这个方法写
//默认是阻塞写,如果是aio,则是异步写
socketWrapper.write(isBlocking(), headerBuffer);
} else {
throw new CloseNowException(sm.getString("iob.failedwrite"));
}
} finally {
headerBuffer.position(0).limit(headerBuffer.capacity());
}
}
}
阻塞写是调用SocketWrapperBase的writeBlocking方法,下面的写body时也是会调用到该方法:
protected void writeBlocking(ByteBuffer from) throws IOException {
if (socketBufferHandler.isWriteBufferEmpty()) {
//header部分数据时,socket的write buffer是空的,是直接写
// Socket write buffer is empty. Write the provided buffer directly
// to the network.
// TODO Shouldn't smaller writes be buffered anyway?
//其实这里并不一定直接发,还会检查发送的数据大小和socket write buffer大小比较,
//如果小于还是先写到socket write buffer,这样减少系统调用,保证一次写系统调用尽量写足够多的数据,如果大于则先把from的write buffer的大小的数据写到os层,剩余的写到write buffer
writeBlockingDirect(from);
} else {
//写body部分时,是找这里,因为socket的writeBuffer有header的数据
// Socket write buffer has some data.
socketBufferHandler.configureWriteBufferForWrite();
// Put as much data as possible into the write buffer
transfer(from, socketBufferHandler.getWriteBuffer());
// If the buffer is now full, write it to the network and then write
// the remaining data directly to the network.
if (!socketBufferHandler.isWriteBufferWritable()) {
//如果socket的writeBuffer写满了,则把数据写到os层
doWrite(true);
writeBlockingDirect(from);
}
}
}
writeBlockingDirect 最终的核心代码如下:
NioSocketWrapper att = (NioSocketWrapper) key.attachment();
int written = 0;
boolean timedout = false;
int keycount = 1; //assume we can write
long time = System.currentTimeMillis(); //start the timeout timer
try {
while (!timedout && buf.hasRemaining()) {
if (keycount > 0) { //only write if we were registered for a write
//开始写,这里buf是socket的write buffer,默认是堆buffer
//这里需要做一次copy,先到堆外,再到写缓冲区
int cnt = socket.write(buf); //write the data
if (cnt == -1) {
throw new EOFException();
}
written += cnt;
if (cnt > 0) {
//如果cnt大于0,说明能写,不管写了多少,都继续写,直到不能写,
//返回0,则到下面的逻辑,说明不能写了,则阻塞,直到可以写
time = System.currentTimeMillis(); //reset our timeout timer
continue; //we successfully wrote, try again without a selector
}
}
try {
if (att.getWriteLatch() == null || att.getWriteLatch().getCount() == 0) {
att.startWriteLatch(1);
}
//注册一个写事件到NioBlockingSelector上
poller.add(att, SelectionKey.OP_WRITE, reference);
att.awaitWriteLatch(AbstractEndpoint.toTimeout(writeTimeout), TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
// Ignore
}
if (att.getWriteLatch() != null && att.getWriteLatch().getCount() > 0) {
//we got interrupted, but we haven't received notification from the poller.
//是虚假唤醒,因为latch的count还大于0,重新设置keycount为0,循环后继续阻塞
keycount = 0;
} else {
//latch countdown has happened
//可以写,
keycount = 1;
att.resetWriteLatch();
}
//检查是否超时,写超时时间默认为connection timeout
if (writeTimeout > 0 && (keycount == 0)) {
timedout = (System.currentTimeMillis() - time) >= writeTimeout;
}
}
if (timedout) {
throw new SocketTimeoutException();
}
通过上面的代码可以看出,tomcat阻塞写和阻塞读是一样的原理,通过
NioSocketWrapper的writeLatch来阻塞写,阻塞的是tomcat的catalina的worker线程,tomcat有一个专门的NioBlockingSelector ,来专门处理读和写的阻塞。
写了这么多,body的写还没有涉及,以及body对应的output filter对body的处理,还没有写,打算单独再开一篇
总结
写了这么多,是梳理了tomcat响应内容时的一个流程,以写阻塞的实现原理,更重要的是tomcat响应内容时涉及的buffer 很多,buffer和buffer直接就是copy,下面通过一张图说明下整个buffer的关系
tomcat-response-buffer.png通过上面的图,可以清新的看出涉及到的buffer,以及buffer之间的copy关系,一共涉及3次copy,如果tomcat的write buffer是堆外buffer,则是2次
网友评论