Netty Entry Point: ServerBootstrap

Author: 小犇手K线研究员 | Published 2017-02-21 22:02 | Read 341 times

    When netty runs as a server, it starts from ServerBootstrap. This article assumes TCP as the transport-layer protocol.

    UML diagram

    bootstrap.png

    As the diagram above shows, both Bootstrap and ServerBootstrap extend AbstractBootstrap.

    ServerBootstrap

    A program that starts up through ServerBootstrap looks like this:

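            // Accept thread pool: accepts incoming connections and registers them with the worker thread pool
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            // Worker thread pool: handles reads and writes of the channels' business data
            EventLoopGroup workerGroup = new NioEventLoopGroup();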
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 // Handler on the ServerSocketChannel
                 .handler(new LoggingHandler(LogLevel.INFO))
                 // Handler on each accepted SocketChannel
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc()));
                         }
                         p.addLast(new DiscardServerHandler());
                     }
                 });
                // Bind the listening port and start to accept incoming connections.
                ChannelFuture f = b.bind(PORT).sync();
    
                // Wait until the server socket is closed.
                // In this example, this does not happen, but you can do that to gracefully
                // shut down your server.
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
    

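    As the code above shows, the core operation of ServerBootstrap is the bind() method, which creates a new ServerSocketChannel and starts listening on a local port. The biggest difference between configuring ServerBootstrap and configuring Bootstrap is that ServerBootstrap must also set the thread pool and the handlers for the accepted SocketChannels.
    The bind() method is as follows: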

    @Override
        public ChannelFuture bind(SocketAddress localAddress) {
            return super.bind(localAddress);
        }
    

    The bind() method of AbstractBootstrap is as follows:

    public ChannelFuture bind(SocketAddress localAddress) {
            // Check that all required properties are present and valid
            validate();
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            return doBind(localAddress);
        }
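
    The validate-then-bind order can be illustrated with a small stand-alone model. Everything in it is hypothetical: `MiniBootstrap` is not a Netty class, plain strings stand in for the EventLoopGroup and the channel factory, and the error messages are made up. Only the fail-fast shape follows the method above.

```java
import java.net.SocketAddress;

// Hypothetical stand-in for a bootstrap class; not part of Netty.
final class MiniBootstrap {
    private String group;          // stands in for the EventLoopGroup
    private String channelFactory; // stands in for the channel factory

    MiniBootstrap group(String group) {
        this.group = group;
        return this;
    }

    MiniBootstrap channelFactory(String channelFactory) {
        this.channelFactory = channelFactory;
        return this;
    }

    // Fail fast when a required setting is missing.
    void validate() {
        if (group == null) {
            throw new IllegalStateException("group not set");
        }
        if (channelFactory == null) {
            throw new IllegalStateException("channelFactory not set");
        }
    }

    // Same order as above: validate the configuration, check the argument, then bind.
    String bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return "bound to " + localAddress;
    }
}
```

    Calling bind() on an unconfigured `MiniBootstrap` fails in validate() before the address is even looked at, which is the point of validating first.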
    

    The doBind() method is as follows:

    private ChannelFuture doBind(final SocketAddress localAddress) {
            // Create, initialize, and register the channel
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            // Registration failed
            if (regFuture.cause() != null) {
                return regFuture;
            }
            // Registration has already completed
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
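                // Registration has not completed yet.
                // Registration future is almost always fulfilled already, but just in case it's not.
                // Because registration is not complete, the channel may not have an executor yet,
                // so the promise cannot be created from the channel.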
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
                            // Bind
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
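
    The two branches of doBind() follow a common asynchronous pattern: continue right away if the earlier step has already finished, otherwise attach a listener and continue when it does. Below is a minimal sketch of that pattern using the JDK's CompletableFuture in place of Netty's ChannelFuture. The class `BindAfterRegister`, its method names, and the string result are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the doBind() control flow; not Netty code.
final class BindAfterRegister {

    // Stand-in for doBind0(): the actual bind work.
    static String doBind0(int port) {
        return "bound:" + port;
    }

    // Returns a promise that completes with the bind result,
    // or fails with the registration failure.
    static CompletableFuture<String> bindAfter(CompletableFuture<Void> registration, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Fast path: registration already succeeded, bind immediately.
            promise.complete(doBind0(port));
        } else {
            // Registration is still pending (or already failed): decide in a listener.
            // For an already-failed future the listener runs at once.
            registration.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    promise.complete(doBind0(port));
                }
            });
        }
        return promise;
    }
}
```

    Either way the caller gets a promise back immediately and never blocks on registration.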
    

    The initAndRegister() method is as follows:

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                // Create the channel
                channel = channelFactory.newChannel();
                // Initialize it: add the handler, options, and attributes for the ServerSocketChannel
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // Register the channel with an event loop thread
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }
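
    The long comment at the end of initAndRegister() relies on one property: tasks handed to a single event loop thread run in the order they were queued, so a bind queued after the registration always runs after it. The sketch below models the event loop with a JDK single-thread executor. That substitution is an assumption for illustration, and `TaskOrderDemo` is a made-up name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: a single-thread executor stands in for one event loop.
final class TaskOrderDemo {

    // Queues "register" and then "bind" from the calling thread
    // and returns the order in which the loop thread ran them.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            loop.execute(() -> log.add("register"));
            loop.execute(() -> log.add("bind"));
        } finally {
            // shutdown() still lets the already queued tasks run.
            loop.shutdown();
        }
        try {
            if (!loop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }
}
```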
    

    The init() method differs slightly from the one in Bootstrap. Its code is as follows:

    @Override
        void init(Channel channel) throws Exception {
            // Set the options
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                channel.config().setOptions(options);
            }
            // Set the attributes
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
            // Add the initializer handler
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Add the handler configured for the ServerSocketChannel
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
                    // Add the last handler, which registers accepted channels with the worker threads.
                    // This is the key difference from Bootstrap's init() method.
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
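
    The acceptor added at the end of init() is what connects the two thread pools: a connection is accepted on the boss side and then handed over to the worker side. The sketch below models only that hand-off, using blocking java.net sockets and two single-thread executors. It is not how ServerBootstrapAcceptor is implemented, and `AcceptorHandOff` and the thread names are made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the boss-to-worker hand-off; not Netty code.
final class AcceptorHandOff {

    // Accepts one connection on the "boss" thread, hands it to the "worker"
    // thread, and returns the name of the thread that handled the child socket.
    static String acceptOnce() {
        ExecutorService boss = Executors.newSingleThreadExecutor(r -> new Thread(r, "boss"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            CompletableFuture<String> handledBy = new CompletableFuture<>();
            boss.execute(() -> {
                try {
                    Socket child = server.accept();   // boss: accept only
                    worker.execute(() -> {            // worker: everything afterwards
                        handledBy.complete(Thread.currentThread().getName());
                        try {
                            child.close();
                        } catch (IOException ignored) {
                            // nothing useful to do in this demo
                        }
                    });
                } catch (IOException e) {
                    handledBy.completeExceptionally(e);
                }
            });
            // Connect a client so that accept() returns.
            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                return handledBy.get(5, TimeUnit.SECONDS);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            boss.shutdownNow();
            worker.shutdownNow();
        }
    }
}
```

    Keeping the boss side limited to accepting is what lets a single boss thread, as in `new NioEventLoopGroup(1)` above, serve many connections.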
    

    The doBind0() method is as follows:

    private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        // Invoke the channel's own bind operation
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
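
    The CLOSE_ON_FAILURE listener attached in doBind0() means a channel whose bind fails does not stay open. A rough model of that idea with CompletableFuture is shown below. `FakeChannel` and `closeOnFailure` are hypothetical names, and the real listener operates on a Netty ChannelFuture rather than a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a close-on-failure listener; not Netty code.
final class CloseOnFailureDemo {

    // Minimal fake channel that only tracks whether it is open.
    static final class FakeChannel {
        private final AtomicBoolean open = new AtomicBoolean(true);

        boolean isOpen() {
            return open.get();
        }

        void close() {
            open.set(false);
        }
    }

    // Closes the channel if, and only if, the bind future fails.
    static void closeOnFailure(CompletableFuture<Void> bindFuture, FakeChannel channel) {
        bindFuture.whenComplete((ignored, cause) -> {
            if (cause != null) {
                channel.close();
            }
        });
    }
}
```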
    

    Summary: ServerBootstrap binds a ServerSocketChannel in the following steps:

    • Create a ServerSocketChannel
    • Initialize the ServerSocketChannel
    • Register the ServerSocketChannel
    • Invoke the channel's bind operation to start listening on a port
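
    For comparison, the same four steps can be written directly against the JDK's java.nio API, which netty's NIO transport builds on. This is a sketch under assumptions, not netty's code: the class name `NioBindSteps` is made up, and it binds to an ephemeral loopback port so it can run anywhere.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical plain-JDK walk through create / initialize / register / bind.
final class NioBindSteps {

    // Runs the four steps and returns the port that was actually bound.
    static int bindEphemeral() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {   // 1. create
            server.configureBlocking(false);                              // 2. initialize
            SelectionKey key = server.register(selector, 0);              // 3. register, no interest yet
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // 4. bind
            key.interestOps(SelectionKey.OP_ACCEPT);                      // now ready to accept
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```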
