引子
在上一篇分析NioEventLoop那篇文章中分析NioEventLoop.run()
时候提到当方法processSelectedKey()
处理事件的时候,比如读事件,最终调用了unsafe.read();
本文就来分析一下unsafe对象的read方法的具体实现。
检测新的连接
源码中的Unsafe是一个接口,在调试时可知其实现类为NioMessageUnsafe类,其read方法源码如下:
private final List<Object> readBuf = new ArrayList<Object>();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
@Override
public void read() {
......
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
}
......
doReadMessages()
方法在当前类是一个抽闲方法,实现在NioServerSocketChannel类中,如下:
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
可以看出当一个新的read事件过来的时候,服务端accept后创建SocketChannel对象,并封装为NioSocketChannel对象,放到一个链表中。
此外,上述read方法是一个循环,条件是allocHandle.continueReading()
值为true,实际运行中,这个方法实现如下:
public boolean continueReading() {
return config.isAutoRead() &&
attemptedBytesRead == lastBytesRead &&
totalMessages < maxMessagePerRead &&
totalBytesRead < Integer.MAX_VALUE;
}
这个方法的意思是当配置配了自动读,并且一次进去总共的连接数不能超过maxMessagePerRead(默认值为16)个连接。如果下一次执行循环的时候没有连接的话,那么while循环就结束了。
分析到这里,检测新连接的意思就是:获取当前读取操作有几个连接进来,封装成NioSocketChannel对象并放在一个readBuf链表里面。
NioSocketChannel的创建
在讲述检测连接的地方提到将SocketChannel封装为NioSocketChannel对象并进行创建,这里接着分析这个Netty提供的NioSocketChannel的实例化过程:
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
首先调用了父类的构造函数,实现为:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
其中传入readInterestOp = SelectionKey.OP_READ
即感兴趣的事件为读事件。在构造函数中主要是设置了SocketChannel的阻塞模式为非阻塞,这是NIO编程中实现选择器进行选择的关键设置。
我们接着看进一步的父类构造函数调用:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
所以总结来说NioSocketChannel里面有几个属性:
- parent(保存了NioServerSocketChannel对象)。
- id
- unsafe
- pipeline
关于unsafe和pipeline,后续将会对其进行分析理解。我们接着回过头来看NioSocketChannel构造函数的第二个语句:config = new NioSocketChannelConfig(this, socket.socket());
,即创建一个配置类NioSocketChannelConfig,这个配置类最终构造函数实现:
public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
super(channel);
if (javaSocket == null) {
throw new NullPointerException("javaSocket");
}
this.javaSocket = javaSocket;
// Enable TCP_NODELAY by default if possible.
if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
try {
setTcpNoDelay(true);
} catch (Exception e) {
// Ignore.
}
}
}
public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
try {
javaSocket.setTcpNoDelay(tcpNoDelay);
} catch (SocketException e) {
throw new ChannelException(e);
}
return this;
}
可以看出配置类主要初始化的时候就配置了一个东西:调用Socket对象的setTcpNoDelay(true)
方法。
关于java Socket中这个配置具体起到什么作用,不清楚,看源码备注:disable/enable Nagle's algorithm
是用来配置是否启动一个Nagle算法加的,这里先知道干了这么一件事就行了。
网友评论