美文网首页bugstac...
Netty源码分析-05 Netty服务器启动过程

Netty源码分析-05 Netty服务器启动过程

作者: 史圣杰 | 来源:发表于2019-03-26 17:18 被阅读0次

    服务器的启动过程大量使用了EventLoop和Future/Promise,在阅读源码之前,建议首先要对Netty的这两种机制进行了解。由于Netty更多是在服务器端使用,因此以服务器的启动过程为例进行学习。

    5.1 阶段:配置config

    配置阶段的工作很简单,主要就是初始化启动类,设置相关参数。
    Bootstrap启动类主要功能是初始化启动器,为启动器设置相关属性。我们先来看一下Bootstrap的类结构,启动类有一个AbstractBootstrap基类,有两个实现类Bootstrap和ServerBootstrap,分别用于客户端和服务器的启动。

    AbstractBootstrap

    属性

    EventLoopGroup group; //线程组,对于ServerBootstrap来说,group为ServerSocketChannel服务
    ChannelFactory<? extends C> channelFactory; //用于获取channel的工厂类
    SocketAddress localAddress;//绑定的地址
    Map<ChannelOption<?>, Object> options;//channel可设置的选项,包含java-channel和netty-channel
    Map<AttributeKey<?>, Object> attrs;//channel属性,便于保存用户自定义数据
    ChannelHandler handler;//Channel处理器
    

    方法

    group() //设置线程组
    channelFactory()及channel() 设置channel工厂和channel类型
    localAddress() 设置地址
    option() 添加channel选项
    attr() 添加属性
    handler() 设置channelHander
    

    上面这些方法主要用于设置启动器的相关参数,除此之外,还有一些启动时调用的方法

    register() 内部调用initAndRegister() 用来初始化channel并注册到线程组
    bind() 首先会调用initAndRegister(),之后绑定IP地址,使用Promise保证先initAndRegister()在bind()
    initAndRegister(),主要是创建netty的channel,设置options和attrs,注册到线程组
    

    ServerBootstrap

    ServerBootstrap在AbstractBootstrap的基础上添加了如下属性,用来设置子Channel,也就是客户端连接后创建的Channel的属性。另外,还实现了抽象类中定义的init()方法。

     Map<ChannelOption<?>, Object> childOptions  子channel的配置
     Map<AttributeKey<?>, Object> childAttrs  子channel的属性
    EventLoopGroup childGroup;    处理子channel的事件循环组
    ChannelHandler childHandler;  处理子channel事件的handler
    

    5.2 阶段:初始化init

    初始化init阶段的主要功能是:创建并初始化服务器的Netty-Channel;分为两个步骤:创建和初始化。

    创建NettyChannel

    • 使用SelectorProvider打开java通道
    • 为Channel分配全局唯一的ChannelID
    • 创建NioMessageUnsafe,用于netty底层的读写操作
    • 创建ChannelPipeline,默认的是DefaultChannelPipeline

    下面是初始init阶段的主要代码:

    Channel channel = null;
            try {
                channel = channelFactory.newChannel();// 创建NettyChannel
                init(channel);//初始化NettyChannel
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    

    channelFactory用于获取Channel实例,启动时,channelFactory在调用channel(NioServerSocketChannel.class)设置channel类型时创建,由于我们使用的是设置class的方法,会使用ReflectiveChannelFactory作为工厂类,其会直接调用class的newInstance获取Channel实例。Netty中,服务器端的Channel为NioServerSocketChannel,客户端为NioSocketChannel。

    Channel的创建过程如下:

    1. 打开java通道:NioServerSocketChannel创建时,首先使用SelectorProvider的openServerSocketChannel打开服务器套接字通道。SelectorProvider是Java的NIO提供的抽象类,是选择器和可选择通道的服务提供者。具体的实现类有SelectorProviderImpl,EPollSelectorProvide,PollSelectorProvider。选择器的主要工作是根据操作系统类型和版本选择合适的Provider:如果LInux内核版本>=2.6则,具体的SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider。至此,底层的Java ServerSocketChannel创建完毕。
    public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
     private static ServerSocketChannel newSocket(SelectorProvider provider) {
            return provider.openServerSocketChannel();
        }
    
    1. Java ServerSocketChannel创建完毕后,会进入netty-Channel的构造方法,首先初始化ChannelId,ChannelId是一个全局唯一的值;
    2. 之后,创建NioMessageUnsafe实例,该类为Channel提供了用于完成网络通讯相关的底层操作,如connect(),read(),register(),bind(),close()等;
    3. 为Channel创建DefaultChannelPipeline,初始化双向链表;
    4. 讲java-channel设置为非阻塞,将关注的操作设置为SelectionKey.OP_ACCEPT(服务器)
     protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            // 初始化双向链表
            tail = new TailContext(this); // 创建head 
            head = new HeadContext(this); // 创建tail
    
            head.next = tail;
            tail.prev = head;
        }
    

    初始化NettyChannel

    创建NettyChannel后,下一步需要进行初始化,由于服务器端和客户端的Channel不一样,因此init方法被分别实现到了ServerBootstrap和Bootstrap中,我们主要分析服务器的init。服务器的init分为几个步骤:

    • 将启动器设置的选项和属性设置到NettyChannel上面
    • 向Pipeline添加初始化Handler,供注册后使用

    具体实现在ServerBootstrap类的init方法中,程序比较简单。每个NettyChannel对象保护一个ChannelConfig类保存相关配置,还有Map<AttributeKey<?>, Object> attrs用来保存自定义属性。至于初始化Handler,我们先记住,在bind中会说明其作用。

    在addLast时,由于还未注册,因此会加入到Pipeline的一个等待链表中,待注册后执行。

    if (!registered) {
        newCtx.setAddPending();
        callHandlerCallbackLater(newCtx, true);
        return this;
    }
    

    总结一下,这个阶段的代码我们可以看出,channel内部包含几个重要对象:

    ChannelID 全局唯一ID
    ChannelConfig  保存配置
    ChannelPipeline  通道的流水线
    Unsafe  Netty底层封装的网络I/O操作
    

    5.3 阶段:注册register

    这个阶段的主要工作是将创建并初始化后的NettyChannel注册到selector上面。具体过程:

    • 将打开NettyChannel注册到线程池组的selector上;
    • 触发Pipeline上面ChannelHandler的channelRegistered,
    // AbstractBootstrap类 initAndRegister()
     ChannelFuture regFuture = config().group().register(channel);
    

    上面的程序会使用传入的线程池组的register(channel);注册NettyChannel,具体方法定义在SingleThreadEventLoop中,其会使用NettyChannel的unsafe的register方法,该方法首先会判断当前线程是否是指定线程池正在运行的线程,如果不是提交到要注册的线程池中执行。执行时调用下面的程序。

    // AbstractUnsafe,删去了部分校验代码
    private void register0(ChannelPromise promise) {
                try {
                    boolean firstRegistration = neverRegistered;// 是否为首次注册
                    doRegister(); //  1. 注册
                    neverRegistered = false;
                    registered = true;
                    pipeline.invokeHandlerAddedIfNeeded();// 2. 将注册之前加入的handler加入进来
                    safeSetSuccess(promise); // 注册成功,通知promise
                    pipeline.fireChannelRegistered();// 4. Pipeline通知触发注册成功
          
                    if (isActive()) { // 是否已经绑定 因为register和bind阶段是异步的
                        if (firstRegistration) { 
                            pipeline.fireChannelActive(); // 5.首次注册,通知
                        } else if (config().isAutoRead()) {// Channel会deregister后重新注册到线程组时,且配置了AutoRead
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    
    1. 注册:将NettyChannel内部的javaChannel注册到线程池的selector上面,由线程池不断执行select()查询准备就绪的文件描述符。具体实现在AbstractNioChannel中的doRegister()
    2. invokeHandlerAddedIfNeeded: 注册成功后,找到初始化阶段通过pipeline.addLast()加入的ChannelInitializer,执行其ChannelInitializer的initChannel方法,之后将其删除(在ChannelInitializer的initChannel方法中);初始化NettyChannel阶段,我们addLast了一个初始化Handler,现在来看看其作用
    // init初始化阶段添加了一个ChannelInitializer
     p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();// 获取config时设置的handler
                    if (handler != null) {
                        pipeline.addLast(handler); // 将其添加到链表尾部
                    }
                     // 加入一个ServerBootstrapAcceptor处理器,用于处理Accept
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
    

    从上面的程序中可以看到,由于初始化时,还未将javaChannel注册到线程池的selector上,此时还无法设置Channel将Accept注册到选择器上,因此先加入了一个ChannelInitializer,等待register后向Pipeline加入ServerBootstrapAcceptor。此时,NettyChannel的Pipeline的链表结构为:

    Head <-->  InitialHandler  <--> ServerBootstrapAcceptor <--> Tail
    

    在initChannel执行的最后会将InitialHandler从Pipeline移除,此时NioServerSocketChannel的链表结构为

    Head   <--> ServerBootstrapAcceptor <--> Tail
    
    1. fireChannelRegistered,沿着pipeline的head到tail,调用ChannelHandler的channelRegistered方法,
    public final ChannelPipeline fireChannelRegistered() {
            AbstractChannelHandlerContext.invokeChannelRegistered(head);
            return this;
        }
        
    private void invokeChannelRegistered() {
            if (invokeHandler()) { // 状态是否正确
                try {
                    ((ChannelInboundHandler) handler()).channelRegistered(this); // 触发
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRegistered();// 状态不正确,通知下一个Handler
            }
        }
    
    1. fireChannelActive 由于注册阶段和绑定bind阶段都是异步的,如果此时注册完成时bind阶段已经绑定了本地端口,会沿着pipeline的head到tail,调用各个Handler的channelActive方法

    5.4 阶段:绑定bind

    本阶段的主要内容是:将NettyChannel内部的java的ServerSocketChannel绑定到本地的端口上面,结束后使用fireChannelActive通知Pipeline里的ChannelHandle,执行其channelActive方法。

    bind的入口为AbstractBootstrap的doBind0(),内部会调用pipeline中的bind方法,逻辑为从tail出发,调用outbound的ChannelHandler的bind方法,从上面我们可以看到当前的链表如下:

    Head[I/O] <-->  ServerBootstrapAcceptor[IN] <--> Tail[IN]
    

    只有Head可以用来处理Outbound,Head的bind方法调用了channel创建过程中生成的unsafe对象NioMessageUnsafe的实例,该实例的bind方法首先java的channel bind本地地址,然后触发fireChannelActive。

    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
                boolean wasActive = isActive();
                try {
                    doBind(localAddress);
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    closeIfClosed();
                    return;
                }
    
                if (!wasActive && isActive()) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelActive();
                        }
                    });
                }
                safeSetSuccess(promise);
            }
    

    至此,Netty的服务器段已经启动,Channel和ChannelPipeline已经建立。EventLoop也在不断的select()查找准备好的I/O。

    相关文章

      网友评论

        本文标题:Netty源码分析-05 Netty服务器启动过程

        本文链接:https://www.haomeiwen.com/subject/wyrnvqtx.html