美文网首页
netty Server端启动源码分析

netty Server端启动源码分析

作者: whateverblake | 来源:发表于2020-07-10 10:36 被阅读0次

    开篇

    我们使用netty源码包netty-example中的EchoServer来分析使用netty作为网络通信框架服务端的启动过程

    说明

    所有的分析基于NIO

    一段Server端启动样板代码

            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            final EchoServerHandler serverHandler = new EchoServerHandler();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 100)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         p.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$".getBytes())));
                         p.addLast(serverHandler);
                     }
                 });
    
                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    

    一般使用netty实现网络通信的服务端都会实现类似上面的样本代码,下面我们对样本代码中的一些关键部分进行简单解析

    • bossGroup
      我们知道在经典的NIO编程中,Server端有一个专门的Acceptor线程负责接收用户的连接请求,在netty中bossGroup就是为Acceptor提供线程的线程池(站在线程角度去理解),一般bossGroup中只是包含一个线程

    • workerGroup
      每个被Server接收的连接也就是创建的SocketChannel会绑定到一个线程,之后SocketChannel上发生的所有读写事件都是由其绑定的线程处理,在netty中workerGroup就是给每个SocketChannel分配线程的线程池。关于netty的线程模型我在另一篇文章中有详细的解析https://www.jianshu.com/p/732f9dea34d7

    • ServerBootstrap
      Server端的引导类,下面讲解ServerBootstrap的一些方法

    1. group方法设置acceptor和worker的线程池
    2. channel方法设置Server端的channel类型,NioServerSocketChannel是server端的channel类型,它包装了java nio中的ServerSocketChannel
    3. option()用来设置底层ServerSocketChannel一些tcp参数
    4. childOption()用来设置将来建立的SocketChannel的一些tcp参数
    5. handler()设置Server端事件处理链上的一个处理节点
    6. childHandler()用来设置被server端接受建立的SocketChannel事件处理链的一个处理节点

    Server端启动过程

    Server端启动的入口是serverBootstrap.bind(port),bind过程分成两个部分:

    • initAndRegister
      主要执行ServerSocketChannel的建立和初始化,下面详细分析

    • doBind
      把Server端绑定到指定的ip和端口上

    initAndRegister
    //返回值是future类型,说明初始化是initAndRegister是异步的过程
    final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
             //channelFactory是ServerBootstrap根据用户设置的NioServerSocketChannel.class生成的NioServerSocketChannel工厂
             //通过channelFactory.newChannel()就可以创建出服务端的NioServerSocketChannel,下面我们会详细分析NioServerSocketChannel创建过程
                channel = channelFactory.newChannel();
               //对channel进行初始化
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
           //上面完成了channel的初始化,这里实现对channel的注册
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }
    
    

    对于Server端来说上面initAndRegister方法主要包含了三个核心过程

    1. 创建NioServerSocketChannel
    2. 初始化NioServerSocketChannel
    3. 注册NioServerSocketChannel

    我们分别来分析

    • 创建NioServerSocketChannel

    channelFactory通过反射调用NioServerSocketChannel类的无参数构造方法创建NioServerSocketChannel对象

      public NioServerSocketChannel() {
            //DEFAULT_SELECTOR_PROVIDER 是NioServerSocketChannel静态常量,类型是SelectorProvider,
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
    

    newSocket方法创建了java底层的ServerSocketChannel对象,这里也就看出了NioServerSocketChannel就是netty对ServerSocketChannel的包装

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
            try {
                /**
                 *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
                 *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
                 *
                 *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
                 */
                //使用SelectorProvider创建java底层的ServerSocketChannel
                //写过java nio的同学,都应该记得之前创建ServerSocketChannel都是通过ServerSocketChannel.open()实现的
               //其实ServerSocketChannel.open()源码就是provider.openServerSocketChannel()
                return provider.openServerSocketChannel();
            } catch (IOException e) {
                throw new ChannelException(
                        "Failed to open a server socket.", e);
            }
        }
    

    接来下NioServerSocketChannel会给自己和父类中的关键属性初始化值

    public NioServerSocketChannel(ServerSocketChannel channel) {
           //初始化父类
            super(null, channel, SelectionKey.OP_ACCEPT);
           //这个NioServerSocketChannelConfig是NioServerSocketChannel绑定的配置类,比如设置了最大可以连续从channel读多少次数据
          //再比如设置了每次从channel读取数据的时候,每次读取数据使用的byteBuf大小动态变化策略
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
    

    我们看下各层父类的初始化

    AbstractNioChannel

    • 设置ch 为ServerSocketChannel
    • 设置ServerSocketChannel感兴趣的事件为OP_ACCEPT,readInterestOp=16
    • 设置ServerSocketChannel为非阻塞
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }
    

    AbstractChannel

    • 创建unsafe为NioMessageUnsafe,unsafe是处理channel连接,绑定和channel上读写事件的核心类
    • 创建channel的事件处理链DefaultChannelPipeline
     protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    

    到此NioServerSocketChannel的创建已经基本完成了,关于netty pipeline的知识点可以查看我写的另一边文章 https://www.jianshu.com/p/36803adcbc02


    初始化NioServerSocketChannel --- init

    直接上ServerBootstrap的init源代码吧

    void init(Channel channel) {
            setChannelOptions(channel, newOptionsArray(), logger);
            setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
            }
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
           
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    init方法核心的功能就是通过ChannelInitializer向NioServerSocketChannel绑定的pipeline添加了两个handler

    1. 用户配置的server handler

    2. netty内置的ServerBootstrapAcceptor,这个handler是一个InboundHandler,它的主要功能是给连接请求创建的NioSocketChannel设置用户在ServerBootstrap中指定的参数,添加用户设置的childHandler,使用childGroup对NioSocketChannel进行注册。

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                //对于server端来说 这个msg 其实是一个NioSocketChannel
    
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    
                setChannelOptions(child, childOptions, logger);
                setAttributes(child, childAttrs);
                try {
                    //使用NioEventLoopGroup对NioSocketChannel进行注册
    
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }
    

    上面第二个handler添加到pipe中的方式和第一个handler添加的方式不同,ServerBootstrapAcceptor添加的方式是向NioServerSocketChannel绑定的NioEventLoop提交了runnable任务,这个任务实现的功能就是把ServerBootstrapAcceptor添加到pipeline中。因为在init的时候channel还没有做register,所以这个地方触发的handlerAdded事件被存放起来,等将来channel register成功之后才会继续触发相应的handlerAdded方法

    NioServerSocketChannel 注册

    注册主要的功能就是从bossGroup管理的NioEventLoop(正常情况bossGroup中只管理一个NioEventLoop)中取出一个NioEventLoop然后绑定到NioServerSocketChannel上,最终是通过unsafe.register方法实现,我们解析下unsafe.register的源代码

     public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                ObjectUtil.checkNotNull(eventLoop, "eventLoop");
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
              //NioServerSocketChannel绑定从bossGroup中分配的NioEventLoop
                AbstractChannel.this.eventLoop = eventLoop;
               //判断当前运行线程和eventLoop绑定的线程是不是相同
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                      //向eventLoop提交一个执行register0()的任务
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
    

    我看到执行register方法的线程会向NioServerSocketChannel绑定的NioEventLoop提交一个任务,这个时候eventLoop就会被以线程的方式启动起来,具体启动的过程我在https://www.jianshu.com/p/732f9dea34d7有解析

    • register0
      方法register0会在NioServerSocketChannel绑定的线程中执行
      我解析下它的源代码
     private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                   //这个方法是实现了java底层ServerSocketChannel向selector注册的功能
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                     //执行之前被添加到pending handlerAdded事件链中的handlerAdded事件,
                    //这会时候对于NioServerSocketChannel来说ServerBootstrapAcceptor会被添加到pipeline中
                    pipeline.invokeHandlerAddedIfNeeded();
                    //设置初始化和注册成功
                    safeSetSuccess(promise);
                     //触发pipeline上类型为InboundHandler handler的channelRegistered方法
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    //判断ServerSocketChannel是不是已经准备好接受连接请求了
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    
    //doRegister主要是完成了java底层的ServerSocketChannel向selector绑定
    protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
       //这里需要注意的是ServerSocketChannel在selector绑定的感兴趣事件是0,这是为什么呢,
    //因为这个时候ServerSocketChannel还没有绑定到具体的ip和端口上,在下面的分析中我们会看到在channelActive事件中,这个interestOps会被修改成16
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    

    到此NioServerSocketChannel注册和初始化已经完成了


    接下来我们看下NioServerSocketChannel如何绑定到服务器的

    doBind0

    doBind0实现的是向NioServerSocketChannel绑定的NioEventLoop提交了channel绑定到指定ip和端口的任务

     private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    

    我们看下这个方法的调用链
    channel.bind --> pipeline.bind --> tail.bind
    tail.bind的源代码如下:

    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
           ObjectUtil.checkNotNull(localAddress, "localAddress");
           if (isNotValidPromise(promise, false)) {
               // cancelled
               return promise;
           }
         //找到符合执行要求的OutboundHandler
           final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
           EventExecutor executor = next.executor();
           if (executor.inEventLoop()) {
               next.invokeBind(localAddress, promise);
           } else {
               safeExecute(executor, new Runnable() {
                   @Override
                   public void run() {
                       next.invokeBind(localAddress, promise);
                   }
               }, promise, null, false);
           }
           return promise;
       }
    

    可以看到服务端的bind是从pipeline的尾部开始向头部找符合要求的handler去执行,在pipeline链中最后一个符合执行要求的是pipeline的head节点,我们看HeadContext的bind方法

            @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
                unsafe.bind(localAddress, promise);
            }
    

    又看到了我们熟悉的unsafe,我们看下unsafe.bind源代码

     public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
                assertEventLoop();
    
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
    
                // See: https://github.com/netty/netty/issues/576
                if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                    localAddress instanceof InetSocketAddress &&
                    !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                    !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                    // Warn a user about the fact that a non-root user can't receive a
                    // broadcast packet on *nix if the socket is bound on non-wildcard address.
                    logger.warn(
                            "A non-root user can't receive a broadcast packet if the socket " +
                            "is not bound to a wildcard address; binding to a non-wildcard " +
                            "address (" + localAddress + ") anyway as requested.");
                }
    
                boolean wasActive = isActive();
                try {
                   //这个地方是这段代码的核心
                    doBind(localAddress);
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    closeIfClosed();
                    return;
                }
                //当完成doBind方法后,这个判断条件就会为真,然后就会向NioEventLoop提交一个任务,
              //这个任务的作用是从pipeline的head依次触发pipeline上面所有handler的channelActive方法
             //这里需要注意的是head的channelActive方法,我在下面解析
                if (!wasActive && isActive()) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelActive();
                        }
                    });
                }
    
                safeSetSuccess(promise);
            }
    
    doBind

    我们看下NioServerSocketChannel.doBind的具体实现

       protected void doBind(SocketAddress localAddress) throws Exception {
          //针对不同JDK版本,ServerSocketChannel绑定到指定的ip和端口的方式不同
           if (PlatformDependent.javaVersion() >= 7) {
                javaChannel().bind(localAddress, config.getBacklog());
            } else {
                javaChannel().socket().bind(localAddress, config.getBacklog());
            }
        }
    
    Head handler channelActive方法

    我看到head handler的channelActive方法会先触发pipeline上面别的handler的channelActive方法,最后它还会执行readIfIsAutoRead方法,这个方法的作用就是将ServerSocketChannel在selector上注册的感兴趣事件修改成OP_ACCEPT

     @Override
            public void channelActive(ChannelHandlerContext ctx) {
                ctx.fireChannelActive();
    
                readIfIsAutoRead();
            }
    

    我看下readIfIsAutoRead()的调用链
    readIfIsAutoRead --> channel.read() --> pipeline.read() --> tail.read();
    这个调用链是不是很熟悉,和上面channel.bind的调用链如出一辙
    我们看下tail.read的源代码

        public ChannelHandlerContext read() {
          //从pipeline中找到符合MASK_READ的OutboundHandler,最后会找到headContext
            final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeRead();
            } else {
                Tasks tasks = next.invokeTasks;
                if (tasks == null) {
                    next.invokeTasks = tasks = new Tasks(next);
                }
                executor.execute(tasks.invokeReadTask);
            }
    
            return this;
        }
    

    我再来分析下headContext.invokeRead()方法调用链
    headContext.invokeRead() --> headHandler.read() -->unsafe.beginRead() -->channel.doBeginRead()
    我看channel.doBeginRead()源代码

    protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
    
            readPending = true;
            //在channelActive事件发生之前interestOps为0
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                 //修改selectionKey感兴趣事件为readInterestOp,对NioServerSocketChannel来说readInterestOp为OP_ACCEPT
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }
    

    到此就完成了整个Server端启动分析

    相关文章

      网友评论

          本文标题:netty Server端启动源码分析

          本文链接:https://www.haomeiwen.com/subject/srlccktx.html