一、 前言
阿里巴巴长期招聘Java研发工程师p6,p7,p8等上不封顶级别,有意向的可以发简历给我,注明想去的部门和工作地点:1064454834@qq.com_
二、 Connector
研究过tomcat的童鞋应该都知道tomcat的容器构造:
image.pngConnector是一个桥梁它把Server和Engine链接了起来,Connector的作用是接受客户端端的请求,然后把请求委托为engine容器去处理。
Connector内部使用endpoint进行处理,根据处理方式的不同分为NioEndpoint,JIoEndpoint,AprEndpoint。
三、 NioEndpoint
3.1 主流程启动时序
image.png下面首先分析下套接字绑定代码:
public void bind() throws Exception {
//创建服务套接字
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
serverSock.socket().bind(addr,getBacklog());
serverSock.configureBlocking(true); //打开阻塞模式
serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());
// 初始化接受线程个数和轮询链接套接字状态线程个数
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
acceptorThreadCount = 1;
}
if (pollerThreadCount <= 0) {
//minimum one poller thread
pollerThreadCount = 1;
}
stopLatch = new CountDownLatch(pollerThreadCount);
// Initialize SSL if needed
if (isSSLEnabled()) {
SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this);
sslContext = sslUtil.createSSLContext();
sslContext.init(wrap(sslUtil.getKeyManagers()),
sslUtil.getTrustManagers(), null);
SSLSessionContext sessionContext =
sslContext.getServerSessionContext();
if (sessionContext != null) {
sslUtil.configureSessionContext(sessionContext);
}
// Determine which cipher suites and protocols to enable
enabledCiphers = sslUtil.getEnableableCiphers(sslContext);
enabledProtocols = sslUtil.getEnableableProtocols(sslContext);
}
if (oomParachute>0) reclaimParachute(true);
selectorPool.open();
}
启动startInternal代码:
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
// 创建处理线程池
if ( getExecutor() == null ) {
createExecutor();
}
initializeConnectionLatch();
//启动poller 线程,nio需要
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
//启动链接接受线程
startAcceptorThreads();
}
}
需要注意的是startAcceptorThreads里面可以创建多个线程去调用 serverSock.accept()来接受完成了三次握手的链接的socket,但是为什么这里会有多个线程那?平时我们不都是用一个线程?因为这个方法是阻塞的,那么多个线程去接受链接有好处?
答案是肯定的,因为服务套接字调用listen方法后,就可以接受链接了,它会吧接受到了链接放入到TCP缓存队列,当调用accept时候是从这个缓存队列里面获取一个已经完成三次握手的套接字处理,而这个缓存大小是受acceptcount影响的。
3.2 Acceptor线程
accept线程作用是接受客户端发来的请求并放入到事件队列。
image.png看下代码:
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// 一直循环直到接收到shutdown命令
while (running) {
...
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//如果达到max connections个请求则等待
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// 从TCP缓存获取一个完成三次握手的套接字,没有则阻塞
// socket
socket = serverSock.accept();
} catch (IOException ioe) {
...
}
// Successful accept, reset the error delay
errorDelay = 0;
....
} catch (SocketTimeoutException sx) {
// Ignore: Normal condition
....
}
state = AcceptorState.ENDED;
}
}
3.3 Poll线程
poll线程作用是从事件队列里面获取事件把链接套接字加入selector,并且监听socket事件进行处理。
image.pngpublic void run() {
while (true) {
try {
...
if (close) {
...
} else {
hasEvents = events();
}
try {
...
} catch ( NullPointerException x ) {...
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// 遍历所有注册的channel对感兴趣的事件处理
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
if (attachment == null) {
iterator.remove();
} else {
attachment.access();
iterator.remove();
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
} catch (OutOfMemoryError oom) {
...
}
}//while
synchronized (this) {
this.notifyAll();
}
stopLatch.countDown();
}
//如配置线程池则请求交给线程池处理。
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
if (attachment == null) {
return false;
}
attachment.setCometNotify(false); //will get reset upon next reg
SocketProcessor sc = processorCache.poll();
if ( sc == null ) sc = new SocketProcessor(socket,status);
else sc.reset(socket,status);
if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
else sc.run();
} catch (RejectedExecutionException rx) {
...
}
return true;
}
3.4 总结
NIO 是可以多个线程来接受客户端的链接,这个和bio是一样的,不一样在于是NIO会把接受到的链接放入事件队列,然后多个poll线程会从事件队列获取事件,并且NIO可以每个poll线程去监听多个链接socket的事件,然后交给线程池去处理,也就说一个poll 线程可以监听多个socket的读写事件,然后交给线程池去处理,这相比于bio节省了很多线程资源。
四、JioEndpoint
4.1 主启动流程
image.png首先看下bind函数有啥不同
public void bind() throws Exception {
// Initialize thread count defaults for acceptor
if (acceptorThreadCount == 0) {
acceptorThreadCount = 1;
}
// Initialize maxConnections
if (getMaxConnections() == 0) {
// User hasn't set a value - use the default
setMaxConnections(getMaxThreadsExecutor(true));
}
//创建serversocket工厂
if (serverSocketFactory == null) {
if (isSSLEnabled()) {
serverSocketFactory =
handler.getSslImplementation().getServerSocketFactory(this);
} else {
serverSocketFactory = new DefaultServerSocketFactory(this);
}
}
//创建serversocket
if (serverSocket == null) {
try {
if (getAddress() == null) {
serverSocket = serverSocketFactory.createSocket(getPort(),
getBacklog());
} else {
serverSocket = serverSocketFactory.createSocket(getPort(),
getBacklog(), getAddress());
}
} catch (BindException orig) {
String msg;
if (getAddress() == null)
msg = orig.getMessage() + " <null>:" + getPort();
else
msg = orig.getMessage() + " " +
getAddress().toString() + ":" + getPort();
BindException be = new BindException(msg);
be.initCause(orig);
throw be;
}
}
}
内部实际是调用new ServerSocket 创建seversocket.
4.2 Acceptor线程
image.png代码如下:
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
...
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
countUpOrAwaitConnection();
Socket socket = null;
try {
//接受链接
socket = serverSocketFactory.acceptSocket(serverSocket);
} catch (IOException ioe) {
。。。
}
//配置套接字选项
if (running && !paused && setSocketOptions(socket)) {
// 处理套接字
if (!processSocket(socket)) {
countDownConnection();
// Close socket right away
closeSocket(socket);
}
} else {
countDownConnection();
// Close socket right away
closeSocket(socket);
}
} catch (IOException x) {
。。。
}
}
state = AcceptorState.ENDED;
}
}
把设置好套接字选项后的套接字放入线程池处理
protected boolean processSocket(Socket socket) {
// Process the request from this socket
try {
SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
wrapper.setSecure(isSSLEnabled());
// During shutdown, executor may be null - avoid NPE
if (!running) {
return false;
}
getExecutor().execute(new SocketProcessor(wrapper));
} catch (RejectedExecutionException x) ...
}
return true;
}
4.3 总结
可知BIO也可以使用多线程去接受链接,然后把接受的socket放入线程池进行处理,是每个线程处理一个链接sockcet,这是典型的处理方式。
欢迎关注微信公众号:技术原始积累 获取更多技术干货_
image.png
网友评论