美文网首页
3. Netty解析:NioSocketChannel、NioS

3. Netty解析:NioSocketChannel、NioS

作者: 饿了就下楼 | 来源:发表于2020-02-12 11:12 被阅读0次

    ChannelFactory

       上文说到,channel方法创建了ReflectiveChannelFactory负责创建NioSocketChannel或者NioServerSocketChannel实例。创建实例的时候就是通过BootstrapChannelFactory的newChannel来创建。

        @Override
        public T newChannel() {
            try {
                return clazz.getConstructor().newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
    

      

    NioSocketChannel的基类

      我们首先看看它们NioSocketChannel的基类(同时也是NioServerSocketChannel的基类)AbstractNioChannelAbstractChannel(后者又是前者的基类)中都有哪些东西,可以帮助我们理解netty的channel中有哪些基本元素。

        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            // SelectableChannel 应该就是对应着NIO中的ServerSocketChannel或者SocketChannel实例
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }
    
    
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    
    

       我们可以看出,不论是NioServerSocketChannel还是NioSocketChannel,每一个channel都有父channel(这个后边会提到),一个unsafe实例、一个pipeline实例(这个很重要),另外还有它所对应的NIO中的SelectableChannel,猜测一下大概也就是NioServerSocketChannel对应着ServerSocketChannel,NioSocketChannel对应着SocketChannel,并且还配置了读事件,并设置NIO的channel为非阻塞形式,毕竟这里使用的都是Nio开头的channel嘛。
      

    NioSocketChannel及创建

       NioSocketChannel的创建源自于client端的connect操作。关于connect操作的具体过程,我们先不展开,只是介绍一下NioSocketChannel是如何创建的。connect方法会调用到doResolveAndConnect方法,其中的initAndRegister()(位于AbstractBootstrap)就完成了NioSocketChannel的创建。

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory().newChannel();
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
            return regFuture;
        }
    

       从整体上来看,这个方法完成了通过ChannelFactory创建NioSocketChannel实例,完成初始化并且注册到事件循环组中。
      ChannelFactory(实际就是之前提到的ReflectiveChannelFactory)会通过反射方式利用无参构造器创建NioSocketChannel,我们跟踪它的无参构造器,会发现最终会调用下面的构造方法,传入的parent是null(顺便提前提一下,在client端,每个NioSocketChannel是没有父channel的,但是在Server端就不一样了,我会在server端的时候再讲),然后传入了一个Java NIO中的SocketChannel实例,这样就已经建立起了netty中的NioSocketChannel和NIO中SocketChannel的联系,本质上来说netty中的Channel不等于NIO中的Channel,而他们之间的联系和区别大概就如同这里所说的这样,NioSocketChannel(准确说是它的基类AbstractNioChannel)中封装了一个SocketChannel。

        public NioSocketChannel(Channel parent, SocketChannel socket) {
            super(parent, socket);
            config = new NioSocketChannelConfig(this, socket.socket());
        }
    

       同时还要注意的一点,也在上文提到过,每个NioSocketChannel实例都会封装一个pipeline实例(在基类AbstractChannel中),看源码可以知道,这个实例就是DefaultChannelPipeline
      

    NioSocketChannel的初始化及注册

       上文所讲为initAndRegister()方法先通过ChannelFactory创建了NioSocketChannel实例,紧接着会调用init(channel)方法对创建的channel进行初始化。这里初始化的方法被子类BootStrap具体实现。

        void init(Channel channel) throws Exception {
            ChannelPipeline p = channel.pipeline();
            p.addLast(handler());
    
            final Map<ChannelOption<?>, Object> options = options();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
            }
        }
    
    

       在init方法中,向NioSocketChannel中封装的pipeline中添加(addLast)了一个Handler,而这个handler就是在执行BootStrap.handler(xxx)时传入的Handler,具体来说就是ChannelInitializer,随后又配置了一些options和attributes。

         b.group(group)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.TCP_NODELAY, true)
         // 配置需要的处理器,也是交给了Bootstrap管理
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 xxxxxxxxxxxxxxxxxxx
             }
         });  
    

       initAndRegister方法执行init(channel)方法之后,开始完成NioSocketChannel向事件循环组的注册

    ChannelFuture regFuture = group().register(channel);

      这里的注册是通过调用事件循环组的register方法来完成,回想一下,我们在client端的启动代码中写到,我们用的事件循环组是NioEventLoopGroup,我们找到它的register(channel)方法(实际位于MultithreadEventLoopGroup),最终他调到的是

    next().register(channel);

    而next方法,最终实际调用的就是chooser(GenericEventExecutorChooserPowerOfTwoEventExecutorChooser)的next方法,从NioEventLoopGroup的children数组字段中选择出一个NioEventLoop。而register就是调用的选择出的NioEventLoop的register方法(这个方法实际位于它的基类SingleThreadEventLoop)。

        @Override
        public ChannelFuture register(Channel channel) {
            return register(channel, new DefaultChannelPromise(channel, this));
        }
    
        public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
            /*省略部分代码*/
            channel.unsafe().register(this, promise);
            return promise;
        }
    
    

      注册的时候先将当前NioSocketChannel和当前EventLoop封装在一个DefaultChannelPromise中,然后调用通过NioSocketChannel中的unsafe实例完成注册(实现在AbstractUnsafe中)。

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
           /*省略一些代码*/
      
            AbstractChannel.this.eventLoop = eventLoop;
    
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
    
        private void register0(ChannelPromise promise) {
            try {
                boolean firstRegistration = neverRegistered;
                // 重点方法
                doRegister();
                neverRegistered = false;
                registered = true;
    
                /*后边的代码我们暂时先不看*/            
                
                }
            } catch (Throwable t) {
                /*省略的代码*/
            }
        }
    
    
        // 方法位于AbstractNioChannel
        @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    // javaChannel 返回的就是 当前netty中的channel所关联的java NIO中的channel
                    // 具体来说就是 netty的NioSocketChannel中关联的NIO中的SocketChannel
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    
    
    

      我们分析一下上面的代码:在执行注册时,实际就是完成了NioSocketChannel关联的NIO中的SocketChannel向NioEventLoop关联的NIO中的Selector的注册(但是这个时候并没有注册有效的事件,因为传入的参数为0)。那么注册工作是交由对应的NioEventLoop中的线程来完成,如果不是的话(因为此时线程有可能就是从启动代码一路直行过来的主线程),就封装成一个Runnable任务塞入NioEventLoop的execute方法,通过前面的文章知道,这个任务会被添加到任务队列中,由NioEventLoop中的线程来完成,而这个过程是个异步过程。

    NioSocketChannel创建及注册过程总结

      总结一下NioSocketChannel的创建初始化及注册的过程:创建的NioSocketChannel,都会包含一个pipeline实例,一个unsafe实例,一个NIO中的SocketChannel实例,以及这个SocketChannel所对应的SelectionKey(在channel注册阶段赋值)。创建好NioSocketChannel后,进行初始化(init方法),将在启动代码中handler(xxxx)方法中的ChannelInitializer这个Handler添加进入关联的pipeline中(addLast方法添加)。随后再开始NioSocketChannel的注册,注册的流程简图如下。


    NioSocketChannel的注册

    NioServerSocketChannel

      上文理解了之后,在看NioServerSocketChannel,就比较容易了,与NioSocketChannel不同的是,NioServerSocketChannel中传入的是Accept事件,而NioSocketChannel中为读事件。

        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
    

    this.readInterestOp = readInterestOp;

    作为类比,NioServerSocketChannel的注册源自于bind方法的执行,与NioSocketChannel不同点在于initAndRegister()中的init(channel)方法,服务端的NioServerSocketChannel的init方法被ServerBootStrap重写。

        @Override
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    //handler()方法获取到父类AbstractBootStrap中管理的Handler,也就是父循环组对应的handler
                    ChannelHandler handler = handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

       server端的init方法相比于client端稍微复杂一些,它向NioServerSocketChannel的pipeline中添加了一个ChannelInitializer处理器,负责后续添加父Handler(在启动代码中对应的就是LoggingHandler)。并且向通道所在的循环组添加任务,用于将关联了子处理器的ServerBootstrapAcceptor添加到pipeline中。

    b.handler(new LoggingHandler(LogLevel.INFO))

    其余的部分与NioSocketChannel相似,就不在多说了,不过稍微提一下的一个点是,NioServerSocketChannel的注册既然跟NioSocketChannel相似,那么在进行注册的时候都是向AbstractBootStrap的group所代表的事件循环组注册的,也就是在server端,NioServerSocketChannel需要向父循环组中某个NioEventLoop关联的Selector进行注册。
      

    *链接

    1. Netty解析:第一个demo——Echo Server
    2. Netty解析:NioEventLoopGroup事件循环组
    3. Netty解析:NioSocketChannel、NioServerSocketChannel的创建及注册
    4. Netty解析:Handler、Pipeline大动脉及其在注册过程中体现
    5. Netty解析:connect/bind方法背后
    6. Netty解析:服务端如何接受连接并后续处理读写事件

    相关文章

      网友评论

          本文标题:3. Netty解析:NioSocketChannel、NioS

          本文链接:https://www.haomeiwen.com/subject/yxntnctx.html