美文网首页
四、netty源码分析之ServerBootstrap

四、netty源码分析之ServerBootstrap

作者: 丑星星 | 来源:发表于2019-09-28 09:30 被阅读0次

    一、功能概述

    前两篇我们分别介绍了EventLoopGroupEventLoop在netty中的作用。但是仅仅知道这些,可能对netty如何完成一整个网络事件监控到任务分发处理还是有些模糊。本篇我们要分析一下netty的启动流程。在我们使用netty编程的时候,我们的使用ServerBootstrapBootstrap来实现服务端和客户端的启动。我们先来看一下这两个类的相关类图:

    AbstractBootstrap
    netty定义了抽象类AbstractBootstrap,然后在此基础上实现了ServerBootstrapBootstrap分别作为服务端和客户端的启动类。本篇,我们以ServerBootstrap为例,分析一下服务端的启动流程。

    我们根据下面的测试代码分析一下具体的流程,这也是我们创建netty服务端的基本流程:

            EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap(); // (2)
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class) // (3)
                 .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ch.pipeline().addLast(new DiscardServerHandler());
                     }
                 })
                 .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                 .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
        
                // Bind and start to accept incoming connections.
                ChannelFuture f = b.bind(port).sync(); // (7)
        
                // Wait until the server socket is closed.
                // In this example, this does not happen, but you can do that to gracefully
                // shut down your server.
                f.channel().closeFuture().sync();
            } finally {.
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
    

    二、ServerBootstrap启动的过程

    我们看一下上面测试代码的内容,先是创建了两个EventLoopGroup的对象,bossGroupworkerGroup。然后又创建了一个ServerBootstrap对象,将这两个EventLoopGroup注册到ServerBootstrap中,然后设置一系列的参数,这些方法其实都是简单地设置ServerBootstrap的一些属性。设置完这些属性调用bind()方法实现端口的绑定,也就是整个netty服务端启动的核心过程。我们来分析一下bind()方法的的过程:

        public ChannelFuture bind() {
            validate();
            SocketAddress localAddress = this.localAddress;
            if (localAddress == null) {
                throw new IllegalStateException("localAddress not set");
            }
            return doBind(localAddress);
        }
    
        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
    
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    
        private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    

    bind()方法在父类AbstractBootstrap中,这里我们有理由怀疑,这部分有用到了模板方法模式。首先调用了validate()方法,来验证参数的完整性,我们来看一下都验证了那些内容:

        /**
         * Validate all the parameters. Sub-classes may override this, but should
         * call the super method in that case.
         */
        public B validate() {
            if (group == null) {
                throw new IllegalStateException("group not set");
            }
            if (channelFactory == null) {
                throw new IllegalStateException("channel or channelFactory not set");
            }
            return self();
        }
    

    这里验证了groupchannelFactory这两个属性。在我们调用b.group(bossGroup, workerGroup)的时候,bossGroup就是validate()方法中验证的对象。AbstractBootstrap这个抽象类只持有了这一个EventLoopGroup对象,而workerGroup是我们ServerBootstrap扩展出来的。我们可以看到AbstractBootstrap.validate的方法注释中写道,子类可以重写这个方法,但是必须要调用super method。所以我们看一下ServerBootstrap有么有重写这个方法:

        @Override
        public ServerBootstrap validate() {
            super.validate();
            if (childHandler == null) {
                throw new IllegalStateException("childHandler not set");
            }
            if (childGroup == null) {
                logger.warn("childGroup is not set. Using parentGroup instead.");
                childGroup = config.group();
            }
            return this;
        }
    

    果不其然,ServerBootstrap也重写了这个方法,这里验证了childGroup,也就是workerGroup。也验证了childHandler。这个childHandler我们后面在分析。
    做完验证之后,调用了doBind(final SocketAddress localAddress)方法。我们看一下这个方法的内容。这个方法中,先是调用initAndRegister()创建并且注册Channel,并返回ChannelFuture。这个方法我们后面在看,我们先把doBind方法的主要流程理清楚。接下来,判断创建的Channel是否注册完成,如果完成就直接调用doBind0(regFuture, channel, localAddress, promise),否则就添加一个监听器,等到Channel是否注册完成后再调用doBind0(regFuture, channel, localAddress, promise)
    doBind0(regFuture, channel, localAddress, promise)做了什么呢?其实也很简单,获取channel绑定的eventLoop事件处理器,然后提交一个任务,任务的内容就是调用channel的bind(SocketAddress localAddress, ChannelPromise promise)方法。其实这不是真正的将我们的服务绑定到指定端口的方法,这里的bind只是执行了一个生名周期中的回调方法而已,调用所有注册到ChannelPipeline当中的ChannelOutboundHandler对象的bind方法。
    那么真正的将我们的服务绑定到指定端口的操作在哪里呢?我们目前还有initAndRegister()没有分析,所以,真正的绑定端口的操作,一定在这个方法里!我们看一下:

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }
    

    首先,通过我们设置的channelFactory来创建一个Channel。然后调用init方法来初始化Channel。我们可以看一下init方法。

       abstract void init(Channel channel) throws Exception
    

    是一个抽象方法,交给子类去实现,这印证了我们刚开始猜测的·AbstractBootstrap.bind·是一个模板方法。我们先不看ServerBootstrap是怎么初始化channel的。我们还是先看一下initAndRegister()整体的流程。在初始化完channel后,通过调用config().group().register(channel)将channel注册到EventLoopGroup中,这个EventLoopGroup到底是哪个EventLoopGroup呢?其实是我们传入的bossGroup。这个过程做了什么操作呢?我们以传入的NioServerSocketChannel为例:结合我们前两偏分析的NioEventLoopGroupNioEventLoop,我们可以知道,这个操作就是把NioServerSocketChannel持有的java的SelectableChannel注册到NioEventLoop持有的java的Selector中去,让Selector来监听IO事件。注册完之后initAndRegister()的处理过程就结束了。我们回过头去看ServerBootstrap是怎么实现abstract void init(Channel channel) throws Exception的:

        @Override
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    首先一通操作,将ServerBootstrap设置的属性都应用在channel上。这个不是我们分析的重点。接下来的一段代码看似很长,但是就只做了一件事,就是将channel的ChannelPipeline中添加一个事件处理器:ServerBootstrapAcceptorServerBootstrapAcceptor本身继承了ChannelInboundHandlerAdapter。那么ServerBootstrapAcceptor做了什么呢?我们看一下它的channelRead方法:

            @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                setChannelOptions(child, childOptions, logger);
    
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                try {
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
    

    我们可以看到就是将传入的对象转成Channel对象,然后将这个channel注册到childGroup中。到此整个bind的过程就结束了,这是什么操作?怎么这么迷呢?
    到这里,把前两篇分析的NioEventLoopNioEventLoopGroup再串起来一起看。
    首先,在启动的过程中,ServerBootstrap创建了NioServerSocketChannel,并且把NioServerSocketChannel持有的SelectableChannel注册到bossGroup中的NioEventLoop持有的Selector中。NioEventLoop一直在做select操作,监听SelectableChannel的IO事件。当监听到处理的事件之后,会根据SelectionKey被attach的对象来调用不同的processSelectedKey的重载方法处理。(NioEventLoop的逻辑)。那我们在回过头来看一看,在启动的时候,Channel注册到EventLoop返回的SelectionKey被attach的是什么:

        //AbstractNioChannel的doRegister方法
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    

    我们可以看到attach的对象就是自身,也就是AbstractNioChannel类型的对象。所以接下来会调用processSelectedKey(SelectionKey k, AbstractNioChannel ch)进行处理,而我们的NioServerSocketChannel感兴趣的事件只有ACCEPT事件,所以方法最终会走到下面这个逻辑:

                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
    

    这里的unsafeNioMessageUnsafe的对象。我们看看read方法究竟干了什么:

            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    
        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = SocketUtils.accept(javaChannel());
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    

    read方法首先是调用doReadMessages来把数据读到readBuf中。而doReadMessages就是调用ServerSocketChannelaccept方法获取和客户端通信的SocketChannel对象。然后将这个对象交给pipeline去处理,调用pipeline.fireChannelRead()方法。这意味着什么呢?
    这意味着这个事件会流转到我们上面提到的ServerBootstrapAcceptor处理。也就是会把和客户端通信的SocketChannel注册到我们注册的workerGroup当中。由workerGroup去监听SocketChannel后续的事件并且处理相关的事件!到这里,大家是不是对ServerBootstrap启动时做的事情比较清晰了?

    三、复盘

    我们本篇讲了ServerBootstrap在启动过程中做的事情。ServerBootstrap接受了两个EventLoopGroup的参数,我们这里分别叫它bossGroupworkerGroup。两个EventLoopGroup通过ServerBootstrapAcceptor为桥梁建立起了联系。bossGroup负责监听ACCEPT事件,监听到ACCEPT事件之后,将用于和客户端通信的SocketChannel注册到workerGroup,由workerGroup监听后续的读事件并且做业务逻辑处理。这里我们可以看到,其实bossGroup做的事情是非常少的,业务逻辑最后都会交给workerGroup去处理。所以在我们创建bossGroup,我们不需要指定太多的线程。反而workerGroup要创建跟多的线程去处理业务逻辑。

    讲到这里,我们再来理一下netty的线程模型:
    首先,对于EventLoop我们可以看一下,我们可以用于生产的EventLoop的实现类都是继承了SingleThreadEventLoop的。所以一个EventLoop的对象只会绑定一个线程。一个EventLoopGroup可以管理多个EventLoop。而一个Channel只会被绑定到一个EventLoop上。

    EventLoop

    接下来,我想提一下Reactor模型。什么是Reactor模型?简单来说,Reactor模型就是IO多路复用+线程池。网络模型使用IO多路复用,处理任务使用线程池。而Reactor模型又分为单Reactor单线程、单Reactor多线程、多Reactor多线程。我们使用netty的时候,可以通过控制bossGroup和workerGroup的数量来灵活的实现上述三种Reactor模式。而且我们可以通过bossGroup和workerGroup使用同一个对象来实现IO和事件处理使用同一个EventLoopGroup。ServerBootstrap也提供了只有一个参数的group方法实现了这个功能。

    除此之外,假如我们调用ServerBootstrap的group方法时,传的参数是EventLoop对象可不可以呢?完全是可以的,因为EventLoop继承了EventLoopGroup。这两个接口有相同的功能窗口。这也是为什么EventLoop要继承EventLoopGroup的另一个原因。使EventLoop可以脱离EventLoopGroup单独使用!

    相关文章

      网友评论

          本文标题:四、netty源码分析之ServerBootstrap

          本文链接:https://www.haomeiwen.com/subject/zrckyctx.html