美文网首页程序员
netty源码分析(15)- 新连接NioEventLoop分配

netty源码分析(15)- 新连接NioEventLoop分配

作者: Jorgezhong | 来源:发表于2019-02-25 14:43 被阅读0次

    上一节总结了channel代码的架构,了解了从鼎城channel接口的定义以及一层一层最后区分开客户端channel和服务端channel。从中也可以体会到抽象和集成的特点。

    本节回顾一下在服务端启动初始化的时候ServerBootstrap#init(),主要做了一些参数的配置。其中对于childGroup,childOptions,childAttrs,childHandler等参数被进行了单独配置。作为参数和ServerBootstrapAcceptor一起,被当作一个特殊的handle,封装到pipeline中。ServerBootstrapAcceptor中的eventLoopworkGroup

        @Override
        void init(Channel channel) throws Exception {
    
            //配置AbstractBootstrap.option
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            //配置AbstractBootstrap.attr
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
            //配置pipeline
            ChannelPipeline p = channel.pipeline();
    
            //获取ServerBootstrapAcceptor配置参数
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
    
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    //配置AbstractBootstrap.handler
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            //配置ServerBootstrapAcceptor,作为Handle紧跟HeadContext
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    可见,整个服务端pipeline的结构如下图所示。bossGroup控制IO事件的检测与处理,整个bossGroup对应的pipeline只包括头(HeadContext)尾(TailContext)以及中部的ServerBootstrap.ServerBootstrapAcceptor

    pipeline结构图

    当新连接接入的时候AbstractNioMessageChannel.NioMessageUnsafe#read()方法被调用,最终调用fireChannelRead(),方法来触发下一个HandlerchannelRead方法。而这个Handler正是ServerBootstrapAcceptor

                        //传递到下一个Handle: ServerBootstrapAcceptor#channelRead()方法
                        //readBuf.get(i) 获取临时存储的一个channel
                        pipeline.fireChannelRead(readBuf.get(i));
    

    接着查看ServerBootstrapAcceptor,它是ServerBootstrap的内部类,同时继承自ChannelInboundHandlerAdapter。也是一个ChannelInboundHandler。其channelRead主要做了以下几件事。

    1. 为客户端channelpipeline添加childHandler
    2. 设置客户端TCP相关属性childOptions和自定义属性childAttrs
    3. workGroup选择NioEventLoop并注册Selector
     private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    
            private final EventLoopGroup childGroup;
            private final ChannelHandler childHandler;
            private final Entry<ChannelOption<?>, Object>[] childOptions;
            private final Entry<AttributeKey<?>, Object>[] childAttrs;
            private final Runnable enableAutoReadTask;
    
            ServerBootstrapAcceptor(
                    final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                    Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
                this.childGroup = childGroup;
                this.childHandler = childHandler;
                this.childOptions = childOptions;
                this.childAttrs = childAttrs;
    
                // Task which is scheduled to re-enable auto-read.
                // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
                // not be able to load the class because of the file limit it already reached.
                //
                // See https://github.com/netty/netty/issues/1328
                enableAutoReadTask = new Runnable() {
                    @Override
                    public void run() {
                        channel.config().setAutoRead(true);
                    }
                };
            }
    
            @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                //该channel为客户端接入时创建的channel
                final Channel child = (Channel) msg;
    
                //添加childHandler
                child.pipeline().addLast(childHandler);
    
                //设置TCP相关属性:childOptions
                setChannelOptions(child, childOptions, logger);
    
                //设置自定义属性:childAttrs
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
                    //选择NioEventLoop并注册Selector
                    childGroup.register(child)
                            .addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
    
            private static void forceClose(Channel child, Throwable t) {
                child.unsafe().closeForcibly();
                logger.warn("Failed to register an accepted channel: {}", child, t);
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                final ChannelConfig config = ctx.channel().config();
                if (config.isAutoRead()) {
                    // stop accept new connections for 1 second to allow the channel to recover
                    // See https://github.com/netty/netty/issues/1328
                    config.setAutoRead(false);
                    ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
                }
                // still let the exceptionCaught event flow through the pipeline to give the user
                // a chance to do something with it
                ctx.fireExceptionCaught(cause);
            }
        }
    
    • 为客户端channelpipeline添加childHandler
      回顾引导的代码,在配置childHandler的时候,使用了ChannelInitializer的一个自定义实例。并且覆盖了其initChannel方法,改方法获取到pipeline并添加具体的Handler
    public class Server {
    
        private static final int PORT = 1111;
    
        public static void main(String[] args) throws Exception {
    
    
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            bossGroup.setIoRatio(100);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
    
            try {
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_REUSEADDR, true)
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .attr(AttributeKey.newInstance("attr"), "attr")
                        .childAttr(AttributeKey.newInstance("childAttr"), "childAttr")
                        .handler(new DataServerInitializer())
                        .childHandler(new DataServerInitializer());
            
                ChannelFuture future = serverBootstrap.bind(PORT).sync();
                future.channel().closeFuture().sync();
    
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
    
        }
    
    }
    
    
    class DataServerInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            
            ch.pipeline()
                    .addLast( new DataServerHandler());
        }
    }
    
    

    查看ChannelInitializer具体的添加逻辑,handlerAdded方法。其实在initChannel逻辑中,首先是回调到用户代码执行initChannel,用户代码执行添加Handler的添加操作,之后将ChannelInitializer自己从pipeline中删除。

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            if (ctx.channel().isRegistered()) {
                // This should always be true with our current DefaultChannelPipeline implementation.
                // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
                // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
                // will be added in the expected order.
    
                //初始化Channel
                if (initChannel(ctx)) {
    
                    // We are done with init the Channel, removing the initializer now.
                    removeState(ctx);
                }
            }
        }
    
        private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
            if (initMap.add(ctx)) { // Guard against re-entrance.
                try {
                    //回调到用户代码
                    initChannel((C) ctx.channel());
                } catch (Throwable cause) {
                    // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                    // We do so to prevent multiple calls to initChannel(...).
                    exceptionCaught(ctx, cause);
                } finally {
                    ChannelPipeline pipeline = ctx.pipeline();
                    if (pipeline.context(this) != null) {
                        //删除本身
                        pipeline.remove(this);
                    }
                }
                return true;
            }
            return false;
        }
    
    • workGroup选择NioEventLoop并注册Selector
      这个过程在之前学习的《注册Selector》过程一样,不同的地方是unsafe实例不一样。
        @Override
        public ChannelFuture register(final ChannelPromise promise) {
            ObjectUtil.checkNotNull(promise, "promise");
            //服务端unsafe是NioMessageUnsafe,客户端的是NioByteUnsafe
            promise.channel().unsafe().register(this, promise);
            return promise;
        }
    

    相关文章

      网友评论

        本文标题:netty源码分析(15)- 新连接NioEventLoop分配

        本文链接:https://www.haomeiwen.com/subject/ttvwyqtx.html