美文网首页
Netty中的NioServerSocketChannel类实例

Netty中的NioServerSocketChannel类实例

作者: small瓜瓜 | 来源:发表于2019-06-13 00:10 被阅读0次

    看到此文章可能有读者会感到疑惑,为什么NioServerSocketChannel的实例化也要写一篇文章呢?在NettyNioServerSocketChannel可谓是用的比较频繁的一个类了,它在初始化时进行了很多操作,很多人是不了解或是不甚了解的,当然,笔者也是。所以今天我们就深入了解一下它到底做了什么?

    NioServerSocketChannel类继承结构 ServerSocketChannelImpl类继承结构

    上面两个图中的Channel不是同一个类,上面的图是io.netty.channel.Channel,Netty封装的Channel类,下面的图是java.nio.channels.Channel,jdk中java.nio包下的Channel类。

    1. 贴NioServerSocketChannel源码:
        private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
        private static ServerSocketChannel newSocket(SelectorProvider provider) {
            try {
                return provider.openServerSocketChannel();
            } catch (IOException e) {
                throw new ChannelException(
                        "Failed to open a server socket.", e);
            }
        }
        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
        public NioServerSocketChannel(SelectorProvider provider) {
            this(newSocket(provider));
        }
        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
    

    注意:为了减少文章的篇幅,这里我只是贴出部分源码,且删除了暂时用不到的代码和注释
    我们以无参构造方法为例,在该方法中,执行了newSocket(DEFAULT_SELECTOR_PROVIDER),这个方法主要是返回一个ServerSocketChannelImpl类的实例,然后将其转为ServerSocketChannel类型,这里主要是java Nio的应用,就不细讲了。
    然后是调用构造方法NioServerSocketChannel(SelectorProvider provider),在该构造方法中首先是调用了父类AbstractNioMessageChannel的构造方法,然后是执行config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    我们先就进入他的父类看看吧

    2. 贴AbstractNioMessageChannel源码
        protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent, ch, readInterestOp);
        }
    

    AbstractNioMessageChannel的这个构造器中是直接调用其父类AbstractNioChannel的构造方法,那我们就跟下去看看源码吧

    3. 贴AbstractNioChannel源码
            protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }
    

    在这个构造方法中,我们终于看到了大段的代码了,不过第一步它又执行了父类的构造方法,这里我们先看看下面都做了什么,首先传下来的所以参数parentnullchServerSocketChannel的一个实例,readInterestOpSelectionKey.OP_ACCEPT
    AbstractNioChannel构造方法中,(重点,后面要用到this.ch = ch;保存传过来的ServerSocketChannel的实例,然后是this.readInterestOp = readInterestOp;,保存传过来的感兴趣的操作,即SelectionKey.OP_ACCEPT
    然后是调用ch.configureBlocking(false);channel设置为非阻塞的。这里我们就可以知道是用NioServerSocketChannel,在实例化是它就会创建对应的channel,并且设置为非阻塞模式。

    好了,我们接下来看看super(parent)做了什么事情,调用了AbstractChannel的构造方法,那我看看它的源码吧

    4. 贴AbstractChannel源码
        private final ChannelId id;
        private final Unsafe unsafe;
        private final DefaultChannelPipeline pipeline;
    
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
        @Override
        public final ChannelId id() {
            return id;
        }
        protected ChannelId newId() {
            return DefaultChannelId.newInstance();
        }
        protected DefaultChannelPipeline newChannelPipeline() {
            return new DefaultChannelPipeline(this);
        }
        @Override
        public ChannelPipeline pipeline() {
            return pipeline;
        }
    

    传过来的parentnull,所以我们直接看id = newId(),这里设置了channel的id,所以只要NioServerSocketChannel实例化,就会设置好idunsafepipeline的值,这里要注意一点就是id的类型,并不是String或是基本数据类型,而是 ChannelId类型,newId()方法最终是创建了一个DefaultChannelId类型的实例,这里就不深入了,DefaultChannelId是接口ChannelId的一个默认实现,源码不长,想深入了解channelid生成规则的读者可以看看,DefaultChannelId的全类名是io.netty.channel.DefaultChannelId

    到这里,就已经看完了super(null, channel, SelectionKey.OP_ACCEPT)所调用的代码所做的操作了,让我们回到这段代码config = new NioServerSocketChannelConfig(this, javaChannel().socket())看看做了什么?首先看看javaChannel()

        @Override
        protected ServerSocketChannel javaChannel() {
            return (ServerSocketChannel) super.javaChannel();
        }
    

    调用了父类AbstractNioChanneljavaChannel(),那我们看看的这个方法实现吧

        protected SelectableChannel javaChannel() {
            return ch;
        }
    

    直接返回chch是什么呢?就是刚刚说重点的地方,该方法返回我们刚刚保存好的ServerSocketChannel实例,这实例是用的子类ServerSocketChannelImpl创建的,所以抽象方法socket()调用的是ServerSocketChannelImpl中的socket(),那我们看看ServerSocketChannelImpl的源码

    5. ServerSocketChannelImpl的源码

    private static NativeDispatcher nd;
        private final FileDescriptor fd;
        private int fdVal;
        private final Object stateLock = new Object();
        private int state = -1;
        ServerSocket socket;
    
        ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
            super(var1);
            this.fd = Net.serverSocket(true);
            this.fdVal = IOUtil.fdVal(this.fd);
            this.state = 0;
        }
         public ServerSocket socket() {
            synchronized(this.stateLock) {
                if (this.socket == null) {
                    this.socket = ServerSocketAdaptor.create(this);
                }
                return this.socket;
            }
        }
    

    在构造方法中,可以看到并没有初始化socket的值,所以socketnull,所以调用socket()返回的是ServerSocketAdaptor.create(this)返回的结果
    为了验证分析的正确性,我们做一下Debug

    Debug结果
    好结论是没有问题的,那我们向下走,看看ServerSocketAdaptor.create(this)源码
        public static ServerSocket create(ServerSocketChannelImpl var0) {
            try {
                return new ServerSocketAdaptor(var0);
            } catch (IOException var2) {
                throw new Error(var2);
            }
        }
    

    ServerSocketAdaptor源码

        private ServerSocketAdaptor(ServerSocketChannelImpl var1) throws IOException {
            this.ssc = var1;
        }
    

    这里只是将ServerSocketChannelImpl实例进行保存,保存在ssc字段中,并没有做其他的操作,到这里我们就获得了一个ServerSocketAdaptor的实例,并且将其保存在ServerSocketChannelImpl实例的socket字段中,两个实例形成了相互引用。

    从新回到new NioServerSocketChannelConfig(this, javaChannel().socket())这段代码,现在我们直接进入NioServerSocketChannelConfig

    NioServerSocketChannelConfig类的继承结构

    6. NioServerSocketChannelConfig的源码

    private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
            private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
                super(channel, javaSocket);
            }
    
            @Override
            protected void autoReadCleared() {
                clearReadPending();
            }
    
            @Override
            public <T> boolean setOption(ChannelOption<T> option, T value) {
                if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
                    return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);
                }
                return super.setOption(option, value);
            }
    
            @Override
            public <T> T getOption(ChannelOption<T> option) {
                if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
                    return NioChannelOption.getOption(jdkChannel(), (NioChannelOption<T>) option);
                }
                return super.getOption(option);
            }
    
            @SuppressWarnings("unchecked")
            @Override
            public Map<ChannelOption<?>, Object> getOptions() {
                if (PlatformDependent.javaVersion() >= 7) {
                    return getOptions(super.getOptions(), NioChannelOption.getOptions(jdkChannel()));
                }
                return super.getOptions();
            }
    
            private ServerSocketChannel jdkChannel() {
                return ((NioServerSocketChannel) channel).javaChannel();
            }
        }
    

    构造方法直接调用了父类DefaultServerSocketChannelConfig的构造方法,那我们就深入吧

        public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
            super(channel);
            if (javaSocket == null) {
                throw new NullPointerException("javaSocket");
            }
            this.javaSocket = javaSocket;
        }
    

    DefaultServerSocketChannelConfig还是调用了父类DefaultChannelConfig的构造方法,并且保存传入的javaSocket参数

    7. DefaultChannelConfig的源码

        protected final Channel channel;
        private volatile RecvByteBufAllocator rcvBufAllocator;
    
        public DefaultChannelConfig(Channel channel) {
            this(channel, new AdaptiveRecvByteBufAllocator());
        }
    
        protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
            setRecvByteBufAllocator(allocator, channel.metadata());
            this.channel = channel;
        }
        
        private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
            if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
                ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
            } else if (allocator == null) {
                throw new NullPointerException("allocator");
            }
            setRecvByteBufAllocator(allocator);
        }
        @Override
        public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
            rcvBufAllocator = checkNotNull(allocator, "allocator");
            return this;
        }
    

    在构造方法中创建实例AdaptiveRecvByteBufAllocator,这类其实是比较重要的,从类的名字上看,它的意思是适应接收缓存分配器(翻译可能不太准确),简单来说就是一个可以根据接收数据大小来判断开辟多大的空间存储数据。它是Netty提供的一个可以估计接收数据量,自动扩容和减容的一个类,接下来我们看看它的源码

    package io.netty.channel;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import static io.netty.util.internal.ObjectUtil.checkPositive;
    import static java.lang.Math.max;
    import static java.lang.Math.min;
    
    /**
     * The {@link RecvByteBufAllocator} that automatically increases and
     * decreases the predicted buffer size on feed back.
     * It gradually increases the expected number of readable bytes if the previous
     * read fully filled the allocated buffer.  It gradually decreases the expected
     * number of readable bytes if the read operation was not able to fill a certain
     * amount of the allocated buffer two times consecutively.  Otherwise, it keeps
     * returning the same prediction.
     */
    /**
    *  {@link RecvByteBufAllocator}自动增加和
    *  减少反馈时预测的缓冲区大小。 
    *  如果先前的read完全填充了分配的缓冲区,它会逐渐增加预期的可读字节数。如果读取操作不能连续两次填充分配的缓冲区的某个
    *  量,则逐渐减少预期的*可读字节数。否则,它会保持
    *  返回相同的预测。
    */
    public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
    
        static final int DEFAULT_MINIMUM = 64;
        static final int DEFAULT_INITIAL = 1024;
        static final int DEFAULT_MAXIMUM = 65536;
    
        private static final int INDEX_INCREMENT = 4;
        private static final int INDEX_DECREMENT = 1;
    
        private static final int[] SIZE_TABLE;
    
        static {
            List<Integer> sizeTable = new ArrayList<Integer>();
            for (int i = 16; i < 512; i += 16) {
                sizeTable.add(i);
            }
    
            for (int i = 512; i > 0; i <<= 1) {
                sizeTable.add(i);
            }
    
            SIZE_TABLE = new int[sizeTable.size()];
            for (int i = 0; i < SIZE_TABLE.length; i ++) {
                SIZE_TABLE[i] = sizeTable.get(i);
            }
        }
    
        /**
         * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
         */
        @Deprecated
        public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
    
        private static int getSizeTableIndex(final int size) {
            for (int low = 0, high = SIZE_TABLE.length - 1;;) {
                if (high < low) {
                    return low;
                }
                if (high == low) {
                    return high;
                }
    
                int mid = low + high >>> 1;
                int a = SIZE_TABLE[mid];
                int b = SIZE_TABLE[mid + 1];
                if (size > b) {
                    low = mid + 1;
                } else if (size < a) {
                    high = mid - 1;
                } else if (size == a) {
                    return mid;
                } else {
                    return mid + 1;
                }
            }
        }
    
        private final class HandleImpl extends MaxMessageHandle {
            private final int minIndex;
            private final int maxIndex;
            private int index;
            private int nextReceiveBufferSize;
            private boolean decreaseNow;
    
            HandleImpl(int minIndex, int maxIndex, int initial) {
                this.minIndex = minIndex;
                this.maxIndex = maxIndex;
    
                index = getSizeTableIndex(initial);
                nextReceiveBufferSize = SIZE_TABLE[index];
            }
    
            @Override
            public void lastBytesRead(int bytes) {
                // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
                // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
                // the selector to check for more data. Going back to the selector can add significant latency for large
                // data transfers.
                if (bytes == attemptedBytesRead()) {
                    record(bytes);
                }
                super.lastBytesRead(bytes);
            }
    
            @Override
            public int guess() {
                return nextReceiveBufferSize;
            }
    
            private void record(int actualReadBytes) {
                if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
                    if (decreaseNow) {
                        index = max(index - INDEX_DECREMENT, minIndex);
                        nextReceiveBufferSize = SIZE_TABLE[index];
                        decreaseNow = false;
                    } else {
                        decreaseNow = true;
                    }
                } else if (actualReadBytes >= nextReceiveBufferSize) {
                    index = min(index + INDEX_INCREMENT, maxIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                }
            }
    
            @Override
            public void readComplete() {
                record(totalBytesRead());
            }
        }
    
        private final int minIndex;
        private final int maxIndex;
        private final int initial;
    
        /**
         * Creates a new predictor with the default parameters.  With the default
         * parameters, the expected buffer size starts from {@code 1024}, does not
         * go down below {@code 64}, and does not go up above {@code 65536}.
         */
        public AdaptiveRecvByteBufAllocator() {
            this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
        }
    
        /**
         * Creates a new predictor with the specified parameters.
         *
         * @param minimum  the inclusive lower bound of the expected buffer size
         * @param initial  the initial buffer size when no feed back was received
         * @param maximum  the inclusive upper bound of the expected buffer size
         */
        public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
            checkPositive(minimum, "minimum");
            if (initial < minimum) {
                throw new IllegalArgumentException("initial: " + initial);
            }
            if (maximum < initial) {
                throw new IllegalArgumentException("maximum: " + maximum);
            }
    
            int minIndex = getSizeTableIndex(minimum);
            if (SIZE_TABLE[minIndex] < minimum) {
                this.minIndex = minIndex + 1;
            } else {
                this.minIndex = minIndex;
            }
    
            int maxIndex = getSizeTableIndex(maximum);
            if (SIZE_TABLE[maxIndex] > maximum) {
                this.maxIndex = maxIndex - 1;
            } else {
                this.maxIndex = maxIndex;
            }
    
            this.initial = initial;
        }
    
        @SuppressWarnings("deprecation")
        @Override
        public Handle newHandle() {
            return new HandleImpl(minIndex, maxIndex, initial);
        }
    
        @Override
        public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
            super.respectMaybeMoreData(respectMaybeMoreData);
            return this;
        }
    }
    

    好了,具体细节还需要读者自行研读源码了,Netty的源码还是值得一看的,但是也不要陷入源码,把握总体方向Reactor模式。谢谢您的阅读,如有不足,欢迎来访

    相关文章

      网友评论

          本文标题:Netty中的NioServerSocketChannel类实例

          本文链接:https://www.haomeiwen.com/subject/lbyrfctx.html