美文网首页
Netty源码_AbstractNioChannel详解

Netty源码_AbstractNioChannel详解

作者: wo883721 | 来源:发表于2021-11-02 08:02 被阅读0次

一. NioUnsafe 接口

    public interface NioUnsafe extends Unsafe {
        /**
         * Return underlying {@link SelectableChannel}
         * 返回底层的NIO通道
         */
        SelectableChannel ch();

        /**
         * 完成连接;
         *  在 NioEventLoop 的 `processSelectedKey` 方法中调用,
         *  当底层的NIO通道接收到连接事件 OP_CONNECT 时调用
         */
        void finishConnect();

        /**
         * NIO的 SelectableChannel通道中获取到远端的数据
         * 在 NioEventLoop 的 `processSelectedKey` 方法中调用,
         * 当底层的NIO通道接收到读取事件 OP_READ 时调用。
         */
        void read();

        /**
         * 强制刷新;
         * 在 NioEventLoop 的 `processSelectedKey` 方法中调用,
         * 当底层的NIO通道接收到可写事件 OP_WRITE 时调用。
         */
        void forceFlush();
    }

NioUnsafe 接口比 Unsafe 多了四个方法:

  1. SelectableChannel ch() 返回底层的NIO通道
  2. 剩下三个方法都与 NioEventLoop 类中接收到的NIO通道事件有关:
    • finishConnect() 接收到连接事件 OP_CONNECT 时调用。
    • read() 接收到可读事件 OP_READ 时调用。
    • forceFlush() 收到可写事件 OP_WRITE 时调用。

二. AbstractNioUnsafe 类

2.1 实现 NioUnsafe 接口中三个方法

2.1.1 ch()

   // 在 AbstractNioChannel 中的方法
    protected SelectableChannel javaChannel() {
        return ch;
    }

        @Override
        public final SelectableChannel ch() {
            return javaChannel();
        }

2.1.2 finishConnect()

        @Override
        public final void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            // 注意,只有在连接尝试既没有被取消也没有超时时,事件循环才会调用此方法。
            assert eventLoop().inEventLoop();

            try {
                boolean wasActive = isActive();
                // 调用 AbstractNioChannel 的 doFinishConnect 方法进行完成连接操作
                doFinishConnect();
                fulfillConnectPromise(connectPromise, wasActive);
            } catch (Throwable t) {
                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
            } finally {
                // See https://github.com/netty/netty/issues/1770

                // 检查是否为null,因为connectTimeoutFuture仅在超时时间 connectTimeoutMillis > 0时才创建
                // 连接已经完成,取消连接超时任务
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
            }
        }
  • NIO通道接收到连接事件 OP_CONNECT,表示已经成功建立连接了。
  • 调用 AbstractNioChanneldoFinishConnect 方法进行完成连接操作。
  • 调用 fulfillConnectPromise 方法,完成 ChannelPromise 的通知,以及是否发送 ChannelActive 事件和 ChannelInactive 事件。
  • 最后因为连接已经完成,就需要取消连接超时任务。

2.1.3 forceFlush()

    @Override
        protected final void flush0() {
            //只有当没有挂起的刷新时才立即刷新。
            //如果有一个挂起的刷新操作,事件循环将在稍后调用forceFlush(),因此不需要现在调用它。
            if (!isFlushPending()) {
                super.flush0();
            }
        }

        @Override
        public final void forceFlush() {
            // 直接调用super.flush0(),强制立即刷新
            super.flush0();
        }

        /**
         * 返回是否准备刷新
         */
        private boolean isFlushPending() {
            SelectionKey selectionKey = selectionKey();
            // 选择键有效,且有可读事件 OP_WRITE时,返回 true
            return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
        }

调用父类 flush0() 方法进行刷新。
还会复写父类的 flush0() 的方法,只有当没有挂起的刷新时才立即刷新。

2.1.4 read()

这个是 AbstractNioUnsafe 中唯一没有实现NioUnsafe的方法,它在 AbstractNioByteChannelAbstractNioMessageChannel 中提供不同的实现,等后面再说。

