美文网首页深入浅出Netty源码剖析
Netty学习 - NioServerSocketChannel

Netty学习 - NioServerSocketChannel

作者: buzzerrookie | 来源:发表于2018-08-03 09:37 被阅读4次

    本文分析NioServerSocketChannel构造函数传入的兴趣集SelectionKey.OP_ACCEPT参数是在何处使用的,以完善对NioEventLoop的分析。

    构造函数

    NioServerSocketChannel类层次结构如下图所示,下面自顶向下分析各类的构造函数。


    NioServerSocketChannel.png
    1. NioServerSocketChannel构造函数如下所示,可见其调用了父类构造函数。
      public NioServerSocketChannel() {
          this(newSocket(DEFAULT_SELECTOR_PROVIDER));
      }
      
      public NioServerSocketChannel(SelectorProvider provider) {
          this(newSocket(provider));
      }
      
      public NioServerSocketChannel(ServerSocketChannel channel) {
          super(null, channel, SelectionKey.OP_ACCEPT);
          config = new NioServerSocketChannelConfig(this, javaChannel().socket());
      }
      
    2. NioServerSocketChannel的父类AbstractNioMessageChannel只有一个构造函数,也是简单地调用了它的父类的构造函数。
      protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
          super(parent, ch, readInterestOp);
      }
      
    3. AbstractNioChannel是AbstractNioMessageChannel的父类,构造函数中为成员变量赋值并将通道设置成非阻塞模式。
      public abstract class AbstractNioChannel extends AbstractChannel {
          private final SelectableChannel ch;
          protected final int readInterestOp;
          volatile SelectionKey selectionKey;
          private ChannelPromise connectPromise;
          private ScheduledFuture<?> connectTimeoutFuture;
          private SocketAddress requestedRemoteAddress;
      
          protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
              super(parent);
              this.ch = ch;
              this.readInterestOp = readInterestOp;
              try {
                  ch.configureBlocking(false);
              } catch (IOException e) {
                  // 省略一些代码
              }
          }
          // 省略一些代码
      }
      
    4. AbstractNioChannel类的构造函数接着调用了其父类AbstractChannel的构造函数,新建了Unsafe实例和流水线。
      protected AbstractChannel(Channel parent) {
          this.parent = parent;
          id = newId();
          unsafe = newUnsafe();
          pipeline = newChannelPipeline();
      }
      

    引导过程

    让我们回顾引导绑定的过程,以下代码为AbstractBootstrap类的doBind方法,该方法主要做了如下工作:

    1. initAndRegister()方法初始化通道,并将通道注册到EventLoopGroup的一个EventLoop上,在这个过程中会调用AbstractChannel的doRegister()抽象方法,NioServerSocketChannel的doRegister()方法定义在AbstractNioChannel类中:
      @Override
      protected void doRegister() throws Exception {
          boolean selected = false;
          for (;;) {
              try {
                  selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                  return;
              } catch (CancelledKeyException e) {
                  if (!selected) {
                      // Force the Selector to select now as the "canceled" SelectionKey may still be
                      // cached and not removed because no Select.select(..) operation was called yet.
                      eventLoop().selectNow();
                      selected = true;
                  } else {
                      // We forced a select operation on the selector before but the SelectionKey is still cached
                      // for whatever reason. JDK bug ?
                      throw e;
                  }
              }
          }
      }
      
    2. 将通道绑定到配置的本地地址上:
      private ChannelFuture doBind(final SocketAddress localAddress) {
          final ChannelFuture regFuture = initAndRegister();
          final Channel channel = regFuture.channel();
          if (regFuture.cause() != null) {
              return regFuture;
          }
      
          if (regFuture.isDone()) {
              // At this point we know that the registration was complete and successful.
              ChannelPromise promise = channel.newPromise();
              doBind0(regFuture, channel, localAddress, promise);
              return promise;
          } else {
              // Registration future is almost always fulfilled already, but just in case it's not.
              final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
              regFuture.addListener(new ChannelFutureListener() {
                  @Override
                  public void operationComplete(ChannelFuture future) throws Exception {
                      Throwable cause = future.cause();
                      if (cause != null) {
                          // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                          // IllegalStateException once we try to access the EventLoop of the Channel.
                          promise.setFailure(cause);
                      } else {
                          // Registration was successful, so set the correct executor to use.
                          // See https://github.com/netty/netty/issues/2586
                          promise.registered();
      
                          doBind0(regFuture, channel, localAddress, promise);
                      }
                  }
              });
              return promise;
          }
      }
      
    3. 在AbstractBootstrap绑定本地地址的过程中,if分支表示注册恰好结束马上就调用了doBind0方法,而else分支则是在注册结束后调用doBind0方法。doBind0方法会在与通道绑定的EventLoop中调用Channel的bind方法,接下来就与引导没有直接关系了。
      private static void doBind0(
              final ChannelFuture regFuture, final Channel channel,
              final SocketAddress localAddress, final ChannelPromise promise) {
          // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
          // the pipeline in its channelRegistered() implementation.
          channel.eventLoop().execute(new Runnable() {
              @Override
              public void run() {
                  if (regFuture.isSuccess()) {
                      channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                  } else {
                      promise.setFailure(regFuture.cause());
                  }
              }
          });
      }
      

    为通道绑定本地地址

    上文调用Channel的bind方法实际上调用的是AbstractChannel的bind方法,该方法接着调用了DefaultChannelPipeline的bind方法:

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
    

    以下代码为DefaultChannelPipeline的bind方法,可以看到是从流水线的尾节点执行绑定操作。

    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }
    

    流水线的尾节点是TailContext类型,bind方法在它的父类AbstractChannelHandlerContext中定义如下,从前面的文章可知findContextOutbound()是找到该上下文前面的第一个出站处理器(出站的前面是沿着流水线的双向链表往左找),因此final AbstractChannelHandlerContext next = findContextOutbound(); 这一句中的next变量指向了流水线的头节点。

    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }
    
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }
    

    头节点类型是HeadContext,调用invokeBind方法,其handler()方法调用返回其自己,紧接着调用了HeadContext类的bind方法。

    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }
    

    HeadContext类的bind方法如下所示,unsafe引用通道的unsafe。

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }
    

    NioServerSocketChannel的unsafe是AbstractUnsafe类型,其bind方法如下所示。isActive方法在doBind方法执行之前会返回false,执行之后则会返回true,因此触发通道激活事件。

    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        // 省略一些代码
        boolean wasActive = isActive();
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
    
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }
    
        safeSetSuccess(promise);
    }
    

    DefaultChannelPipeline的fireChannelActive()方法如下所示,激活事件从头节点开始传播,接着会调用头节点自身的handler()方法返回自己,然后执行HeadContext类的channelActive回调函数。

    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
    
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }
    
    private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }
    

    头节点的激活事件回调函数如下所示,自动读被默认配置成开启,因此会执行对应通道的读操作。

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        readIfIsAutoRead();
    }
    
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }
    

    AbstractChannel的读方法如下所示,委托给了流水线DefaultChannelPipeline:

    @Override
    public Channel read() {
        pipeline.read();
        return this;
    }
    

    而DefaultChannelPipeline的读操作代码表明读从尾节点开始:

    @Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }
    

    尾节点的读操作定义在其父类AbstractChannelHandlerContext中,与前文类似,findContextOutbound()往前找到第一个出站处理器,因此next变量指向头节点,头节点调用handler()返回自己,触发读操作read:

    @Override
    public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Runnable task = next.invokeReadTask;
            if (task == null) {
                next.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeRead();
                    }
                };
            }
            executor.execute(task);
        }
        return this;
    }
    
    private void invokeRead() {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).read(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            read();
        }
    }
    

    头节点HeadContext的读操作如下所示,依然是委托给了unsafe:

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }
    

    AbstractUnsafe类的beginRead()方法如下,其中doBeginRead()是AbstractChannel类的抽象方法。

    @Override
    public final void beginRead() {
        assertEventLoop();
        if (!isActive()) {
            return;
        }
        try {
            doBeginRead();
        } catch (final Exception e) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireExceptionCaught(e);
                }
            });
            close(voidPromise());
        }
    }
    

    NioServerSocketChannel的doBeginRead()方法定义在其父类AbstractNioChannel中,readInterestOp成员变量正是构造函数传入的SelectionKey.OP_ACCEPT,调用带参数的interestOps方法为通道重新设置了兴趣集,即对可接受事件感兴趣。

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
        readPending = true;
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    

    相关文章

      网友评论

        本文标题:Netty学习 - NioServerSocketChannel

        本文链接:https://www.haomeiwen.com/subject/cbzsvftx.html