美文网首页
3. Netty-Netty 启动过程

3. Netty-Netty 启动过程

作者: JacobChan001 | 来源:发表于2020-08-05 10:01 被阅读0次

Netty 的启动流程

服务端 Nio 创建步骤包括:1. 创建;2. 注册;3. 监听。
所以需要在 netty 启动也需要完成以上步骤

创建

创建步骤分为:创建 selector,创建 ServerSocketChannel

1. 创建 Selector

在创建 NioEventLoopGroup 的时候,会对每个 EventLoop 进行初始化,初始化 EventLoop时,也初始化 EventLoop 里的 selector

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

NioEventLoopGroup 构造方法调用链

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    // executor 为 null
    // SelectorProvider.provider() 方法使用单例模式创建SelectorProvider
    // SelectorProvider 是后续创建 Selector 的类
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
     // DefaultSelectStrategyFactory.INSTANCE 通过单例模式创建一个 DefaultSelectStrategyFactory
     // DefaultSelectStrategyFactory 的用途是后续创建 SelectStrategy
     // SelectStrategy 用来控制后续 select 轮训的策略
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
    // RejectedExecutionHandlers.reject() 定义了 NioEventLoop 里线程池的拒绝策略
    // super 表示调用 NioEventLoopGroup 的父类构造函数,即 MultithreadEventLoopGroup
    super(nThreads, executor, selectorProvider, 
              selectStrategyFactory, RejectedExecutionHandlers.reject());
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // args[0]: selectorProvider
    // args[1]: selectStrategyFactory
    // args[2]: rejectedExecutionHandler
    // super 表示 调用父类构造函数,即 MultithreadEventExecutorGroup
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    // DefaultEventExecutorChooserFactory.INSTANCE 通过单例创建DefaultEventExecutorChooserFactory
    // DefaultEventExecutorChooserFactory 的作用是通过 EventExecutor 数组创建一个 EventExecutorChooser
    // EventExecutorChooser 的作用是通过不同的算法从 EventExecutor 数组 选择一个 EventExecutor
    // NioEventLoop 继承了 EventExecutor,所以在 NioEventLoopGroup 里的 NioEventLoop数组即是EventExecutor数组
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    ...
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
   
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
      try{
         // 创建 NioEventLoop
        // 模板方法,由 NioEventLoopGroup 实现,selector 就在创建 NioEventLoop 时进行创建
         children[i] = newChild(executor, args);
      }
      ...
    }
}

// NioEventLoopGroup#newChild() 
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    // 创建 NioEventLoop
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }

接下来就是 NioEventLoop 的构造函数

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
   super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
   this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
   this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
   // 创建 selector,并返回一个selector 的元组对象,此元组包含了一个 unwrappedSelector 和 一个 selector 
   // 默认情况下 unwrappedSelector 和 selector  是相同的对象,都是java nio 原生的 selector
  // 只有开启了优化配置,才会创建优化了的 selector
  // 但是即使优化了selector,在注册的时候依然使用 unwrappedSelector 这个原始的 selector
   final SelectorTuple selectorTuple = openSelector();
  
   // selector: 是被 Netty 优化,由 Netty 继承自 selector 接口实现的类
   this.selector = selectorTuple.selector;
   // unwrappedSelector: 没有被 Netty 优化的,原始的JDK Nio selector
   this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

至此,便完成了 selector 的创建

接下来就是 ServerSocketChannel 的创建

2. ServerSocketChannel 的创建和注册

  • ServerSocketChannel 是Java 原始的 Channel,在 Netty 中对这个ServerSocketChannel 进行了扩展,使用 NioServerSocketChannel 持有 ServerSocketChannel
// Java ServerSocketChannel 的在 NioServerSocketChannel 的位置
// NioServerSocketChannel  -> AbstractNioMessageChannel -> AbstractNioChannel
// SelectableChannel 就是 java nio ServerSocketChannel 的接口类
private final SelectableChannel ch;

protected SelectableChannel javaChannel() {
    return ch;
}

  • 所以 ServerSocketChannel 的创建过程应该在 NioServerSocketChannel 创建的时候进行的
  • 而 NioServerSocketChannel 则是在 ServerBootstrap中被创建的
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
...
bootstrap.bind(8080);

与 NioServerSocketChannel 相关代码有两个

  • bootstrap.channel(NioServerSocketChannel.class)方法会创建一个 channelFactory,channelFactory 是一个 netty channel 的工厂方法,用来创建 NioServerSocketChannel 实例
// AbstractBootstrap#channel
public B channel(Class<? extends C> channelClass) {
    // ReflectiveChannelFactory 封装了 channelClass,调用 ReflectiveChannelFactory#newChannel 方法时通过反射的方式实例化channelClass,返回channelClass类的对象实例
    // channelFactory方法则将 ReflectiveChannelFactory 复制给 AbstractBootstrap的成员变量 channleFactory
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

// ReflectiveChannelFactory 构造方法
public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        this.constructor = clazz.getConstructor();
    }
    ...
 }
  • bootstrap.bind(8080) 用来实际创建 NioServerSocketChannel 对象实例

