Tomcat can bind multiple ports, i.e. it can create multiple connectors. Each connector is configured and bound as follows:
Before Tomcat starts, create the connector and set its parameters. Several connectors can be added, each bound to its own port (the port 8081 used below is just an illustrative choice).
import org.apache.catalina.connector.Connector;
import org.apache.coyote.http11.Http11NioProtocol;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
import org.springframework.context.annotation.Bean;

@Bean
public ServletWebServerFactory servletContainer() {
    TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();
    tomcat.addAdditionalTomcatConnectors(createConnector());
    return tomcat;
}

private Connector createConnector() {
    Connector connector = new Connector("org.apache.coyote.http11.Http11NioProtocol");
    Http11NioProtocol protocol = (Http11NioProtocol) connector.getProtocolHandler();
    connector.setPort(8081);               // port for this extra connector (illustrative)
    protocol.setKeepAliveTimeout(30000);   // close idle keep-alive connections after 30s
    protocol.setMaxKeepAliveRequests(100); // max requests per keep-alive connection
    protocol.setMaxConnections(1000);      // max concurrent connections
    protocol.setAcceptCount(100);          // accept queue (backlog) size
    protocol.setMaxThreads(1000);          // max worker threads
    return connector;
}
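With this configuration the application listens both on the primary connector's port (taken from server.port, 8080 by default) and on the port set on each additional connector; addAdditionalTomcatConnectors() is the Spring Boot hook for registering those extra connectors.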
Bind the listening port and specify the accept queue (backlog) size:
// ----------------------------------------------- Public Lifecycle Methods
/**
* Initialize the endpoint.
*/
@Override
public void bind() throws Exception {
...
serverSock.socket().bind(addr,getAcceptCount());
serverSock.configureBlocking(true); //mimic APR behavior
...
// Initialize thread count defaults for acceptor, poller
// Initialize SSL if needed
}
Initialize the maximum connection count, create the thread pool, and start the Poller and Acceptor threads:
/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception {
if (!running) {
...
// Create worker collection
createExecutor();
//maximum connection count
initializeConnectionLatch();
// Start poller threads
pollers = new Poller[getPollerThreadCount()];
...
startAcceptorThreads();
}
}
1 acceptCount
serverSock.socket().bind(addr, getAcceptCount());
Note: acceptCount is the size of the OS-level completed-connection queue (the backlog). Once the number of connections exceeds maxConnections, the Acceptor thread blocks and stops accepting new connections; connections that complete the TCP handshake in the meantime pile up in the accept queue, and when it is full, further connection attempts fail.
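To see the backlog effect in isolation, here is a minimal sketch (plain java.net, not Tomcat code): the server binds with a tiny backlog and never calls accept(), mimicking an Acceptor parked on maxConnections. How strictly the limit is enforced is OS-dependent; Linux, for instance, may admit slightly more than the configured backlog.

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class BacklogDemo {
    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket();
        server.bind(new InetSocketAddress(9090), 2); // backlog plays the role of acceptCount

        // accept() is never called, so completed handshakes sit in the accept queue;
        // connections beyond the backlog eventually fail to connect
        for (int i = 1; i <= 5; i++) {
            Socket client = new Socket(); // intentionally left open so it stays queued
            try {
                client.connect(new InetSocketAddress("localhost", 9090), 1000);
                System.out.println("connection " + i + " established");
            } catch (Exception e) {
                System.out.println("connection " + i + " failed: " + e.getMessage());
            }
        }
        server.close();
    }
}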
2 maxConnections
Connection admission:
countUpOrAwaitConnection() checks whether the maximum connection count has been reached; if so, the Acceptor blocks until a connection is released and accepts no new connections in the meantime.
setSocketOptions(socket) registers the socket with a Poller for event monitoring; all requests on that socket are then handled by the Poller. (A simplified sketch of the connection-counting gate follows the Acceptor code below.)
// --------------------------------------------------- Acceptor Inner Class
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
// Loop until we receive a shutdown command
while (running) {
...
try {
//if we have reached max connections, wait
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
socket = serverSock.accept();
} ...
// Successful accept, reset the error delay
...
}
}
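countUpOrAwaitConnection() above is backed by Tomcat's own LimitLatch; a rough sketch of the same semantics using java.util.concurrent.Semaphore looks like this (a simplification, not the real implementation):

import java.util.concurrent.Semaphore;

public class ConnectionLimiter {
    private final Semaphore permits;

    public ConnectionLimiter(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    // Called by the Acceptor before accept(): blocks once the limit is reached.
    public void countUpOrAwaitConnection() throws InterruptedException {
        permits.acquire();
    }

    // Called when a connection is closed: wakes a blocked Acceptor.
    public void countDownConnection() {
        permits.release();
    }
}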
3 maxThreads
The maximum number of threads in the Executor pool. Tomcat creates the pool as shown below, using a TaskQueue by default. TaskQueue extends LinkedBlockingQueue, which is unbounded, so in the worst case the queue can grow up to the maximum connection count (ignoring pipelined requests on a single socket). Unlike a plain LinkedBlockingQueue, TaskQueue overrides offer() so that the executor keeps creating threads up to maxThreads before it starts queueing tasks:
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
/**
* As task queue specifically designed to run with a thread pool executor. The
* task queue is optimised to properly utilize threads within a thread pool
* executor. If you use a normal queue, the executor will spawn threads when
* there are idle threads and you wont be able to force items onto the queue
* itself.
*/
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
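    // A simplified sketch of the core of TaskQueue.offer(); the real
    // org.apache.tomcat.util.threads.TaskQueue handles more cases.
    // Returning false here makes the ThreadPoolExecutor create a new
    // thread instead of queueing, until maxThreads is reached.
    @Override
    public boolean offer(Runnable o) {
        if (parent == null) return super.offer(o);
        // Pool is already at maxThreads: queueing is the only option.
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        // There are idle threads: queue so one of them picks the task up.
        if (parent.getSubmittedCount() <= parent.getPoolSize()) return super.offer(o);
        // Below maxThreads: refuse to queue so the executor spawns a thread.
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;
        return super.offer(o);
    }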
// ---------------------------------------------- Request processing methods
/**
* Process the given SocketWrapper with the given status. Used to trigger
* processing as if the Poller (for those endpoints that have one)
* selected the socket.
*
* @param socketWrapper The socket wrapper to process
* @param event The socket event to be processed
* @param dispatch Should the processing be performed on a new
* container thread
*
* @return if processing was triggered successfully
*/
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
...
SocketProcessorBase<S> sc = processorCache.pop();
...
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
...
}
4 Keep-Alive
Keep-alive means that once a socket is established, the client can keep sending requests over it without setting up a new connection (TCP three-way handshake) each time. Its benefits, as listed in RFC 2616 (https://tools.ietf.org/html/rfc2616), include:
Persistent HTTP connections have a number of advantages:
- By opening and closing fewer TCP connections, CPU time is saved
in routers and hosts (clients, servers, proxies, gateways,
tunnels, or caches), and memory used for TCP protocol control
blocks can be saved in hosts.
- HTTP requests and responses can be pipelined on a connection.
Pipelining allows a client to make multiple requests without
waiting for each response, allowing a single TCP connection to
be used much more efficiently, with much lower elapsed time.
- Network congestion is reduced by reducing the number of packets
caused by TCP opens, and by allowing TCP sufficient time to
determine the congestion state of the network.
- Latency on subsequent requests is reduced since there is no time
spent in TCP's connection opening handshake.
- HTTP can evolve more gracefully, since errors can be reported
without the penalty of closing the TCP connection. Clients using
future versions of HTTP might optimistically try a new feature,
but if communicating with an older server, retry with old
semantics after an error is reported.
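As a quick illustration (a minimal sketch using the JDK 11+ java.net.http client; the URL is a placeholder), two requests sent through the same HttpClient over HTTP/1.1 reuse one TCP connection thanks to keep-alive:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class KeepAliveDemo {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .build();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:8080/ping")).build();
        // The second send reuses the pooled connection: no new handshake.
        for (int i = 0; i < 2; i++) {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode());
        }
    }
}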
Requests on an established socket are handled by the Poller.
/**
* Poller class.
*/
public class Poller implements Runnable {
/**
* The background thread that adds sockets to the Poller, checks the
* poller for triggered events and hands the associated socket off to an
* appropriate processor as events occur.
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
...
timeout(keyCount,hasEvents);
}//while
}
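In each iteration the Poller first drains the queued PollerEvents, then selects, hands every ready SelectionKey to processKey() (which ultimately ends up in processSocket() shown above), and finally calls timeout() to close connections whose socket timeout has expired.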
From each connector's request-processing path, the message flow is:
1 NioEndpoint initializes serverSock
2 Acceptor threads concurrently call serverSocket.accept() to obtain connections
3 Each accepted socket is switched to non-blocking mode, wrapped in a NioChannel, registered with a Poller (chosen round-robin), and the selector is woken up:
/**
* Process the specified connection.
* @param socket The socket channel
* @return <code>true</code> if the socket was correctly configured
* and processing may continue, <code>false</code> if the socket needs to be
* close immediately
*/
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = nioChannels.pop();
...
getPoller0().register(channel);
...
}
/**
* Return an available poller in true round robin fashion.
* @return The next poller in sequence
*/
public Poller getPoller0() {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
/**
* Registers a newly created socket with the poller.
*/
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
...
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
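Note that register() does not touch the Selector directly: it only queues a PollerEvent. The actual SelectionKey registration for OP_READ happens later, when the PollerEvent runs on the Poller thread, which avoids contending with a selector that may be blocked in select().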
addEvent() queues the listener event and wakes up the selector:
private void addEvent(PollerEvent event) {
events.offer(event);
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}
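The wakeupCounter keeps wakeups cheap: the Poller sets it to -1 before blocking in select(), so only the first event queued during that window increments it to 0 and triggers selector.wakeup(); subsequent events merely bump the counter.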
4 The Poller picks the event up via its selector and hands the request off to the internal thread pool (processSocket, shown above).
It remains to take a closer look at how the selector itself works.