前言
在前面[Tomcat学习笔记之启动分析(Connector)(七)]一文中,介绍了Connector容器的初始化与启动,这里以NioEndpoint为例,详细分析一下请求处理流程。
组件结构图
先来看一下整个Connector组件的结构图:
Connector组件的结构图
在之前的文章中,已经介绍了Acceptor、Poller、Worker 等核心组件的初始化过程。下面就这些核心组件一个一个来看。
备注:这个图是Tomcat7.0版本的,所以Acceptor还在NioEndpoint中,而在9.0版本,Acceptor已经单独提出去了。
NioEndpoint介绍
- 主要属性
/**
* 线程安全的非阻塞selector池
*/
private NioSelectorPool selectorPool = new NioSelectorPool();
/**
* Server socket "pointer".
*/
private volatile ServerSocketChannel serverSock = null;
private volatile CountDownLatch stopLatch = null;
/**
* PollerEvent缓存
*/
private SynchronizedStack<PollerEvent> eventCache;
/**
* NioChannel缓存,每个NioChannel持有一部分Bytebuffer(一般2个,SSL有4个),
*/
private SynchronizedStack<NioChannel> nioChannels;
/**
* poller线程的默认优先级
*/
private int pollerThreadPriority = Thread.NORM_PRIORITY;
/**
* Poller线程数,最多2个
*/
private int pollerThreadCount = Math.min(2, Runtime.getRuntime().availableProcessors());
/**
* selector.select()超时时间
*/
private long selectorTimeout = 1000;
/**
* The socket pollers.
*/
private Poller[] pollers = null;
其他方法介绍后面会说明。
Acceptor接受请求
Acceptor在7.0版本属于NioEndpoint内部类,9.0版本引入了Nio2Endpoit,所以Acceptor被提出来了。
public void run() {
int errorDelay = 0;
// 循环,知道收到shutdown命令
while (endpoint.isRunning()) {
// 如果endpoint暂停,循环
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!endpoint.isRunning()) {
break;
}
state = AcceptorState.RUNNING;
try {
//如果达到最大连接,等待
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
//Acceptor获取socket
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
//设置socket,注册到pollerevent队列中,如果失败,关闭scoket
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
// APR specific.
// Could push this down but not sure it is worth the trouble.
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
state = AcceptorState.ENDED;
}
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//1. socket设置为非阻塞
socket.configureBlocking(false);
Socket sock = socket.socket();
//2. 设置Socket参数值(从server.xml的Connector节点上获取参数值)比如Socket发送、接收的缓存大小、心跳检测等
socketProperties.setProperties(sock);
//3. 从NioChannel的缓存栈中取出一个NioChannel,NioChannel是SocketChannel的一个的包装类
NioChannel channel = nioChannels.pop();
//4. 缓存队列中没有则新建一个NioChannel,并将SocketChannel关联到从缓存队列中获取的NioChannel上来
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//5. 注册到Poller中
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
主要流程如下:
- 获取请求;
- 将socket设置为非阻塞的;
- 从NioChannel的缓存栈中获取NioChannel(socket和buf的包装类),没有则新建一个。
- 注册到Poller中。
事件注册
public void register(final NioChannel socket) {
//1. 设置关联的poller
socket.setPoller(this);
//2. NioChannel的包装类,增加了一些控制,读写超时时间之类的
NioSocketWrapper socketWrapper = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(socketWrapper);
socketWrapper.setPoller(this);
socketWrapper.setReadTimeout(getConnectionTimeout());
socketWrapper.setWriteTimeout(getConnectionTimeout());
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
socketWrapper.setSecure(isSSLEnabled());
//3. 从pollerevent缓存栈中获取缓存,如果没有新建
PollerEvent r = eventCache.pop();
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if (r == null) {
r = new PollerEvent(socket, socketWrapper, OP_REGISTER);
} else {
r.reset(socket, socketWrapper, OP_REGISTER);
}
//4. 添加至pollerevent缓存栈
addEvent(r);
}
主要流程:
- 设置关联的poller;
- NioChannel进一步包装;
- 从缓存中获取或者新建一个pollerevent;
- 放入pollerevent缓存栈中。
PollerEvent流程处理
上面说到NioChannel和NioSocketWrapper会被包装到PollerEvent,然后添加到PollerEvent队列中去,我们在这里看一下PollerEvent会做些什么:
public void run() {
//如果socket第一次注册到selector中,完成对socket读事件的注册
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
//寻找selector中的key,如果key为空,连接数减一,并且socketWrapper.close置位true;否则更新更新socket所感兴趣的事件
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socket.socketWrapper.getEndpoint().countDownConnection();
((NioSocketWrapper) socket.socketWrapper).closed = true;
} else {
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
// We are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
socket.getPoller().cancelledKey(key);
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key);
} catch (Exception ignore) {
}
}
}
}
PollerEvent主要用来更新selector所感兴趣的事件。
总结
Connector初步处理请求的流程如下:
初步流程图
接下来看Poller的流程处理。
网友评论