美文网首页深入浅出Netty源码剖析
Netty学习 - NioServerSocketChannel

Netty学习 - NioServerSocketChannel

作者: buzzerrookie | 来源:发表于2018-08-03 09:37 被阅读4次

本文分析NioServerSocketChannel构造函数传入的兴趣集SelectionKey.OP_ACCEPT参数是在何处使用的,以完善对NioEventLoop的分析。

构造函数

NioServerSocketChannel类层次结构如下图所示,下面自顶向下分析各类的构造函数。


NioServerSocketChannel.png
  1. NioServerSocketChannel构造函数如下所示,可见其调用了父类构造函数。
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    
    public NioServerSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }
    
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    
  2. NioServerSocketChannel的父类AbstractNioMessageChannel只有一个构造函数,也是简单地调用了它的父类的构造函数。
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
    
  3. AbstractNioChannel是AbstractNioMessageChannel的父类,构造函数中为成员变量赋值并将通道设置成非阻塞模式。
    public abstract class AbstractNioChannel extends AbstractChannel {
        private final SelectableChannel ch;
        protected final int readInterestOp;
        volatile SelectionKey selectionKey;
        private ChannelPromise connectPromise;
        private ScheduledFuture<?> connectTimeoutFuture;
        private SocketAddress requestedRemoteAddress;
    
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                // 省略一些代码
            }
        }
        // 省略一些代码
    }
    
  4. AbstractNioChannel类的构造函数接着调用了其父类AbstractChannel的构造函数,新建了Unsafe实例和流水线。
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    

引导过程

让我们回顾引导绑定的过程,以下代码为AbstractBootstrap类的doBind方法,该方法主要做了如下工作:

  1. initAndRegister()方法初始化通道,并将通道注册到EventLoopGroup的一个EventLoop上,在这个过程中会调用AbstractChannel的doRegister()抽象方法,NioServerSocketChannel的doRegister()方法定义在AbstractNioChannel类中:
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
    
  2. 将通道绑定到配置的本地地址上:
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
    
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    
  3. 在AbstractBootstrap绑定本地地址的过程中,if分支表示注册恰好结束马上就调用了doBind0方法,而else分支则是在注册结束后调用doBind0方法。doBind0方法会在与通道绑定的EventLoop中调用Channel的bind方法,接下来就与引导没有直接关系了。
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    

为通道绑定本地地址

上文调用Channel的bind方法实际上调用的是AbstractChannel的bind方法,该方法接着调用了DefaultChannelPipeline的bind方法:

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

以下代码为DefaultChannelPipeline的bind方法,可以看到是从流水线的尾节点执行绑定操作。

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

流水线的尾节点是TailContext类型,bind方法在它的父类AbstractChannelHandlerContext中定义如下,从前面的文章可知findContextOutbound()是找到该上下文前面的第一个出站处理器(出站的前面是沿着流水线的双向链表往左找),因此final AbstractChannelHandlerContext next = findContextOutbound(); 这一句中的next变量指向了流水线的头节点。

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

头节点类型是HeadContext,调用invokeBind方法,其handler()方法调用返回其自己,紧接着调用了HeadContext类的bind方法。

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}

HeadContext类的bind方法如下所示,unsafe引用通道的unsafe。

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

NioServerSocketChannel的unsafe是AbstractUnsafe类型,其bind方法如下所示。isActive方法在doBind方法执行之前会返回false,执行之后则会返回true,因此触发通道激活事件。

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // 省略一些代码
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

DefaultChannelPipeline的fireChannelActive()方法如下所示,激活事件从头节点开始传播,接着会调用头节点自身的handler()方法返回自己,然后执行HeadContext类的channelActive回调函数。

@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelActive();
    }
}

头节点的激活事件回调函数如下所示,自动读被默认配置成开启,因此会执行对应通道的读操作。

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

AbstractChannel的读方法如下所示,委托给了流水线DefaultChannelPipeline:

@Override
public Channel read() {
    pipeline.read();
    return this;
}

而DefaultChannelPipeline的读操作代码表明读从尾节点开始:

@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

尾节点的读操作定义在其父类AbstractChannelHandlerContext中,与前文类似,findContextOutbound()往前找到第一个出站处理器,因此next变量指向头节点,头节点调用handler()返回自己,触发读操作read:

@Override
public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
        Runnable task = next.invokeReadTask;
        if (task == null) {
            next.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeRead();
                }
            };
        }
        executor.execute(task);
    }
    return this;
}

private void invokeRead() {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).read(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        read();
    }
}

头节点HeadContext的读操作如下所示,依然是委托给了unsafe:

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

AbstractUnsafe类的beginRead()方法如下,其中doBeginRead()是AbstractChannel类的抽象方法。

@Override
public final void beginRead() {
    assertEventLoop();
    if (!isActive()) {
        return;
    }
    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

NioServerSocketChannel的doBeginRead()方法定义在其父类AbstractNioChannel中,readInterestOp成员变量正是构造函数传入的SelectionKey.OP_ACCEPT,调用带参数的interestOps方法为通道重新设置了兴趣集,即对可接受事件感兴趣。

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

相关文章

网友评论

    本文标题:Netty学习 - NioServerSocketChannel

    本文链接:https://www.haomeiwen.com/subject/cbzsvftx.html