看到此文章可能有读者会感到疑惑,为什么NioServerSocketChannel
的实例化也要写一篇文章呢?在Netty
中NioServerSocketChannel
可谓是用的比较频繁的一个类了,它在初始化时进行了很多操作,很多人是不了解或是不甚了解的,当然,笔者也是。所以今天我们就深入了解一下它到底做了什么?
上面两个图中的
Channel
不是同一个类,上面的图是io.netty.channel.Channel
,Netty封装的Channel
类,下面的图是java.nio.channels.Channel
,jdk中java.nio
包下的Channel
类。
1. 贴NioServerSocketChannel
源码:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
注意:
为了减少文章的篇幅,这里我只是贴出部分源码,且删除了暂时用不到的代码和注释
我们以无参构造方法为例,在该方法中,执行了newSocket(DEFAULT_SELECTOR_PROVIDER)
,这个方法主要是返回一个ServerSocketChannelImpl
类的实例,然后将其转为ServerSocketChannel
类型,这里主要是java Nio
的应用,就不细讲了。
然后是调用构造方法NioServerSocketChannel(SelectorProvider provider)
,在该构造方法中首先是调用了父类AbstractNioMessageChannel
的构造方法,然后是执行config = new NioServerSocketChannelConfig(this, javaChannel().socket());
我们先就进入他的父类看看吧
2. 贴AbstractNioMessageChannel源码
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
在AbstractNioMessageChannel
的这个构造器中是直接调用其父类AbstractNioChannel
的构造方法,那我们就跟下去看看源码吧
3. 贴AbstractNioChannel源码
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
在这个构造方法中,我们终于看到了大段的代码了,不过第一步它又执行了父类的构造方法,这里我们先看看下面都做了什么,首先传下来的所以参数parent
为null
,ch
为ServerSocketChannel
的一个实例,readInterestOp
为SelectionKey.OP_ACCEPT
在AbstractNioChannel
构造方法中,(重点,后面要用到
)this.ch = ch;
保存传过来的ServerSocketChannel
的实例,然后是this.readInterestOp = readInterestOp;
,保存传过来的感兴趣的操作,即SelectionKey.OP_ACCEPT
。
然后是调用ch.configureBlocking(false);
将channel
设置为非阻塞的。这里我们就可以知道是用NioServerSocketChannel
,在实例化是它就会创建对应的channel
,并且设置为非阻塞模式。
好了,我们接下来看看super(parent)
做了什么事情,调用了AbstractChannel
的构造方法,那我看看它的源码吧
4. 贴AbstractChannel源码
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
@Override
public final ChannelId id() {
return id;
}
protected ChannelId newId() {
return DefaultChannelId.newInstance();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
@Override
public ChannelPipeline pipeline() {
return pipeline;
}
传过来的parent
为null
,所以我们直接看id = newId()
,这里设置了channel的id,所以只要NioServerSocketChannel
实例化,就会设置好id
、unsafe
和pipeline
的值,这里要注意一点就是id
的类型,并不是String
或是基本数据类型,而是 ChannelId
类型,newId()方法最终是创建了一个DefaultChannelId
类型的实例,这里就不深入了,DefaultChannelId
是接口ChannelId
的一个默认实现,源码不长,想深入了解channel
的id
生成规则的读者可以看看,DefaultChannelId
的全类名是io.netty.channel.DefaultChannelId
到这里,就已经看完了super(null, channel, SelectionKey.OP_ACCEPT)
所调用的代码所做的操作了,让我们回到这段代码config = new NioServerSocketChannelConfig(this, javaChannel().socket())
看看做了什么?首先看看javaChannel()
@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
调用了父类AbstractNioChannel
的javaChannel()
,那我们看看的这个方法实现吧
protected SelectableChannel javaChannel() {
return ch;
}
直接返回ch
,ch
是什么呢?就是刚刚说重点的地方,该方法返回我们刚刚保存好的ServerSocketChannel
实例,这实例是用的子类ServerSocketChannelImpl
创建的,所以抽象方法socket()
调用的是ServerSocketChannelImpl
中的socket()
,那我们看看ServerSocketChannelImpl
的源码
5. ServerSocketChannelImpl的源码
private static NativeDispatcher nd;
private final FileDescriptor fd;
private int fdVal;
private final Object stateLock = new Object();
private int state = -1;
ServerSocket socket;
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
super(var1);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(this.fd);
this.state = 0;
}
public ServerSocket socket() {
synchronized(this.stateLock) {
if (this.socket == null) {
this.socket = ServerSocketAdaptor.create(this);
}
return this.socket;
}
}
在构造方法中,可以看到并没有初始化socket
的值,所以socket
为null
,所以调用socket()返回的是ServerSocketAdaptor.create(this)
返回的结果
为了验证分析的正确性,我们做一下Debug
:
好结论是没有问题的,那我们向下走,看看
ServerSocketAdaptor.create(this)
源码
public static ServerSocket create(ServerSocketChannelImpl var0) {
try {
return new ServerSocketAdaptor(var0);
} catch (IOException var2) {
throw new Error(var2);
}
}
ServerSocketAdaptor源码
private ServerSocketAdaptor(ServerSocketChannelImpl var1) throws IOException {
this.ssc = var1;
}
这里只是将ServerSocketChannelImpl
实例进行保存,保存在ssc
字段中,并没有做其他的操作,到这里我们就获得了一个ServerSocketAdaptor
的实例,并且将其保存在ServerSocketChannelImpl
实例的socket
字段中,两个实例形成了相互引用。
从新回到new NioServerSocketChannelConfig(this, javaChannel().socket())
这段代码,现在我们直接进入NioServerSocketChannelConfig
中
6. NioServerSocketChannelConfig的源码
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
super(channel, javaSocket);
}
@Override
protected void autoReadCleared() {
clearReadPending();
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);
}
return super.setOption(option, value);
}
@Override
public <T> T getOption(ChannelOption<T> option) {
if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
return NioChannelOption.getOption(jdkChannel(), (NioChannelOption<T>) option);
}
return super.getOption(option);
}
@SuppressWarnings("unchecked")
@Override
public Map<ChannelOption<?>, Object> getOptions() {
if (PlatformDependent.javaVersion() >= 7) {
return getOptions(super.getOptions(), NioChannelOption.getOptions(jdkChannel()));
}
return super.getOptions();
}
private ServerSocketChannel jdkChannel() {
return ((NioServerSocketChannel) channel).javaChannel();
}
}
构造方法直接调用了父类DefaultServerSocketChannelConfig
的构造方法,那我们就深入吧
public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
super(channel);
if (javaSocket == null) {
throw new NullPointerException("javaSocket");
}
this.javaSocket = javaSocket;
}
DefaultServerSocketChannelConfig
还是调用了父类DefaultChannelConfig
的构造方法,并且保存传入的javaSocket
参数
7. DefaultChannelConfig的源码
protected final Channel channel;
private volatile RecvByteBufAllocator rcvBufAllocator;
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
setRecvByteBufAllocator(allocator, channel.metadata());
this.channel = channel;
}
private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
} else if (allocator == null) {
throw new NullPointerException("allocator");
}
setRecvByteBufAllocator(allocator);
}
@Override
public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
rcvBufAllocator = checkNotNull(allocator, "allocator");
return this;
}
在构造方法中创建实例AdaptiveRecvByteBufAllocator
,这类其实是比较重要的,从类的名字上看,它的意思是适应接收缓存分配器(翻译可能不太准确),简单来说就是一个可以根据接收数据大小来判断开辟多大的空间存储数据。它是Netty
提供的一个可以估计接收数据量,自动扩容和减容的一个类,接下来我们看看它的源码
package io.netty.channel;
import java.util.ArrayList;
import java.util.List;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import static java.lang.Math.max;
import static java.lang.Math.min;
/**
* The {@link RecvByteBufAllocator} that automatically increases and
* decreases the predicted buffer size on feed back.
* It gradually increases the expected number of readable bytes if the previous
* read fully filled the allocated buffer. It gradually decreases the expected
* number of readable bytes if the read operation was not able to fill a certain
* amount of the allocated buffer two times consecutively. Otherwise, it keeps
* returning the same prediction.
*/
/**
* {@link RecvByteBufAllocator}自动增加和
* 减少反馈时预测的缓冲区大小。
* 如果先前的read完全填充了分配的缓冲区,它会逐渐增加预期的可读字节数。如果读取操作不能连续两次填充分配的缓冲区的某个
* 量,则逐渐减少预期的*可读字节数。否则,它会保持
* 返回相同的预测。
*/
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024;
static final int DEFAULT_MAXIMUM = 65536;
private static final int INDEX_INCREMENT = 4;
private static final int INDEX_DECREMENT = 1;
private static final int[] SIZE_TABLE;
static {
List<Integer> sizeTable = new ArrayList<Integer>();
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
for (int i = 512; i > 0; i <<= 1) {
sizeTable.add(i);
}
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
/**
* @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
*/
@Deprecated
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
private static int getSizeTableIndex(final int size) {
for (int low = 0, high = SIZE_TABLE.length - 1;;) {
if (high < low) {
return low;
}
if (high == low) {
return high;
}
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid];
int b = SIZE_TABLE[mid + 1];
if (size > b) {
low = mid + 1;
} else if (size < a) {
high = mid - 1;
} else if (size == a) {
return mid;
} else {
return mid + 1;
}
}
}
private final class HandleImpl extends MaxMessageHandle {
private final int minIndex;
private final int maxIndex;
private int index;
private int nextReceiveBufferSize;
private boolean decreaseNow;
HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
index = getSizeTableIndex(initial);
nextReceiveBufferSize = SIZE_TABLE[index];
}
@Override
public void lastBytesRead(int bytes) {
// If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
// This helps adjust more quickly when large amounts of data is pending and can avoid going back to
// the selector to check for more data. Going back to the selector can add significant latency for large
// data transfers.
if (bytes == attemptedBytesRead()) {
record(bytes);
}
super.lastBytesRead(bytes);
}
@Override
public int guess() {
return nextReceiveBufferSize;
}
private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
@Override
public void readComplete() {
record(totalBytesRead());
}
}
private final int minIndex;
private final int maxIndex;
private final int initial;
/**
* Creates a new predictor with the default parameters. With the default
* parameters, the expected buffer size starts from {@code 1024}, does not
* go down below {@code 64}, and does not go up above {@code 65536}.
*/
public AdaptiveRecvByteBufAllocator() {
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}
/**
* Creates a new predictor with the specified parameters.
*
* @param minimum the inclusive lower bound of the expected buffer size
* @param initial the initial buffer size when no feed back was received
* @param maximum the inclusive upper bound of the expected buffer size
*/
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
checkPositive(minimum, "minimum");
if (initial < minimum) {
throw new IllegalArgumentException("initial: " + initial);
}
if (maximum < initial) {
throw new IllegalArgumentException("maximum: " + maximum);
}
int minIndex = getSizeTableIndex(minimum);
if (SIZE_TABLE[minIndex] < minimum) {
this.minIndex = minIndex + 1;
} else {
this.minIndex = minIndex;
}
int maxIndex = getSizeTableIndex(maximum);
if (SIZE_TABLE[maxIndex] > maximum) {
this.maxIndex = maxIndex - 1;
} else {
this.maxIndex = maxIndex;
}
this.initial = initial;
}
@SuppressWarnings("deprecation")
@Override
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
}
@Override
public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
super.respectMaybeMoreData(respectMaybeMoreData);
return this;
}
}
好了,具体细节还需要读者自行研读源码了,Netty
的源码还是值得一看的,但是也不要陷入源码,把握总体方向Reactor
模式。谢谢您的阅读,如有不足,欢迎来访
网友评论