2.2 实现AbstractUnsafe 中连接方法

      @Override
        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // 检查通道是否仍然打开
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            try {
                // 当 connectPromise 不为空,说明已经有人尝试连接了
                // 防止重复尝试连接
                if (connectPromise != null) {
                    // Already a connect in process.
                    throw new ConnectionPendingException();
                }

                boolean wasActive = isActive();
                // 调用 AbstractNioChannel 的 doConnect 方法进行连接
                if (doConnect(remoteAddress, localAddress)) {
                    //  完成 ChannelPromise 的通知,
                    //  以及是否发送 ChannelActive 事件和 ChannelInactive 事件
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    /**
                     * 因为连接操作是一个异步操作,
                     * 是否连接成功,是由底层 NIO通道接收到连接事件 OP_CONNECT 为准的,
                     * 所以这里要设置一个超时任务,当规定时间内,还没有连接成功,
                     * 那么就要关闭通道和相关的通知操作。
                     *
                     * 还要考虑用户主动取消这次连接请求。
                     */

                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        // 设置一个超时任务,规定时间内,它没有被取消,就会 close(voidPromise()) 关闭通道
                        // 在 finishConnect() 和 doClose() 方法中,会取消这个超时任务
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                if (connectPromise != null && !connectPromise.isDone()
                                        && connectPromise.tryFailure(new ConnectTimeoutException(
                                                "connection timed out: " + remoteAddress))) {
                                    // 连接超时,关闭通道
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            // 用户主动取消这次连接请求, 要取消连接超时任务,以及关闭通道
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }

        /**
         * 完成 ChannelPromise 的通知,以及是否发送 ChannelActive 事件和 ChannelInactive 事件
         */
        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                return;
            }

            boolean active = isActive();

            // 尝试设置 promise 为成功完成,
            // 如果设置失败,即返回 false,表示用户取消了这次连接请求
            boolean promiseSet = promise.trySuccess();

            // 无论用户是否取消了这次连接请求,
            // 都判断是否发送 ChannelActive 事件
            if (!wasActive && active) {
                pipeline().fireChannelActive();
            }

            // 如果用户取消了这次连接请求,
            // 则关闭通道,然后可能会发送 ChannelInactive 事件
            if (!promiseSet) {
                close(voidPromise());
            }
        }

        private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return;
            }

            // 使用 tryFailure() 而不是 setFailure(),
            // 来避免与cancel()的竞争。
            promise.tryFailure(cause);
            closeIfClosed();
        }

方法流程:

  1. 调用 AbstractNioChanneldoConnect 方法进行连接。

    如果返回 true ,表示一直阻塞等待连接成功。
    如果返回 false,表示是一个非阻塞连接,需要等待底层 NIO通道接收到连接事件 OP_CONNECT,才代表连接成功。

  2. 阻塞连接成功

    那么就调用 fulfillConnectPromise(...) 方法,完成 ChannelPromise的通知,以及是否发送 ChannelActive 事件和 ChannelInactive 事件。

  3. 非阻塞连接
    • 需要创建一个超时任务,当规定时间内,还没有连接成功,那么就要关闭通道和相关的通知操作。
    • 再考虑用户主动取消这次连接请求时,要取消连接超时任务,以及关闭通道。

三. AbstractNioChannel 中实现的方法

3.1 EventLoop 的兼容性

    @Override
    protected boolean isCompatible(EventLoop loop) {
        // 与当前通道兼容的事件轮询器必须是 NioEventLoop 的子类
        return loop instanceof NioEventLoop;
    }

AbstractNioChannel匹配的事件轮询器必须是 NioEventLoop 的子类。

3.2 注册

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 通过NIO SelectableChannel 的register方法,
                // 将NIO通道注册到事件轮询器的 Selector 上,
                // 这样就可以监听NIO通道的 IO事件,包括接收,连接,可读,可写。
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

通过NIO SelectableChannelregister方法,将NIO通道注册到事件轮询器的 Selector 上。
这样就可以监听NIO通道的 IO事件,包括接收,连接,可读,可写。

3.3 取消注册

    @Override
    protected void doDeregister() throws Exception {
        // 将通道从已注册的事件轮询器中取消
        eventLoop().cancel(selectionKey());
    }

将通道从已注册的事件轮询器中取消。

