美文网首页
3. Netty-Netty 启动过程

3. Netty-Netty 启动过程

作者: JacobChan001 | 来源:发表于2020-08-05 10:01 被阅读0次

    Netty 的启动流程

    服务端 Nio 创建步骤包括:1. 创建;2. 注册;3. 监听。
    所以需要在 netty 启动也需要完成以上步骤

    创建

    创建步骤分为:创建 selector,创建 ServerSocketChannel

    1. 创建 Selector

    在创建 NioEventLoopGroup 的时候,会对每个 EventLoop 进行初始化,初始化 EventLoop时,也初始化 EventLoop 里的 selector

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    

    NioEventLoopGroup 构造方法调用链

    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor) {
        // executor 为 null
        // SelectorProvider.provider() 方法使用单例模式创建SelectorProvider
        // SelectorProvider 是后续创建 Selector 的类
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    public NioEventLoopGroup(
                int nThreads, Executor executor, final SelectorProvider selectorProvider) {
         // DefaultSelectStrategyFactory.INSTANCE 通过单例模式创建一个 DefaultSelectStrategyFactory
         // DefaultSelectStrategyFactory 的用途是后续创建 SelectStrategy
         // SelectStrategy 用来控制后续 select 轮训的策略
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
        // RejectedExecutionHandlers.reject() 定义了 NioEventLoop 里线程池的拒绝策略
        // super 表示调用 NioEventLoopGroup 的父类构造函数,即 MultithreadEventLoopGroup
        super(nThreads, executor, selectorProvider, 
                  selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        // args[0]: selectorProvider
        // args[1]: selectStrategyFactory
        // args[2]: rejectedExecutionHandler
        // super 表示 调用父类构造函数,即 MultithreadEventExecutorGroup
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        // DefaultEventExecutorChooserFactory.INSTANCE 通过单例创建DefaultEventExecutorChooserFactory
        // DefaultEventExecutorChooserFactory 的作用是通过 EventExecutor 数组创建一个 EventExecutorChooser
        // EventExecutorChooser 的作用是通过不同的算法从 EventExecutor 数组 选择一个 EventExecutor
        // NioEventLoop 继承了 EventExecutor,所以在 NioEventLoopGroup 里的 NioEventLoop数组即是EventExecutor数组
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
        ...
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
       
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
          try{
             // 创建 NioEventLoop
            // 模板方法,由 NioEventLoopGroup 实现,selector 就在创建 NioEventLoop 时进行创建
             children[i] = newChild(executor, args);
          }
          ...
        }
    }
    
    // NioEventLoopGroup#newChild() 
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        // 创建 NioEventLoop
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
        }
    

    接下来就是 NioEventLoop 的构造函数

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory queueFactory) {
       super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                    rejectedExecutionHandler);
       this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
       this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
       // 创建 selector,并返回一个selector 的元组对象,此元组包含了一个 unwrappedSelector 和 一个 selector 
       // 默认情况下 unwrappedSelector 和 selector  是相同的对象,都是java nio 原生的 selector
      // 只有开启了优化配置,才会创建优化了的 selector
      // 但是即使优化了selector,在注册的时候依然使用 unwrappedSelector 这个原始的 selector
       final SelectorTuple selectorTuple = openSelector();
      
       // selector: 是被 Netty 优化,由 Netty 继承自 selector 接口实现的类
       this.selector = selectorTuple.selector;
       // unwrappedSelector: 没有被 Netty 优化的,原始的JDK Nio selector
       this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
    

    至此,便完成了 selector 的创建

    接下来就是 ServerSocketChannel 的创建

    2. ServerSocketChannel 的创建和注册

    • ServerSocketChannel 是Java 原始的 Channel,在 Netty 中对这个ServerSocketChannel 进行了扩展,使用 NioServerSocketChannel 持有 ServerSocketChannel
    // Java ServerSocketChannel 的在 NioServerSocketChannel 的位置
    // NioServerSocketChannel  -> AbstractNioMessageChannel -> AbstractNioChannel
    // SelectableChannel 就是 java nio ServerSocketChannel 的接口类
    private final SelectableChannel ch;
    
    protected SelectableChannel javaChannel() {
        return ch;
    }
    
    
    • 所以 ServerSocketChannel 的创建过程应该在 NioServerSocketChannel 创建的时候进行的
    • 而 NioServerSocketChannel 则是在 ServerBootstrap中被创建的
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.channel(NioServerSocketChannel.class);
    ...
    bootstrap.bind(8080);
    

    与 NioServerSocketChannel 相关代码有两个

    • bootstrap.channel(NioServerSocketChannel.class)方法会创建一个 channelFactory,channelFactory 是一个 netty channel 的工厂方法,用来创建 NioServerSocketChannel 实例
    // AbstractBootstrap#channel
    public B channel(Class<? extends C> channelClass) {
        // ReflectiveChannelFactory 封装了 channelClass,调用 ReflectiveChannelFactory#newChannel 方法时通过反射的方式实例化channelClass,返回channelClass类的对象实例
        // channelFactory方法则将 ReflectiveChannelFactory 复制给 AbstractBootstrap的成员变量 channleFactory
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }
    
    // ReflectiveChannelFactory 构造方法
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        }
        ...
     }
    
    • bootstrap.bind(8080) 用来实际创建 NioServerSocketChannel 对象实例

    进入 bootstrap.bind()方法,该方法实际上进入了 ServerBootstrap 的父类 AbstractBootstrap的 bind 方法

    // AbstractBootstrap#bind(int inetPort)
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    public ChannelFuture bind(SocketAddress localAddress) {
        // 验证 bossGroup 和 channelFactory 是否存在
        validate();
        // 真正进行 bind 的方法
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 初始化 NioServerSocketChannel ,并将 NioServerSocketChannel 里的注册到 seletor 中就在这里
        final ChannelFuture regFuture = initAndRegister();
        ...
    }
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 调用 上面所说的 ReflectiveChannelFactory#newChannel() 方法
            // 初始化并返回 NioServerSocketChannel
            channel = channelFactory.newChannel();
            init(channel);
        }
        ...
        ChannelFuture regFuture = config().group().register(channel);
    }
    //ReflectiveChannelFactory#newChannel()
    @Override
    public T newChannel() {
        try {
            // 相当于 new NioServerSocketChannel();
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
    
    

    接下来就是创建 NioServerSocketChannel 的流程

    // NioServerSocketChannel 的构造函数
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        // 创建有个 java Nio ServerSocketChannel 实例
        return provider.openServerSocketChannel();
        ...
    }
    
    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 调用父类的构造方法,即AbstractNioMessageChannel的构造方法
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        // 调用父类的构造方法,即 AbstractNioChannel 的构造方法
        // parent 为 null
        // ch 为 ServerSocketChannel
        // readInterestOp 为 SelectionKey.OP_ACCEPT
        super(parent, ch, readInterestOp);
    }
    
    
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        // parent 为 null, 调用父类构造函数,即 AbstractChannel 的构造函数
        super(parent);
        // 将 serverSocketChannel 赋值给 NioServerSocketChannel 的成员变量 ch
        this.ch = ch;
        // serverSocketChannel 只对 SelectionKey.OP_ACCEPT 感兴趣
        this.readInterestOp = readInterestOp;
        try {
            // 将 serverSocketChannel 设置为非阻塞
            ch.configureBlocking(false);
        }
        ...
    }
    
    protected AbstractChannel(Channel parent) {
        // parent 为 null
        this.parent = parent;
        // NioServerSocketChannel的 id
        id = newId();
        // 最后会调用 AbstractNioByteChannel 的 newUnsafe() 方法,返回 NioMessageUnsafe 对象
        unsafe = newUnsafe();
        // 熟悉的 pipeline,返回 DefaultChannelPipeline 对象
        pipeline = newChannelPipeline();
    }
    

    所以,在创建 NioServerSocketChannel 时的 ServerSocketChannel 便已经被创建了,并赋值给了成员变量 ch

    接下来就是把这个 ServerSocketChannel 注册到 selector 中,回到刚才 AbstractBootstrap 的 initAndRegister 方法

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        // 完成了 ServerSocketChannel 的创建,接下来就是将 ServerSocketChannel 注册到 selector 中
        // config().group() 会返回 bossGroup,即是一个 NioEventLoopGroup 对象register(channel)
        // 调用 config().group().register(channel) 就是调用 NioEventLoopGroup.register(channel)
        // NioEventLoopGroup.register(channel) 方法的实现在其父类 MultithreadEventLoopGroup
        ChannelFuture regFuture = config().group().register(channel);
        
    }
    
    // MultithreadEventLoopGroup#register
    @Override
    public ChannelFuture register(Channel channel) {
        // next() 会使用 chooser(EventExecutorChooser) 选择一个 NioEventLoop
        // 调用 NioEventLoop.register 方法,NioEventLoop.register 方法在 SingleThreadEventLoop.register 中实现
        return next().register(channel);
    }
    
    // SingleThreadEventLoop#register
    @Override
    public ChannelFuture register(Channel channel) {
        // 根据 NioServerSocketChannle 生成一个 DefaultChannelPromise
        return register(new DefaultChannelPromise(channel, this));
    }
    
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        // promise.channel 返回 NioServerSocketChannel
        // NioServerSocketChannel.unsafe() 返回一个 Unsafe
        // Unsafe.register 方法会进入 NioByteUnsafe.register 方法, 而 NioByteUnsafe.register 方法的实现在 AbstractNioUnsafe
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    
    // AbstractNioUnsafe#register
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        
        ...
        AbstractChannel.this.eventLoop = eventLoop;
        // 判断当前线程是否是在本 eventLoop,显然不是,当前线程是在 main thread,所以不进入
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // 不是在 eventLoop中,则创建一个线程,放到 eventLoop中的线程池中,在线程池中调用register0() 方法
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            }
            ...
        }
    }
    
    private void register0(ChannelPromise promise) {
        ...
        // 此方法在 AbstractNioChannel中实现
        doRegister();
        ...
    }
    
    // AbstractNioChannel#doRegister
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // javaChannel() 即是 ServerSocketChannel
                // eventLoop().unwrappedSelector() 即是 selector
                // 0 表示 ServerSocketChannel 对任何事件都不感兴趣,后续还会另外注册 Accept 事件到 selector
                // this:表示把 NioServerSocketChannel 作为属性绑定到 serverSocketChannel 中,只要selector 返回了 serverSocketChannel 相关的事件,便会带回来这个 NioServerSocketChannel 对象
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } 
        }
    }
    

    到目前为止,

    • selector 的创建,
    • serverSocketChannel 的创建,
    • serverSocketChannel 向 selector 注册
      已经完成

    还有

    • 给 serverSocketChannel设置监听端口
    • 注册 serverSocketChannel accept事件 到 selector
    • selector.调用 select() 方法轮询的步骤
      还没完成

    回到 AbstractBootstrap#dobind() 方法

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 上面的步骤创建了 NioServerSocketChannel,由于 NioServerSocketChannel注册到 selector 是放在 eventLoop 的线程池异步进行的,所以会返回一个regFuture 这个异步结果。
        ...
    
      if (regFuture.isDone()) {
           // 如果已经注册完成了NioServerSocketChannel,则开始 bind NioServerSocketChannel
          // 具体的bind 方法在 doBind0() 中
          ChannelPromise promise = channel.newPromise();
          doBind0(regFuture, channel, localAddress, promise);
          return promise;
      }
    }
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
          //拿到 NioServerSocketChannel 所在的 eventLoop,然后将将 bind 方法丢给 eventLoop 里的线程池进行处理
          channel.eventLoop().execute(new Runnable() {
              @Override
              public void run() {
                  if (regFuture.isSuccess()) {
                      // 调用 NioServerSocketChannel 的 bind 方法,此方法的实现在 AbstractChannel 类
                      channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                  } else {
                      promise.setFailure(regFuture.cause());
                  }
              }
          });
      }
    
    //AbstractChannel#bind
    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
          // 调用 DefaultChannelPipeline 的 bind 方法
          return pipeline.bind(localAddress, promise);
    }
    
    // DefaultChannelPipeline#bind
    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
          // 调用 TailContext 的 bind 方法,该方法会调用  AbstractChannelHandlerContext.bind() 方法
          // 该方法会去寻找pipeline 中所有的 executionMask 包含了 Mask_Bind 的 Outbound Handler,默认情况下只有 HeadContext 才有,所以实际上调用的是 HeadContext 的 bind 方法
          return tail.bind(localAddress, promise);
      }
    
    
    
    // HeadContext#bind
    public void bind(
              ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
          // 该 unsafe 是 NioServerSocketChannel 中的 NioMessageUnsafe 对象,NioMessageUnsafe.bind 方法的实现在其父类 AbstractUnsafe 中
          unsafe.bind(localAddress, promise);
    }
    
    
    // AbstractUnsafe#bind
    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
          ...
        try {
            // 调用 NioServerSocketChannle 的 doBind 方法
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        ...
     }
    
    // NioServerSocketChannel#Bind
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            // 调用 java nio 的 ServerSocketChannel 的 bind 方法
            // backlog 表示等候排队的连接队列长度
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    

    至此,ServerSocketChannel bind 监听端口也已经完成

    接下来还有

    • 注册 serverSocketChannel accept 事件 到 selector
    • selector.调用 select() 方法轮询的步骤

    回到 AbstractUnsafe.bind 方法

    // AbstractUnsafe#bind
    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
          ...
          if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                  @Override
                  public void run() {
                        // 触发 pipeline 中 ChannelHandler 的 channelActive() 方法
                        // 这里直接到 HeadContext 的 active() 方法
                        pipeline.fireChannelActive();
                    }
                });
            }
     }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
          ctx.fireChannelActive();
          // 注册 Accept 的事件
          readIfIsAutoRead();
    }
    
    private void readIfIsAutoRead() {
          if (channel.config().isAutoRead()) {
              // 调用 NioServerSocketChannel 的 read 方法
              // 该方法的实现在 AbstractChannel 中
              channel.read();
          }
    }
    
    //AbstractChannel#read
    @Override
    public Channel read() {
        // 调用 DefaultChannelPipeline 的 read() 方法
        pipeline.read();
        return this;
    }
    
    //DefaultChannelPipeline#read
    @Override
    public final ChannelPipeline read() {
        // 该方法从 tail 开始,调用所有的 ChannelHandler 的 read 的方法
        // 最终会到达 HeadContext 的 read(ChannelHandlerContext ctx)  方法
        tail.read();
        return this;
    }
    
    // HeadContext
    @Override
    public void read(ChannelHandlerContext ctx) {
        // 调用 NioMessageUnsafe.beginRead() 方法
        // 该方法的实现在 AbstractUnsafe 中
        unsafe.beginRead();
    }
    
    // AbstractUnsafe#beginRead()
    @Override
    public final void beginRead() {
          ...
          try {
              // 调用 AbstractChannel 的 doBeginRead() 方法
              // 该方法的实现在 AbstractNioChannel
              doBeginRead();
          } catch (final Exception e) {
          ...
          }
          ...
    }
    
    // AbstractNioChannel#doBeginRead()
    @Override
    protected void doBeginRead() throws Exception {
        // 这个 selectionKey 是在将 serverSocketChannel 注册到 selector 时返回的
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
        readPending = true;
    
        final int interestOps = selectionKey.interestOps();
        // readInterestOp 是在创建NioServerSocketChannel 时指定的,值为 SelectionKey.OP_ACCEPT
        if ((interestOps & readInterestOp) == 0) {
            // 设置对 accept 事件感兴趣
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    

    完成了 accept 注册后,最后一部就到了select 轮询

    总结


    netty启动过程.png

    相关文章

      网友评论

          本文标题:3. Netty-Netty 启动过程

          本文链接:https://www.haomeiwen.com/subject/bqajrktx.html