3_netty_Bootstrap

Author: loading_17 | Published 2018-06-18 23:04

    Continuing with the example from the previous article:

        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                  .channel(NioServerSocketChannel.class)
                  .option(ChannelOption.SO_BACKLOG, 100)
                  .handler(new LoggingHandler(LogLevel.INFO))
                  .childHandler(new ChannelInitializer<SocketChannel>() {
                          @Override
                          public void initChannel(SocketChannel ch) throws Exception {
                              ChannelPipeline p = ch.pipeline();
                              if (sslCtx != null) {
                                  p.addLast(sslCtx.newHandler(ch.alloc()));
                              }
                              //p.addLast(new LoggingHandler(LogLevel.INFO));
                              p.addLast(serverHandler);
                          }
                      });
    
              ChannelFuture f = b.bind(PORT).sync();
              f.channel().closeFuture().sync();
    

    Start with the constructors.

        public ServerBootstrap() { }
    
        private ServerBootstrap(ServerBootstrap bootstrap) {
            super(bootstrap);
            childGroup = bootstrap.childGroup;
            childHandler = bootstrap.childHandler;
            synchronized (bootstrap.childOptions) {
                childOptions.putAll(bootstrap.childOptions);
            }
            synchronized (bootstrap.childAttrs) {
                childAttrs.putAll(bootstrap.childAttrs);
            }
        }
    

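    Two constructors are provided. The example uses the public no-argument one; the other is a private copy constructor. Next, the group method is called:

        // Both the boss group and the worker group are passed in here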
        public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
            super.group(parentGroup);
            if (childGroup == null) {
                throw new NullPointerException("childGroup");
            }
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.childGroup = childGroup;
            return this;
        }
    

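    This calls the parent class's group method (not a constructor), passing in bossGroup:

        // The EventLoopGroup handles all events for the Channel that will be created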
        public B group(EventLoopGroup group) {
            if (group == null) {
                throw new NullPointerException("group");
            }
            if (this.group != null) {
                throw new IllegalStateException("group set already");
            }
            this.group = group;
            return (B) this;
        }
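
    The two group methods combine a set-once guard with a self-typed return value, which is why returning (B) this lets calls keep chaining on the subclass type. The sketch below is a simplified, stdlib-only model of that pattern, not Netty's code: BaseBuilder and ServerBuilder are hypothetical names, and a String stands in for EventLoopGroup.

```java
import java.util.Objects;

// Hypothetical model of AbstractBootstrap<B, C>: B is the concrete builder type.
abstract class BaseBuilder<B extends BaseBuilder<B>> {
    private String group; // stands in for EventLoopGroup

    @SuppressWarnings("unchecked")
    B group(String group) {
        Objects.requireNonNull(group, "group");
        if (this.group != null) {
            // The group may be assigned only once per builder.
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return (B) this; // safe as long as subclasses pass themselves as B
    }

    String group() {
        return group;
    }
}

// Hypothetical model of ServerBootstrap: adds a second, child group.
final class ServerBuilder extends BaseBuilder<ServerBuilder> {
    private String childGroup;

    ServerBuilder group(String parentGroup, String childGroup) {
        super.group(parentGroup); // the parent group is stored by the base class
        Objects.requireNonNull(childGroup, "childGroup");
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

    String childGroup() {
        return childGroup;
    }
}
```

    Calling group a second time on the same builder fails with IllegalStateException, which mirrors the "group set already" check in the source above.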
    

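    Next, the channel type is set:

        // This does not create the NioServerSocketChannel yet. It only stores a
        // ReflectiveChannelFactory for the given class; the channel instance is
        // created later, in initAndRegister(). The Channel itself is analyzed later.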
        public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
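
    Note that channel() only records which class to instantiate; no channel object exists until the factory is asked for one (in initAndRegister). The following sketch models that deferral with plain reflection. SimpleFactory, ReflectiveFactory and CountingChannel are hypothetical names for illustration, and Netty's real ReflectiveChannelFactory may differ in detail.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a channel factory interface.
interface SimpleFactory<T> {
    T newInstance();
}

// Looks up the no-arg constructor once, but creates nothing until asked.
final class ReflectiveFactory<T> implements SimpleFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            this.constructor = clazz.getDeclaredConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getName() + " has no no-arg constructor", e);
        }
    }

    @Override
    public T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }
}

// Hypothetical channel type that counts how many instances were created.
final class CountingChannel {
    static int created = 0;

    CountingChannel() {
        created++;
    }
}
```

    Constructing the factory leaves CountingChannel.created at 0; each newInstance() call creates a fresh object, just as each bind creates a fresh server channel.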
    

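    Next is the option method. The argument here is ChannelOption.SO_BACKLOG with a value of 100. This is a standard socket parameter: it sets the maximum length of the queue that holds connections which have completed the three-way handshake but have not yet been accepted by the server. The operating system may cap the requested value. Many other options can be set as well; they are not listed here. Note in the code that passing a null value removes the option.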

        public <T> B option(ChannelOption<T> option, T value) {
            if (option == null) {
                throw new NullPointerException("option");
            }
            if (value == null) {
                synchronized (options) {
                    options.remove(option);
                }
            } else {
                synchronized (options) {
                    options.put(option, value);
                }
            }
            return (B) this;
        }
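
    The put-or-remove behaviour of option can be tried in isolation. This is a minimal sketch assuming a plain String key in place of ChannelOption<T>; OptionStore and snapshot are hypothetical names, not part of Netty.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model of the options map kept by the bootstrap.
final class OptionStore {
    private final Map<String, Object> options = new LinkedHashMap<>();

    OptionStore option(String name, Object value) {
        Objects.requireNonNull(name, "option");
        synchronized (options) {
            if (value == null) {
                options.remove(name);     // a null value means "unset this option"
            } else {
                options.put(name, value); // otherwise add or overwrite
            }
        }
        return this;
    }

    // Returns a copy so callers never iterate the live map without the lock.
    Map<String, Object> snapshot() {
        synchronized (options) {
            return new LinkedHashMap<>(options);
        }
    }
}
```

    With this model, option("SO_BACKLOG", 100) followed by option("SO_BACKLOG", null) leaves no SO_BACKLOG entry behind, so the channel would fall back to its default.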
    

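    Next, handler is set explicitly: a LoggingHandler is added to the server channel to log all of its events. Then childHandler is called with a new ChannelInitializer<SocketChannel>() whose initChannel method initializes each accepted channel and installs the business handler.
    Next comes the key part: starting the server.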

    // Start the server.
    ChannelFuture f = b.bind(PORT).sync();
    

    In AbstractBootstrap, the bind method creates a new Channel and binds it:

        public ChannelFuture bind(int inetPort) {
            return bind(new InetSocketAddress(inetPort));
        }
        public ChannelFuture bind(SocketAddress localAddress) {
            // Validate the configuration
            validate();
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            return doBind(localAddress);
        }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
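            // Initialize and register the Channel
            final ChannelFuture regFuture = initAndRegister();
            // Get the Channel that is being registered
            final Channel channel = regFuture.channel();
            // Return early if registration has already failed
            if (regFuture.cause() != null) {
                return regFuture;
            }
            // Non-blocking check: has registration already completed?
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.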
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
    
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
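
    The shape of doBind (check isDone without blocking, take a fast path if registration has already finished, otherwise attach a listener) can be modelled with the JDK's CompletableFuture. This is only a sketch under that substitution: BindFlow, finish and the "bound:" result string are hypothetical, and Netty's ChannelFuture/ChannelPromise have a different API.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical model of the register-then-bind hand-off in doBind.
final class BindFlow {
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, String address) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            // Fast path: isDone() only inspects state, it never waits.
            finish(regFuture, address, promise);
        } else {
            // Slow path: continue once registration completes, success or failure.
            regFuture.whenComplete((ignored, cause) -> finish(regFuture, address, promise));
        }
        return promise;
    }

    private static void finish(CompletableFuture<Void> regFuture, String address,
                               CompletableFuture<String> promise) {
        try {
            regFuture.join(); // returns at once: the future is already complete here
            promise.complete("bound:" + address);
        } catch (CompletionException e) {
            // Registration failed, so fail the bind promise with the same cause.
            promise.completeExceptionally(e.getCause() != null ? e.getCause() : e);
        } catch (CancellationException e) {
            promise.completeExceptionally(e);
        }
    }
}
```

    In both paths the caller gets a promise back immediately; only a later sync() or join() on that promise would block.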
    

    Now look at initialization and registration:

    final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                // ReflectiveChannelFactory.newChannel() simply calls
                // clazz.newInstance() to create the NioServerSocketChannel
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
            return regFuture;
        }
    
        void init(Channel channel) throws Exception {
            // The SO_BACKLOG option passed in earlier is in this options map
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
            // Set the attributes
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
            // The pipeline (a DefaultChannelPipeline) is created by default in the
            // channel's constructor when the channel is instantiated
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
            // Add a handler to the NioServerSocketChannel's pipeline
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
                    // Submit a task that adds the ServerBootstrapAcceptor to the pipeline
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    After init completes, the register method is called:

    // NioEventLoopGroup's register method registers the channel
    config().group().register(channel);
    

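    Inside initAndRegister:

    1. A channel (NioServerSocketChannel) is created through channelFactory.
    2. The channel is initialized: options and attrs are applied, and a handler is added to its pipeline.
    3. After initialization, the channel is registered.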

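    Back in doBind: initAndRegister returns regFuture, which represents the result of registration. regFuture.isDone() does not block; it only checks whether registration has already finished. If it has, doBind0 is called immediately; otherwise a listener is added so that doBind0 runs once registration completes successfully.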

       private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered. Give user handlers a chance
            // to set up the pipeline in their channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    

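    To summarize the server-side bind flow:

    1. Initialize and register the channel, which includes creating it.
    2. initAndRegister returns a ChannelFuture that represents the result of registration.
    3. Once registration succeeds, doBind0 is called. It submits a task to the channel's event loop; the task calls channel.bind(localAddress, promise), and a ChannelPromise representing the result of the bind is returned to the caller.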

    Now look at client startup.

    Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .remoteAddress(new InetSocketAddress(host, port))
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(
                                        new EchoClientHandler()
                                );
                            }
                        });
                ChannelFuture f = b.connect().sync();
                f.channel().closeFuture().sync();
    

    Bootstrap's connect method:

        public ChannelFuture connect() {
            validate();
            SocketAddress remoteAddress = this.remoteAddress;
            if (remoteAddress == null) {
                throw new IllegalStateException("remoteAddress not set");
            }
    
            return doResolveAndConnect(remoteAddress, config.localAddress());
        }
    
     private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            // The channel here is a NioSocketChannel
            final Channel channel = regFuture.channel();
    
            if (regFuture.isDone()) {
                if (!regFuture.isSuccess()) {
                    return regFuture;
                }
                return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                        // failure.
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
                            doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    
        private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                                   final SocketAddress localAddress, final ChannelPromise promise) {
            try {
                final EventLoop eventLoop = channel.eventLoop();
                final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
    
                if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                    // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                    doConnect(remoteAddress, localAddress, promise);
                    return promise;
                }
    
                final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
    
                if (resolveFuture.isDone()) {
                    final Throwable resolveFailureCause = resolveFuture.cause();
    
                    if (resolveFailureCause != null) {
                        // Failed to resolve immediately
                        channel.close();
                        promise.setFailure(resolveFailureCause);
                    } else {
                        // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                        doConnect(resolveFuture.getNow(), localAddress, promise);
                    }
                    return promise;
                }
    
                // Wait until the name resolution is finished.
                resolveFuture.addListener(new FutureListener<SocketAddress>() {
                    @Override
                    public void operationComplete(Future<SocketAddress> future) throws Exception {
                        if (future.cause() != null) {
                            channel.close();
                            promise.setFailure(future.cause());
                        } else {
                            doConnect(future.getNow(), localAddress, promise);
                        }
                    }
                });
            } catch (Throwable cause) {
                promise.tryFailure(cause);
            }
            return promise;
        }
    
        private static void doConnect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            final Channel channel = connectPromise.channel();
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, connectPromise);
                    } else {
                        channel.connect(remoteAddress, localAddress, connectPromise);
                    }
                    connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                }
            });
        }
    
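    To summarize the client-side connect flow:

    1. Initialize and register the Channel.
    2. Resolve the remote address.
    3. Execute channel.connect(remoteAddress, connectPromise) on the channel's eventLoop.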
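
    The address-resolution step is skipped when the address is already resolved. The JDK exposes the same distinction through InetSocketAddress.isUnresolved(), which the sketch below uses to mimic the "already resolved, connect directly" branch of doResolveAndConnect0. ResolveStep and resolveIfNeeded are hypothetical names, and the lookup here is blocking, whereas Netty resolves asynchronously through its AddressResolver.

```java
import java.net.InetSocketAddress;

// Hypothetical, blocking model of the resolve-before-connect decision.
final class ResolveStep {
    static InetSocketAddress resolveIfNeeded(InetSocketAddress remote) {
        if (!remote.isUnresolved()) {
            // Already resolved: hand the same address straight to connect.
            return remote;
        }
        // The constructor attempts to resolve the host name; if resolution
        // fails, the returned address is still flagged as unresolved.
        return new InetSocketAddress(remote.getHostString(), remote.getPort());
    }
}
```

    In the client example above, new InetSocketAddress(host, port) already attempts resolution at construction time, so in that case the flow would most likely go straight to doConnect. An address built with InetSocketAddress.createUnresolved(host, port) would take the resolver path instead.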