3.4 开始读

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() 方法调用,
        // 会调用到这里
        final SelectionKey selectionKey = this.selectionKey;
        // 当前选择键是否有效
        if (!selectionKey.isValid()) {
            return;
        }

        // 设置当前通道是 可读状态
        readPending = true;

        final int interestOps = selectionKey.interestOps();
        /**
         * 设置底层NIO通道读事件 OP_READ 或 OP_ACCEPT
         * 与 AbstractNioUnsafe 的 removeReadOp() 方法正好相反。
         */
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

就是设置底层NIO通道读事件。

3.5 关闭

   @Override
    protected void doClose() throws Exception {
        ChannelPromise promise = connectPromise;
        if (promise != null) {
            // 使用tryFailure()而不是setFailure() 方法,
            // 来避免与取消 cancel()的竞争。
            promise.tryFailure(new ClosedChannelException());
            connectPromise = null;
        }

        // 关闭操作时,需要取消连接超时任务
        Future<?> future = connectTimeoutFuture;
        if (future != null) {
            future.cancel(false);
            connectTimeoutFuture = null;
        }
    }

注意这个方法,没有调用底层NIO通道的关闭close方法;也就是说子类一般都需要复写它。

3.6 小结

AbstractNioChannel 没有实现写操作相关的方法,以及连接操作相关方法 doConnect(...)doFinishConnect()

四. 读数据操作

AbstractNioByteChannelAbstractNioMessageChannel 类中,实现了两种方式的读数据操作。

4.1 AbstractNioByteChannel 中读数据

        @Override
        public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    // 通过 allocHandle,在接收数据时分配缓存区 ByteBuf
                    byteBuf = allocHandle.allocate(allocator);
                    // 通过 doReadBytes(byteBuf) 方法,从底层 NIO 通道中读取数据到 ByteBuf 中,
                    // 并返回读取数据的大小;
                    // 通过 lastBytesRead 方法记录上次读操作已读取的字节。
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        // 没有可读数据了;释放缓冲区。
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            readPending = false;
                        }
                        break;
                    }

                    // 增加当前读循环中已读消息的数量
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    // 通过管道 pipeline 发送 ChannelRead 读取事件
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    // 通过allocHandle.continueReading()方法,
                    // 判断是否需要继续读取。
                } while (allocHandle.continueReading());

                // 这次读取已完成
                allocHandle.readComplete();
                // 通过管道 pipeline 发送 ChannelReadComplete 读取完成事件
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
  • 通过 doReadBytes(byteBuf) 方法,从底层NIO 通道中读取数据到输入缓冲区ByteBuf 中。
  • 通过 pipeline.fireChannelRead(...) 方法,发送ChannelRead读取事件。
  • 通过 allocHandle.continueReading() 判断是否需要继续读取。
  • 这次读取完成,调用 pipeline.fireChannelReadComplete() 方法,发送 ChannelReadComplete 读取完成事件。

4.2 AbstractNioMessageChannel 中读数据

      @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        // 将消息读入给定数组并返回所读入的数量
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            // 小于 0,表示已经关闭
                            closed = true;
                            break;
                        }
                        // 增加当前读循环中已读消息的数量
                        allocHandle.incMessagesRead(localRead);
                        // 判断是否需要继续读取
                    } while (continueReading(allocHandle));
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                // 遍历读取消息的数组readBuf
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // 通过管道 pipeline 发送 ChannelRead 读取事件
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                // 这次读取已完成
                allocHandle.readComplete();
                // 通过管道 pipeline 发送 ChannelReadComplete 读取完成事件
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
  • 使用 readBuf 数组,一次读取操作所有的数据对象。
  • 通过 doReadMessages(readBuf) 方法,将消息读入给定数组 readBuf,并返回所读入的数量localRead
  • 通过 localRead 的值,判断是否读取完成,或者通道已经关闭。
  • 通过 continueReading(allocHandle) 方法,判断是否需要继续读取。
  • 遍历读取消息的数组readBuf, 通过管道 pipeline 发送 ChannelRead 读取事件;遍历完成,通过管道 pipeline 发送 ChannelReadComplete 读取完成事件。

相关文章

网友评论

      本文标题:Netty源码_AbstractNioChannel详解

      本文链接:https://www.haomeiwen.com/subject/tkmzaltx.html