进入 bootstrap.bind()方法,该方法实际上进入了 ServerBootstrap 的父类 AbstractBootstrap的 bind 方法

// AbstractBootstrap#bind(int inetPort)
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
    // 验证 bossGroup 和 channelFactory 是否存在
    validate();
    // 真正进行 bind 的方法
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化 NioServerSocketChannel ,并将 NioServerSocketChannel 里的注册到 seletor 中就在这里
    final ChannelFuture regFuture = initAndRegister();
    ...
}

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 调用 上面所说的 ReflectiveChannelFactory#newChannel() 方法
        // 初始化并返回 NioServerSocketChannel
        channel = channelFactory.newChannel();
        init(channel);
    }
    ...
    ChannelFuture regFuture = config().group().register(channel);
}
//ReflectiveChannelFactory#newChannel()
@Override
public T newChannel() {
    try {
        // 相当于 new NioServerSocketChannel();
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

接下来就是创建 NioServerSocketChannel 的流程

// NioServerSocketChannel 的构造函数
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    // 创建有个 java Nio ServerSocketChannel 实例
    return provider.openServerSocketChannel();
    ...
}

public NioServerSocketChannel(ServerSocketChannel channel) {
    // 调用父类的构造方法,即AbstractNioMessageChannel的构造方法
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    // 调用父类的构造方法,即 AbstractNioChannel 的构造方法
    // parent 为 null
    // ch 为 ServerSocketChannel
    // readInterestOp 为 SelectionKey.OP_ACCEPT
    super(parent, ch, readInterestOp);
}


protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    // parent 为 null, 调用父类构造函数,即 AbstractChannel 的构造函数
    super(parent);
    // 将 serverSocketChannel 赋值给 NioServerSocketChannel 的成员变量 ch
    this.ch = ch;
    // serverSocketChannel 只对 SelectionKey.OP_ACCEPT 感兴趣
    this.readInterestOp = readInterestOp;
    try {
        // 将 serverSocketChannel 设置为非阻塞
        ch.configureBlocking(false);
    }
    ...
}

protected AbstractChannel(Channel parent) {
    // parent 为 null
    this.parent = parent;
    // NioServerSocketChannel的 id
    id = newId();
    // 最后会调用 AbstractNioByteChannel 的 newUnsafe() 方法,返回 NioMessageUnsafe 对象
    unsafe = newUnsafe();
    // 熟悉的 pipeline,返回 DefaultChannelPipeline 对象
    pipeline = newChannelPipeline();
}

所以,在创建 NioServerSocketChannel 时的 ServerSocketChannel 便已经被创建了,并赋值给了成员变量 ch

接下来就是把这个 ServerSocketChannel 注册到 selector 中,回到刚才 AbstractBootstrap 的 initAndRegister 方法

final ChannelFuture initAndRegister() {
    Channel channel = null;
    // 完成了 ServerSocketChannel 的创建,接下来就是将 ServerSocketChannel 注册到 selector 中
    // config().group() 会返回 bossGroup,即是一个 NioEventLoopGroup 对象register(channel)
    // 调用 config().group().register(channel) 就是调用 NioEventLoopGroup.register(channel)
    // NioEventLoopGroup.register(channel) 方法的实现在其父类 MultithreadEventLoopGroup
    ChannelFuture regFuture = config().group().register(channel);
    
}

// MultithreadEventLoopGroup#register
@Override
public ChannelFuture register(Channel channel) {
    // next() 会使用 chooser(EventExecutorChooser) 选择一个 NioEventLoop
    // 调用 NioEventLoop.register 方法,NioEventLoop.register 方法在 SingleThreadEventLoop.register 中实现
    return next().register(channel);
}

// SingleThreadEventLoop#register
@Override
public ChannelFuture register(Channel channel) {
    // 根据 NioServerSocketChannle 生成一个 DefaultChannelPromise
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    // promise.channel 返回 NioServerSocketChannel
    // NioServerSocketChannel.unsafe() 返回一个 Unsafe
    // Unsafe.register 方法会进入 NioByteUnsafe.register 方法, 而 NioByteUnsafe.register 方法的实现在 AbstractNioUnsafe
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// AbstractNioUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    
    ...
    AbstractChannel.this.eventLoop = eventLoop;
    // 判断当前线程是否是在本 eventLoop,显然不是,当前线程是在 main thread,所以不进入
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 不是在 eventLoop中,则创建一个线程,放到 eventLoop中的线程池中,在线程池中调用register0() 方法
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        }
        ...
    }
}

private void register0(ChannelPromise promise) {
    ...
    // 此方法在 AbstractNioChannel中实现
    doRegister();
    ...
}

// AbstractNioChannel#doRegister
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // javaChannel() 即是 ServerSocketChannel
            // eventLoop().unwrappedSelector() 即是 selector
            // 0 表示 ServerSocketChannel 对任何事件都不感兴趣,后续还会另外注册 Accept 事件到 selector
            // this:表示把 NioServerSocketChannel 作为属性绑定到 serverSocketChannel 中,只要selector 返回了 serverSocketChannel 相关的事件,便会带回来这个 NioServerSocketChannel 对象
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } 
    }
}

