简书 杭州_mina
《tomcat 8.x NioEndpoint核心组件浅析1》
1. Acceptor 浅析
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
// 一直循环直到接收到关闭命令
while (running) {
// paused 只有在unbind的时候会设置
while (paused && running) {
//变更Acceptor的状态
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
// Acceptor 设置成running
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
//连接数达到最大,暂停线程。
//这里用到的是connectionLimitLatch锁,可以理解为一个闭锁
//我理解connectionLimitLatch和coundownlatch类似
//补充一点这个是先增加计数器、如果超过最大连接数则减少计时器、然后线程暂停,你这个计时器就是当前连接数
//程序的下面我会简单的讲一下这个方法的实现
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
//调用serversocket的accept方法
socket = serverSock.accept();
} catch (IOException ioe) {
// We didn't get a socket
// 出错了要减去连接数
countDownConnection();
if (running) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (running && !paused) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
// 把socket扔到poller中
if (!setSocketOptions(socket)) {
// 如果加入poller失败关闭连接
closeSocket(socket);
}
} else {
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
}
下面来看setSocketOptions方法中如何把SocketChannel加入到poller中
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false); //设置成非阻塞
//获取socket
Socket sock = socket.socket();
//配置socket信息
socketProperties.setProperties(sock);
//创建一个NioChannel 他封装了SocketChannel
NioChannel channel = nioChannels.pop();
if (channel == null) {
//如果为null 创建一个NioChannel 这里使用系统内存
//使用系统内存可以省去一步从系统内存拷贝到堆内存的动作、性能上会有很大的提升,nioChannels初始化默认为128个
//当socket 关闭的重新清理NioChannel而不是销毁这个对象可以达到对象复用的效果、因为申请系统内存的开销比申请堆内存的开销要大很多
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
//如果不为null设置SocketChannel
channel.setIOChannel(socket);
//将channle复位 以上就是重置系统内存把指针重新定位到buff的开始位置
channel.reset();
}
//丢入poller队列中
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
最后一步register方法
public void register(final NioChannel socket) {
//设置poller 对象
socket.setPoller(this);
//设置一个附件待会把这个NioSocketWrapper注册到poller对象中的selector上去
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);//获取
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getSocketProperties().getSoTimeout());
ka.setWriteTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
ka.setReadTimeout(getSoTimeout());
ka.setWriteTimeout(getSoTimeout());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
//生成一个PollerEvent对象 这个对象继承了Runnable
//这个对象的主要作用就是把NioSocketWrapper注册到poller对象中的selector上去
//还有就是获取SelectionKey 该对象是用于跟踪这些被注册事件的句柄
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
//把pollerEvent放入到poller的队列中
addEvent(r);
}
2. LimitLatch 浅析
- 2.1 AbstractEndpoint 封装了LimitLatch一些方法
//默认最大连接数为10000
private int maxConnections = 10000;
//获取最大连接数
public int getMaxConnections() {
return this.maxConnections;
}
//初始化闭锁
protected LimitLatch initializeConnectionLatch() {
if (maxConnections==-1) return null;
if (connectionLimitLatch==null) {
//
connectionLimitLatch = new LimitLatch(getMaxConnections());
}
return connectionLimitLatch;
}
//将会释放所有的线程
protected void releaseConnectionLatch() {
LimitLatch latch = connectionLimitLatch;
if (latch!=null) latch.releaseAll();
connectionLimitLatch = null;
}
//增加计数,如果太大,那么等待
protected void countUpOrAwaitConnection() throws InterruptedException {
if (maxConnections==-1) return;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) latch.countUpOrAwait();
}
//减少计数
protected long countDownConnection() {
if (maxConnections==-1) return -1;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) {
long result = latch.countDown();
if (result<0) {
getLog().warn(sm.getString("endpoint.warn.incorrectConnectionCount"));
}
return result;
} else return -1
}
- 2.2 LimitLatch 源码浅析
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.util.threads;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* Shared latch that allows the latch to be acquired a limited number of times
* after which all subsequent requests to acquire the latch will be placed in a
* FIFO queue until one of the shares is returned.
*/
public class LimitLatch {
private static final Log log = LogFactory.getLog(LimitLatch.class);
//构建sync对象、主要用来同步、阻塞两个功能
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
protected int tryAcquireShared(int ignored) {
//增加计数器
long newCount = count.incrementAndGet();
//如果计数器大于最大limit 则计数器减一 返回-1 否则返回 1
//这里的limit 其实就是maxConnections
if (!released && newCount > limit) {
// Limit exceeded
count.decrementAndGet();
return -1;
} else {
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
//计数器减一
count.decrementAndGet();
return true;
}
}
private final Sync sync;
//计数器
private final AtomicLong count;
//最大连接数
private volatile long limit;
//是否全部释放
private volatile boolean released = false;
/**
* Instantiates a LimitLatch object with an initial limit.
*
* @param limit - maximum number of concurrent acquisitions of this latch
*/
public LimitLatch(long limit) {
this.limit = limit;
this.count = new AtomicLong(0);
this.sync = new Sync();
}
/**
* Returns the current count for the latch
*
* @return the current count for latch
*/
//获取当前计数器
public long getCount() {
return count.get();
}
/**
* Obtain the current limit.
*
* @return the limit
*/
//获取最大连接数
public long getLimit() {
return limit;
}
/**
* Sets a new limit. If the limit is decreased there may be a period where
* more shares of the latch are acquired than the limit. In this case no
* more shares of the latch will be issued until sufficient shares have been
* returned to reduce the number of acquired shares of the latch to below
* the new limit. If the limit is increased, threads currently in the queue
* may not be issued one of the newly available shares until the next
* request is made for a latch.
*
* @param limit The new limit
*/
//设置最大连接数
public void setLimit(long limit) {
this.limit = limit;
}
/**
* Acquires a shared latch if one is available or waits for one if no shared
* latch is current available.
*
* @throws InterruptedException If the current thread is interrupted
*/
//增加计数器如果超过最大连接数、则等待并且计数器减一
public void countUpOrAwait() throws InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Counting up[" + Thread.currentThread().getName() + "] latch=" + getCount());
}
sync.acquireSharedInterruptibly(1);
}
/**
* Releases a shared latch, making it available for another thread to use.
*
* @return the previous counter value
*/
//减少计数器其实就是减少连接数
public long countDown() {
sync.releaseShared(0);
long result = getCount();
if (log.isDebugEnabled()) {
log.debug("Counting down[" + Thread.currentThread().getName() + "] latch=" + result);
}
return result;
}
/**
* Releases all waiting threads and causes the {@link #limit} to be ignored
* until {@link #reset()} is called.
*
* @return <code>true</code> if release was done
*/
//释放所有线程
public boolean releaseAll() {
released = true;
return sync.releaseShared(0);
}
/**
* Resets the latch and initializes the shared acquisition counter to zero.
*
* @see #releaseAll()
*/
//重置计数器
public void reset() {
this.count.set(0);
released = false;
}
/**
* Returns <code>true</code> if there is at least one thread waiting to
* acquire the shared lock, otherwise returns <code>false</code>.
*
* @return <code>true</code> if threads are waiting
*/
//是否有等待线程
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* Provide access to the list of threads waiting to acquire this limited
* shared latch.
*
* @return a collection of threads
*/
//获取所有等待线程
public Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
}
3.总结Acceptor流程
虽然看上去代码有点复杂,但是实际上就是一句话概括。获取socket、并且对socket进行封装,扔到Poller的队列中。
网友评论