前言
前面两篇已经介绍了Acceptor、Poller和SocketProcessor的处理流程,下面我们来具体看一下后续流程,如何一步一步的将scoket转换成Request的对象。
在这之前我们先来大致回顾下Acceptor和Poller的大致流程:
Acceptor和Poller的大致流程
Handler流程处理
上面了解到SocketProcessor将主要工作交给了Handler来处理,我们先来看下Handler的接口:
public static interface Handler<S> {
/**
* Different types of socket states to react upon.
*/
public enum SocketState {
// TODO Add a new state to the AsyncStateMachine and remove
// ASYNC_END (if possible)
OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, SUSPENDED
}
/**
* 使用给定的当前状态处理提供的套接字
*
* @param socket The socket to process
* @param status The current socket status
*
* @return The state of the socket after processing
*/
public SocketState process(SocketWrapperBase<S> socket,
SocketEvent status);
/**
* Obtain the GlobalRequestProcessor associated with the handler.
*
* @return the GlobalRequestProcessor
*/
public Object getGlobal();
/**
* Obtain the currently open sockets.
*
* @return The sockets for which the handler is tracking a currently
* open connection
*/
public Set<S> getOpenSockets();
/**
* Release any resources associated with the given SocketWrapper.
*
* @param socketWrapper The socketWrapper to release resources for
*/
public void release(SocketWrapperBase<S> socketWrapper);
/**
* Inform the handler that the endpoint has stopped accepting any new
* connections. Typically, the endpoint will be stopped shortly
* afterwards but it is possible that the endpoint will be resumed so
* the handler should not assume that a stop will follow.
*/
public void pause();
/**
* Recycle resources associated with the handler.
*/
public void recycle();
}
ConnectionHandler是Handler唯一的实现类(这里只看#process方法),是AbstractProtocol的内部类,之前在介绍Connector启动中介绍过:
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
if (wrapper == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
Processor processor = connections.get(socket);
//省略部分代码。。。
try {
//省略部分代码。。。
//1. 创建Processor,这里创建的是Http11Processor,初始化了org.apache.coyotes.Request对象
if (processor == null) {
processor = getProtocol().createProcessor();
register(processor);
}
processor.setSslSupport(
wrapper.getSslSupport(getProtocol().getClientCertProvider()));
// Associate the processor with the connection
connections.put(socket, processor);
SocketState state = SocketState.CLOSED;
do {
//2. 交给processor来处理请求
state = processor.process(wrapper, status);
//省略部分代码。。。
} while ( state == SocketState.UPGRADING);
//省略部分代码。。。
return state;
} catch(java.net.SocketException e) {
} catch (java.io.IOException e) {
} catch (ProtocolException e) {
}catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
} finally {
ContainerThreadMarker.clear();
}
connections.remove(socket);
//3. 释放连接
release(processor);
return SocketState.CLOSED;
}
主要流程如下:
- 创建Http11Processor,并初始化org.apache.coyotes.Request对象
- 交给processor来解析http请求
- 释放连接
Http11Processor处理流程
process()方法在其父类AbstractProcessorLight中:
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
// There may be pipe-lined data to read. If the data isn't
// processed now, execution will exit this loop and call
// release() which will recycle the processor (and input
// buffer) deleting any pipe-lined data. To avoid this,
// process it now.
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
//调用子类的service方法具体处理
state = service(socketWrapper);
} else {
// Default to closing the socket if the SocketEvent passed in
// is not consistent with the current state of the Processor
state = SocketState.CLOSED;
}
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper +
"], Status in: [" + status +
"], State out: [" + state + "]");
}
if (state != SocketState.CLOSED && isAsync()) {
state = asyncPostProcess();
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper +
"], State after async post processing: [" + state + "]");
}
}
if (dispatches == null || !dispatches.hasNext()) {
// Only returns non-null iterator if there are
// dispatches to process.
dispatches = getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
这里重点是调用了子类即#Http11Processor.service()方法来处理请求:
public SocketState service(SocketWrapperBase<?> socketWrapper)
throws IOException {
//1.RequestInfo保存Request和Response对象的结构。它还包含有关请求处理的统计信息,并提供有关正在处理的请求的管理信息
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
//2. 设置I/O
setSocketWrapper(socketWrapper);
//3. 读缓冲区初始化(部分参数已在创建Http11Processor时初始化好)
inputBuffer.init(socketWrapper);
//4. 写缓冲区初始化(部分参数已在创建Http11Processor时初始化好)
outputBuffer.init(socketWrapper);
// Flags
keepAlive = true;
openSocket = false;
readComplete = true;
boolean keptAlive = false;
SendfileState sendfileState = SendfileState.DONE;
while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
sendfileState == SendfileState.DONE && !protocol.isPaused()) {
//5. 解析请求头
try {
if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
protocol.getKeepAliveTimeout())) {
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}
if (protocol.isPaused()) {
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
if (!inputBuffer.parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!protocol.getDisableUploadTimeout()) {
socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
}
}
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.header.parse"), e);
}
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
break;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
UserDataHelper.Mode logMode = userDataHelper.getNextMode();
if (logMode != null) {
String message = sm.getString("http11processor.header.parse");
switch (logMode) {
case INFO_THEN_DEBUG:
message += sm.getString("http11processor.fallToDebug");
//$FALL-THROUGH$
case INFO:
log.info(message, t);
break;
case DEBUG:
log.debug(message, t);
}
}
// 400 - Bad Request
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
// 省略部分代码。。。
//6. 设置请求过滤器
if (getErrorState().isIoAllowed()) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
}
int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}
//7. 在适配器中处理请求
if (getErrorState().isIoAllowed()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response);
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !getErrorState().isError() && !isAsync() &&
statusDropsConnection(response.getStatus())) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
} catch (InterruptedIOException e) {
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
} catch (HeadersTooLargeException e) {
log.error(sm.getString("http11processor.request.process"), e);
// The response should not have been committed but check it
// anyway to be safe
if (response.isCommitted()) {
setErrorState(ErrorState.CLOSE_NOW, e);
} else {
response.reset();
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, e);
response.setHeader("Connection", "close"); // TODO: Remove
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("http11processor.request.process"), t);
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
// Finish the handling of the request
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
if (!isAsync()) {
// If this is an async request then the request ends when it has
// been completed. The AsyncContext is responsible for calling
// endRequest() in that case.
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
if (getErrorState().isError()) {
response.setStatus(500);
}
if (!isAsync() || getErrorState().isError()) {
request.updateCounters();
if (getErrorState().isIoAllowed()) {
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}
}
if (!protocol.getDisableUploadTimeout()) {
int connectionTimeout = protocol.getConnectionTimeout();
if(connectionTimeout > 0) {
socketWrapper.setReadTimeout(connectionTimeout);
} else {
socketWrapper.setReadTimeout(0);
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
sendfileState = processSendfile(socketWrapper);
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
if (getErrorState().isError() || protocol.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync()) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else {
if (sendfileState == SendfileState.PENDING) {
return SocketState.SENDFILE;
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;
}
} else {
return SocketState.CLOSED;
}
}
}
}
主要流程如下:
- 解析请求头;
- 设置请求过滤器;
- 调用适配器adapter来处理接下来的流程。
CoyoteAdapter处理流程
上面最主要的流程就是#getAdapter().service(request, response)方法,我们在Connector初始化中了解到这里的adapter是CoyoteAdapter,所以我们直接看#CoyoteAdapter.service()方法:
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
//这里request为空,需要重新创建
if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);
// Link objects
request.setResponse(response);
response.setRequest(request);
// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
// Set query string encoding
req.getParameters().setQueryStringCharset(connector.getURICharset());
}
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}
boolean async = false;
boolean postParseSuccess = false;
req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
try {
// 解析并设置Catalina和配置特定的请求参数
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
// 调用Container
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
//省略部分代码。。。。
} catch (IOException e) {
// Ignore
} finally {
//省略部分代码。。。。
}
}
这里完成了org.apache.coyote.Request到org.apache.catalina.connector.Request(实现了HttpServletRequest接口)的转换,并且看到了我们熟悉的代码#connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response),这个在管道一篇我们有介绍,这样子就进入了Container容器中,后面的逻辑在前面已经提过,这里不做介绍。
总结
总体流程图如下:
总体流程图
网友评论