到目前为止,

  • selector 的创建,
  • serverSocketChannel 的创建,
  • serverSocketChannel 向 selector 注册
    已经完成

还有

  • 给 serverSocketChannel设置监听端口
  • 注册 serverSocketChannel accept事件 到 selector
  • selector.调用 select() 方法轮询的步骤
    还没完成

回到 AbstractBootstrap#dobind() 方法

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 上面的步骤创建了 NioServerSocketChannel,由于 NioServerSocketChannel注册到 selector 是放在 eventLoop 的线程池异步进行的,所以会返回一个regFuture 这个异步结果。
    ...

  if (regFuture.isDone()) {
       // 如果已经注册完成了NioServerSocketChannel,则开始 bind NioServerSocketChannel
      // 具体的bind 方法在 doBind0() 中
      ChannelPromise promise = channel.newPromise();
      doBind0(regFuture, channel, localAddress, promise);
      return promise;
  }
}
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
      //拿到 NioServerSocketChannel 所在的 eventLoop,然后将将 bind 方法丢给 eventLoop 里的线程池进行处理
      channel.eventLoop().execute(new Runnable() {
          @Override
          public void run() {
              if (regFuture.isSuccess()) {
                  // 调用 NioServerSocketChannel 的 bind 方法,此方法的实现在 AbstractChannel 类
                  channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
              } else {
                  promise.setFailure(regFuture.cause());
              }
          }
      });
  }

//AbstractChannel#bind
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
      // 调用 DefaultChannelPipeline 的 bind 方法
      return pipeline.bind(localAddress, promise);
}

// DefaultChannelPipeline#bind
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
      // 调用 TailContext 的 bind 方法,该方法会调用  AbstractChannelHandlerContext.bind() 方法
      // 该方法会去寻找pipeline 中所有的 executionMask 包含了 Mask_Bind 的 Outbound Handler,默认情况下只有 HeadContext 才有,所以实际上调用的是 HeadContext 的 bind 方法
      return tail.bind(localAddress, promise);
  }



// HeadContext#bind
public void bind(
          ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
      // 该 unsafe 是 NioServerSocketChannel 中的 NioMessageUnsafe 对象,NioMessageUnsafe.bind 方法的实现在其父类 AbstractUnsafe 中
      unsafe.bind(localAddress, promise);
}


// AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
      ...
    try {
        // 调用 NioServerSocketChannle 的 doBind 方法
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    ...
 }

// NioServerSocketChannel#Bind
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        // 调用 java nio 的 ServerSocketChannel 的 bind 方法
        // backlog 表示等候排队的连接队列长度
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

至此,ServerSocketChannel bind 监听端口也已经完成

接下来还有

  • 注册 serverSocketChannel accept 事件 到 selector
  • selector.调用 select() 方法轮询的步骤

回到 AbstractUnsafe.bind 方法

// AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
      ...
      if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
              @Override
              public void run() {
                    // 触发 pipeline 中 ChannelHandler 的 channelActive() 方法
                    // 这里直接到 HeadContext 的 active() 方法
                    pipeline.fireChannelActive();
                }
            });
        }
 }

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
      ctx.fireChannelActive();
      // 注册 Accept 的事件
      readIfIsAutoRead();
}

private void readIfIsAutoRead() {
      if (channel.config().isAutoRead()) {
          // 调用 NioServerSocketChannel 的 read 方法
          // 该方法的实现在 AbstractChannel 中
          channel.read();
      }
}

//AbstractChannel#read
@Override
public Channel read() {
    // 调用 DefaultChannelPipeline 的 read() 方法
    pipeline.read();
    return this;
}

//DefaultChannelPipeline#read
@Override
public final ChannelPipeline read() {
    // 该方法从 tail 开始,调用所有的 ChannelHandler 的 read 的方法
    // 最终会到达 HeadContext 的 read(ChannelHandlerContext ctx)  方法
    tail.read();
    return this;
}

// HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
    // 调用 NioMessageUnsafe.beginRead() 方法
    // 该方法的实现在 AbstractUnsafe 中
    unsafe.beginRead();
}

// AbstractUnsafe#beginRead()
@Override
public final void beginRead() {
      ...
      try {
          // 调用 AbstractChannel 的 doBeginRead() 方法
          // 该方法的实现在 AbstractNioChannel
          doBeginRead();
      } catch (final Exception e) {
      ...
      }
      ...
}

// AbstractNioChannel#doBeginRead()
@Override
protected void doBeginRead() throws Exception {
    // 这个 selectionKey 是在将 serverSocketChannel 注册到 selector 时返回的
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // readInterestOp 是在创建NioServerSocketChannel 时指定的,值为 SelectionKey.OP_ACCEPT
    if ((interestOps & readInterestOp) == 0) {
        // 设置对 accept 事件感兴趣
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

完成了 accept 注册后,最后一部就到了select 轮询

总结


netty启动过程.png

相关文章

网友评论

      本文标题:3. Netty-Netty 启动过程

      本文链接:https://www.haomeiwen.com/subject/bqajrktx.html