研究Tomcat的网络部分,我们主要关注的点在于NioEndpoint这个组件的startInternal方法,这个方法中的主要代码如下
createExecutor(); //创建Worker线程池,最小工作线程数量为10,最大工作线程数量为200
poller = new Poller(); //创建poller线程
Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
startAcceptorThread();
///////////////////startAcceptorThread//////////////////////////////
acceptor = new Acceptor<>(this); //创建Acceptor线程
String threadName = getName() + "-Acceptor";
acceptor.setThreadName(threadName);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
////////////////////startAcceptorThread////////////////////////////
主要就是创建了两个线程,其中一个线程叫做Poller,另一个线程叫做Acceptor,创建完之后立马就对这两个线程进行启动。
下面我们开始对Acceptor线程进行探索,我们主要研究的是它的run方法。
它的run方法的主要代码如下
while (!stopCalled) {
//通过ServerSocketChannel.accept获取到连接上来的Socket
socket = endpoint.serverSocketAccept();
//对Socket进行设置相关参数
endpoint.setSocketOptions(socket)
}
我们可以看到主要的任务就是死循环去接收客户端的连接,并且设置一些Socket参数信息。设置参数究竟做了什么?主要逻辑如下
protected boolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null;
try {
// Allocate channel and wrapper
NioChannel channel = null;
//如果有现有的Channel可以处理,那么先从现有的通道中pop一个出来
if (nioChannels != null) {
channel = nioChannels.pop();
}
//如果没有现有的Channel,那么需要创建一个NioChannel
if (channel == null) {
//准备一个读取Socket数据的处理器(Handler)
SocketBufferHandler bufhandler = new SocketBufferHandler.....
//如果开启了SSL,那么需要创建一个SecureNioChannel,如果没开启SSL,准备一个普通的Channel即可
if (isSSLEnabled()) {
channel = new SecureNioChannel(bufhandler, selectorPool, this);
} else {
channel = new NioChannel(bufhandler);
}
}
//将NioChannel和NioEndpoint封装到NioSocketWrapper中
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
channel.reset(socket, newWrapper);
connections.put(socket, newWrapper);
socketWrapper = newWrapper;
socket.configureBlocking(false);
poller.register(socketWrapper);
} catch (Throwable t) {
/////..............////////
}
}
其实主要就是将SocketChannel、NioChannel、SocketBufferHandler等信息包装到NioSocketWrapper对象中,然后将其注册到poller当中。
那么注册到poller的这个方法中究竟做了什么事呢?
public void register(final NioSocketWrapper socketWrapper) {
//设置SocketWrapper感兴趣的事件为READ事件
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent event = null;
if (eventCache != null) {
event = eventCache.pop();
}
if (event == null) {
event = new PollerEvent(socketWrapper, OP_REGISTER);
} else {
event.reset(socketWrapper, OP_REGISTER);
}
addEvent(event);
}
我们可以看到主要就是设置Socket的主要感兴趣的事件为读事件,以及创建一个PollerEvent对象,并执行addEvent方法把该PollerEvent加入到一个同步队列当中去。
接下来我们要研究的是Poller线程的run方法
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
//遍历Acceptor给Poller放的PollEvent同步队列,进行消费,把SocketChannel注册到Selector上
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
// If we are here, means we have other stuff to do
// Do a non blocking select
keyCount = selector.selectNow(); //非阻塞,如果keyCount为0也会直接跳过
} else {
keyCount = selector.select(selectorTimeout); //阻塞一定时间
}
wakeupCounter.set(0);
}
//....................
} catch (Throwable x) {
//............
}
//获取SelectionKey的迭代器
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next(); //通过迭代器拿到获取SelectionKey
iterator.remove();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); //Wrapper中封装了SelectorPool,Poller,NioChannels,EndPoint等
// Attachment may be null if another thread has called
// cancelledKey()
if (socketWrapper != null) { //获取NioSocketWrapper对象
processKey(sk, socketWrapper); //对事件进行处理
}
}
}
getStopLatch().countDown();
}
我们发现它主要做的工作就是,消费Acceptor接受的客户端请求的PollerEvent同步队列,并把该SocketChannel注册到Selector上。然后就是调用一下select方法拿到Selector上的活跃事件,然后遍历所有的SelectionKey(活跃事件)并进行处理,在processKey方法中主要就是对key进行处理。
我们也来感受一下processKey做了什么?
//...............................
if (sk.isReadable() || sk.isWritable()) { //判断SelectionKey是否可读?是否可写?
if (socketWrapper.getSendfileData() != null) {
processSendfile(sk, socketWrapper, false);
} else {
unreg(sk, socketWrapper, sk.readyOps()); //避免重复读
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) { //如果SelectionKey是可读的
if (socketWrapper.readOperation != null) { //如果读操作不为null
if (!socketWrapper.readOperation.process()) {
closeSocket = true;
} //处理Socket,processSocket能够直接交给线程池去进行处理
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (socketWrapper.writeOperation != null) {
if (!socketWrapper.writeOperation.process()) {
closeSocket = true;
}
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk, socketWrapper);
}
}
}
//................................
我们可以发现就是回到了我们传统的java NIO的套路,判断SelectionKey是否可读/可写,然后去执行processSocket拿到对应的处理器,并交给线程池对事件进行处理。
我们可以看到,这是Tomcat自己设计的IO模型,其实和Netty的设计很类似。
网友评论