美文网首页
Netty源码_AbstractChannel和ChannelO

Netty源码_AbstractChannel和ChannelO

作者: wo883721 | 来源:发表于2021-11-02 08:02 被阅读0次

    一. AbstractChannel

    1.1 构造方法

        /**
         * 创建一个新实例。
         */
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            // 创建
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    
        /**
         * 创建一个新实例。
         */
        protected AbstractChannel(Channel parent, ChannelId id) {
            this.parent = parent;
            this.id = id;
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    

    可以看出在构造方法中,就绑定了这个通道的四个成员变量 parent,id,unsafe,pipeline

        protected ChannelId newId() {
            return DefaultChannelId.newInstance();
        }
        protected DefaultChannelPipeline newChannelPipeline() {
            return new DefaultChannelPipeline(this);
        }
        /**
         * 由子类来实现,创建对应的 Unsafe 类型实例
         */
        protected abstract AbstractUnsafe newUnsafe();
    
    • idpipeline都是直接创建,默认是 DefaultChannelIdDefaultChannelPipeline 类型。
    • newUnsafe() 是抽样方法,有子类才能创建对应的 Unsafe 类型实例。

    1.2 ChannelOutboundInvoker 接口方法

    Channel 还继承了 ChannelOutboundInvoker 接口,也就是说通道是可以发送出站 IO 操作的。

     @Override
        public ChannelFuture bind(SocketAddress localAddress) {
            return pipeline.bind(localAddress);
        }
    
        @Override
        public ChannelFuture connect(SocketAddress remoteAddress) {
            return pipeline.connect(remoteAddress);
        }
    
        @Override
        public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
            return pipeline.connect(remoteAddress, localAddress);
        }
    
        @Override
        public ChannelFuture disconnect() {
            return pipeline.disconnect();
        }
    
        @Override
        public ChannelFuture close() {
            return pipeline.close();
        }
    
        @Override
        public ChannelFuture deregister() {
            return pipeline.deregister();
        }
      @Override
        public Channel flush() {
            pipeline.flush();
            return this;
        }
    
        @Override
        public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
            return pipeline.bind(localAddress, promise);
        }
        ........
    

    你会发现基本上都是调用 ChannelPipeline 对应的方法。

    • 也就是说直接调用通道Channel 的发送出站IO事件的方法,和调用管道pipeline() 发送出站IO事件的方法是一样的。
    • 根据 DefaultChannelPipeline 的分析,我们知道这些出站 IO 事件最后都会调用到该通道的 Unsafe 属性对应方法进行处理。

    1.3 抽样方法

    AbstractChannel 还有几个需要子类实现抽样方法,由子类提供不同的处理逻辑:

    1. AbstractUnsafe newUnsafe()

      不同类型的 Channel有自己特定的 Unsafe 类型。

    2. boolean isCompatible(EventLoop loop)

      判断给定的事件轮询器 EventLoop 和当前的通道类型是不是兼容。每种类型的通道Channel 都有自己特定的事件轮询器。

    3. SocketAddress localAddress0()SocketAddress remoteAddress0()

      通道绑定的本地地址和通道连接的远程地址。

    4. void doBind(SocketAddress localAddress)

      进行绑定操作,每种类型的通道绑定处理是不一样的。

    5. void doDisconnect()

      进行连接操作。

    6. void doClose()

      进行关闭连接操作。

    7. void doBeginRead()

      将通道设为开始读操作。

    8. void doWrite(ChannelOutboundBuffer in)

      进行写操作。

    AbstractChannel 真正重点的操作都是在 AbstractUnsafe 中实现的啊,下面讲解 AbstractUnsafe

    二. AbstractUnsafe 类

    2.1 成员属性

            // 写缓冲区 ChannelOutboundBuffer
            private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
    
            // 用于在接收数据时分配缓存区 ByteBuf
            private RecvByteBufAllocator.Handle recvHandle;
    
            // 当前是否正在刷新数据,防止重复刷新数据
            private boolean inFlush0;
    
            // 如果通道从未被注册,则为true,否则为false
            private boolean neverRegistered = true;
    

    2.2 注册 register

            @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    
                if (isRegistered()) {
                    // 当前通道已经注册,失败,调用 promise 的setFailure方法进行通知
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    // 这个事件轮询器和当前通道不兼容,失败,调用 promise 的setFailure方法进行通知
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
    
                AbstractChannel.this.eventLoop = eventLoop;
    
                if (eventLoop.inEventLoop()) {
                    // 当前线程就是通道 事件轮询器线程,直接调用 register0 方法
                    register0(promise);
                } else {
                    try {
                        // 通过 eventLoop.execute 方法,
                        // 保证 register0 方法在通道事件轮询器线程中调用
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        // 发生异常,要关闭通道,并进行相关通知
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
    
            private void register0(ChannelPromise promise) {
                try {
                    // 检查通道是否仍然打开
                    // 当注册操作在 eventLoop 线程之外调用的话,
                    // 有可能这时通道被别的线程关闭了
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    // 第一次注册
                    boolean firstRegistration = neverRegistered;
                    // 调用 AbstractChannel 的 doRegister 方法,进行注册操作
                    doRegister();
                    neverRegistered = false;
                    // 设置 AbstractChannel 的 registered 成员属性,表示已经注册
                    registered = true;
    
                    // 确保在通道未注册前添加到管道上的 ChannelHandler 的 handlerAdded(…) 也会被调用
                    // 这是必需的,因为用户可能已经通过ChannelFutureListener中的管道触发了事件。
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    // 注册成功的通知
                    safeSetSuccess(promise);
                    // 发送注册 入站IO事件
                    pipeline.fireChannelRegistered();
                    // 只有在通道从未注册的情况下才触发 channelActive 事件。
                    // 这可以防止在通道被取消注册和重新注册时触发多个通道 channelActive 事件。
                    if (isActive()) {
                        if (firstRegistration) {
                            // 第一次注册时,才会发送 channelActive 事件
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // 此通道在注册之前,设置了 autoRead()。
                            // 这意味着我们需要重新设置开始读取操作,以便介绍入站数据。
    
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // 发生异常,要关闭通道,并进行相关通知
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    

    将通道注册到事件轮询器EventLoop 上:

    • 如果当前通道已经注册,或者当前通道和事件轮询器不兼容,那么注册失败,调用 promisesetFailure方法进行通知。
    • 保证在事件轮询器线程调用实际注册register0方法。
    • 调用 AbstractChanneldoRegister 方法,进行注册操作,发送注册事件。
    • 如果通道已活跃,第一次注册的时候,就会发送 channelActive 事件;
    • 如果不是,那么就可能设置开始读的操作。
    • 如果这期间发生异常,就关闭通道,并进行相关通知。

    2.3 取消注册 deregister

            @Override
            public final void deregister(final ChannelPromise promise) {
                assertEventLoop();
    
                deregister(promise, false);
            }
    
            private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
                // 如果 promise不是 不可取消的,那么直接返回
                if (!promise.setUncancellable()) {
                    return;
                }
    
                if (!registered) {
                    // 当前通道没有注册,那么也表示取消注册成功,进行成功通知
                    safeSetSuccess(promise);
                    return;
                }
    
                // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
                // we need to ensure we do the actual deregister operation later. This is needed as for example,
                // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
                // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
                // the deregister operation this could lead to have a handler invoked by different EventLoop and so
                // threads.
    
                // See:
                // https://github.com/netty/netty/issues/4435
    
                // 通过 invokeLater 方法,将 doDeregister() 方法放在下一个事件轮询周期进行
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 调用 AbstractChannel 的 doDeregister 方法,进行取消注册操作
                            doDeregister();
                        } catch (Throwable t) {
                            logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                        } finally {
                            if (fireChannelInactive) {
                                pipeline.fireChannelInactive();
                            }
                            // Some transports like local and AIO does not allow the deregistration of
                            // an open channel.  Their doDeregister() calls close(). Consequently,
                            // close() calls deregister() again - no need to fire channelUnregistered, so check
                            // if it was registered.
                            if (registered) {
                                // 如果通道之前是注册成功了,
                                // 这里才发送取消注册的 IO 事件
                                registered = false;
                                pipeline.fireChannelUnregistered();
                            }
                            // 取消绑定成功通知
                            safeSetSuccess(promise);
                        }
                    }
                });
            }
    

    重点就是调用AbstractChanneldoDeregister 方法,进行取消注册操作。
    如果 fireChannelInactive == true,将发送 ChannelInactive 事件。

    2.4 绑定 bind

           @Override
            public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
                assertEventLoop();
                // 检查通道是否仍然打开
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
    
                // See: https://github.com/netty/netty/issues/576
                if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                    localAddress instanceof InetSocketAddress &&
                    !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                    !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                    // Warn a user about the fact that a non-root user can't receive a
                    // broadcast packet on *nix if the socket is bound on non-wildcard address.
                    logger.warn(
                            "A non-root user can't receive a broadcast packet if the socket " +
                            "is not bound to a wildcard address; binding to a non-wildcard " +
                            "address (" + localAddress + ") anyway as requested.");
                }
    
                boolean wasActive = isActive();
                try {
                    // 调用 AbstractChannel 的 doBind 方法,进行绑定操作
                    doBind(localAddress);
                } catch (Throwable t) {
                    // 绑定失败的通知
                    safeSetFailure(promise, t);
    
                    // doBind(localAddress) 方法有可能关闭这个通道,
                    // 就可能需要进行关闭通道的通知
                    closeIfClosed();
                    return;
                }
    
               // 如果绑定操作后,通道从不活跃变成活跃,就要发送 ChannelActive 事件
                if (!wasActive && isActive()) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelActive();
                        }
                    });
                }
                // 绑定成功的通知
                safeSetSuccess(promise);
            }
    
    • 这个方法逻辑比较简单,重点就是调用AbstractChanneldoBind方法,进行绑定操作。
    • 如果绑定操作后,通道从不活跃变成活跃,就要发送 ChannelActive 事件。

    2.5 取消连接 disconnect

            @Override
            public final void disconnect(final ChannelPromise promise) {
                assertEventLoop();
                // 如果 promise不是 不可取消的,那么直接返回
                if (!promise.setUncancellable()) {
                    return;
                }
    
                boolean wasActive = isActive();
                try {
                    doDisconnect();
                    // 重置 remoteAddress and localAddress
                    remoteAddress = null;
                    localAddress = null;
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    // doDisconnect() 方法有可能关闭这个通道,
                    // 就可能需要进行关闭通道的通知
                    closeIfClosed();
                    return;
                }
    
                // 如果取消连接后,通道从活跃变成不活跃,就要发送 ChannelInactive 事件
                if (wasActive && !isActive()) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelInactive();
                        }
                    });
                }
    
                safeSetSuccess(promise);
                // doDisconnect() 方法有可能关闭这个通道,
                // 就可能需要进行关闭通道的通知
                closeIfClosed();
            }
    
    • 这个方法逻辑比较简单,重点就是调用AbstractChanneldoDisconnect()方法,进行取消连接操作。
    • 如果取消连接操作成功后,通道从活跃变成不活跃,就要发送 ChannelInactive 事件。

    2.6关闭 close

            public void close(final ChannelPromise promise) {
                assertEventLoop();
    
                ClosedChannelException closedChannelException =
                        StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
                close(promise, closedChannelException, closedChannelException, false);
            }
    
         private void close(final ChannelPromise promise, final Throwable cause,
                               final ClosedChannelException closeCause, final boolean notify) {
                // 如果 promise不是 不可取消的,那么直接返回
                if (!promise.setUncancellable()) {
                    return;
                }
    
                if (closeInitiated) {
                    // closeInitiated == true,已经调用过关闭操作了,就要return 返回了。
                    if (closeFuture.isDone()) {
                        // 已经通道已经关闭了,通知 promise
                        safeSetSuccess(promise);
                    } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
                        // 当前通道正在关闭,那么就添加一个监听器,当关闭成功后,再通知 promise
                        closeFuture.addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                promise.setSuccess();
                            }
                        });
                    }
                    return;
                }
    
                // 保证关闭方法只调用一次,不能重复调用
                closeInitiated = true;
    
                final boolean wasActive = isActive();
    
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                // 禁止再向写缓冲区 outboundBuffer 添加任何消息和刷新操作。
                this.outboundBuffer = null;
                Executor closeExecutor = prepareToClose();
                if (closeExecutor != null) {
                    closeExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // Execute the close.
                                doClose0(promise);
                            } finally {
                                // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                                invokeLater(new Runnable() {
                                    @Override
                                    public void run() {
                                        if (outboundBuffer != null) {
                                            // 使写缓冲区中所有排队的消息失败
                                            outboundBuffer.failFlushed(cause, notify);
                                             // 关闭写缓冲区
                                            outboundBuffer.close(closeCause);
                                        }
                                        fireChannelInactiveAndDeregister(wasActive);
                                    }
                                });
                            }
                        }
                    });
                } else {
                    try {
                        // Close the channel and fail the queued messages in all cases.
                        doClose0(promise);
                    } finally {
                        if (outboundBuffer != null) {
                             // 使写缓冲区中所有排队的消息失败
                            outboundBuffer.failFlushed(cause, notify);
                             // 关闭写缓冲区
                            outboundBuffer.close(closeCause);
                        }
                    }
    
                    if (inFlush0) {
                        // 如果正在刷新操作,那么就让 fireChannelInactiveAndDeregister 操作,
                        // 放到下一个事件轮询周期中处理
                        invokeLater(new Runnable() {
                            @Override
                            public void run() {
                                fireChannelInactiveAndDeregister(wasActive);
                            }
                        });
                    } else {
                        // 取消注册和可能发送 ChannelInactive 事件
                        fireChannelInactiveAndDeregister(wasActive);
                    }
                }
            }
    
            private void doClose0(ChannelPromise promise) {
                try {
                    doClose();
                    closeFuture.setClosed();
                    safeSetSuccess(promise);
                } catch (Throwable t) {
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    
            private void fireChannelInactiveAndDeregister(final boolean wasActive) {
                deregister(voidPromise(), wasActive && !isActive());
            }
    

    方法流程:

    1. 通过 closeInitiated 成员属性保证关闭方法只调用一次,不能重复调用。
    2. 因为关闭连接,需要考虑写缓冲区 ChannelOutboundBuffer 中的待写入数据的问题。
    3. 通过 prepareToClose() 方法,返回一个关闭通道的事件执行器。
      • 如果不为空,那么就在这个事件执行器中进行接下来的关闭操作。
      • 如果为空,那么就在当前线程进行接下来的关闭操作。
    4. 调用 doClose0(promise) 方法,进行关闭以及操作成功或失败的相关通知。
    5. 处理写缓冲区 outboundBuffer 中的数据,并关闭写缓冲区。
    6. 最后调用 fireChannelInactiveAndDeregister 方法,取消管道注册,以及可能会发送 ChannelInactive 事件。

      如果在 doClose() 方法之后,通道从活跃变成不活跃的情况下,才会发送 ChannelInactive 事件。

    2.7 shutdownOutput

           @UnstableApi
            public final void shutdownOutput(final ChannelPromise promise) {
                assertEventLoop();
                shutdownOutput(promise, null);
            }
    
            /**
             * 关闭相应通道的输出部分。
             * 例如,这将清理ChannelOutboundBuffer并不再允许任何写操作。
             */
            private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
                if (!promise.setUncancellable()) {
                    return;
                }
    
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    promise.setFailure(new ClosedChannelException());
                    return;
                }
                // 禁止再向写缓冲区 outboundBuffer 添加任何消息和刷新操作。
                this.outboundBuffer = null;
    
                final Throwable shutdownCause = cause == null ?
                        new ChannelOutputShutdownException("Channel output shutdown") :
                        new ChannelOutputShutdownException("Channel output shutdown", cause);
                Executor closeExecutor = prepareToClose();
                if (closeExecutor != null) {
                    closeExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // 调用 AbstractChannel 的 doShutdownOutput 方法,
                                // 进行 shutdown 操作
                                doShutdownOutput();
                                // 操作成功通知
                                promise.setSuccess();
                                // 操作失败通知
                            } catch (Throwable err) {
                                promise.setFailure(err);
                            } finally {
                                // Dispatch to the EventLoop
                                eventLoop().execute(new Runnable() {
                                    @Override
                                    public void run() {
                                        // 在 Shutdown 的时候,关闭写缓冲区 ChannelOutboundBuffer,
                                        // 并发送用户通知事件
                                        closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                                    }
                                });
                            }
                        }
                    });
                } else {
                    try {
                        // 调用 AbstractChannel 的 doShutdownOutput 方法,
                        // 进行 shutdown 操作
                        doShutdownOutput();
                        // 操作成功通知
                        promise.setSuccess();
                    } catch (Throwable err) {
                        // 操作失败通知
                        promise.setFailure(err);
                    } finally {
                        // 在 Shutdown 的时候,关闭写缓冲区 ChannelOutboundBuffer,
                        // 并发送用户通知事件
                        closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                    }
                }
            }
    
            /**
             * 在 Shutdown 的时候,关闭写缓冲区 ChannelOutboundBuffer,
             * 并发送用户通知事件
             */
            private void closeOutboundBufferForShutdown(
                    ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
    
                // 使写缓冲区中所有排队的消息失败
                buffer.failFlushed(cause, false);
                // 关闭写缓冲区
                buffer.close(cause, true);
                // 发送一个通道 Shutdown 的用户通知事件
                pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
            }
    

    shutdown 的方法流程和 close 很像,区别点:

    • shutdown 是调用AbstractChanneldoShutdownOutput() 方法进行相关操作,而 close 是调用AbstractChanneldoClose() 方法。
    • close 最后会取消注册,以及可能会发送ChannelInactive 事件。
    • shutdown 会发送一个 ChannelOutputShutdownEvent.INSTANCE 用户自定义的通知事件。

    2.8 强制关闭 closeForcibly

            @Override
            public final void closeForcibly() {
                assertEventLoop();
    
                try {
                    doClose();
                } catch (Exception e) {
                    logger.warn("Failed to close a channel.", e);
                }
            }
    

    你会发现只调用了AbstractChanneldoClose() 方法进行关闭操作,不触发任何事件,也不处理写缓冲区。只可能在某些特殊情况下调用,例如尝试注册失败的时候。

    2.9 开始读 beginRead

            @Override
            public final void beginRead() {
                assertEventLoop();
    
                try {
                    doBeginRead();
                } catch (final Exception e) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireExceptionCaught(e);
                        }
                    });
                    close(voidPromise());
                }
            }
    

    调用AbstractChanneldoBeginRead() 方法设置通道开始读取数据。

    2.10 写操作 write

            @Override
            public final void write(Object msg, ChannelPromise promise) {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    // 写缓冲区为 null,
                    try {
                        // 现在释放资源,以防止资源泄漏
                        ReferenceCountUtil.release(msg);
                    } finally {
                        //  如果outboundBuffer为空,我们就知道通道被关闭了,所以立即进行失败通知。
                        // See https://github.com/netty/netty/issues/2362
                        safeSetFailure(promise,
                                newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
                    }
                    return;
                }
    
                int size;
                try {
                    // 进行消息的转换,例如将堆缓冲区变成直接缓冲区
                    msg = filterOutboundMessage(msg);
                    // 估算数据的大小
                    size = pipeline.estimatorHandle().size(msg);
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable t) {
                    try {
                        // 失败时需要释放资源,以防止资源泄漏
                        ReferenceCountUtil.release(msg);
                    } finally {
                        // 进行操作失败的通知
                        safeSetFailure(promise, t);
                    }
                    return;
                }
    
                // 将数据添加到写缓冲区 outboundBuffer 中
                outboundBuffer.addMessage(msg, size, promise);
            }
    

    方法流程

    1. 先判断写缓冲区 outboundBuffer 是不是为 null,为空说明通道已关闭,进行失败通知。
    2. 通过 filterOutboundMessage(msg) 方法进行数据转换,例如将堆缓冲区变成直接缓冲区。
    3. 估算数据大小。
    4. 通过 outboundBuffer.addMessage(...) 方法,将数据添加到写缓冲区 outboundBuffer 中。
    5. 如果发送异常,记得释放数据 msg 的引用,防止内存泄露,并进行操作失败通知。

    2.11 刷新 flush

           @Override
            public final void flush() {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                // 写缓冲区为空,直接返回
                if (outboundBuffer == null) {
                    return;
                }
    
                // 将写缓冲区中的消息都标记成待刷新
                outboundBuffer.addFlush();
                // 进行刷新操作
                flush0();
            }
    
            @SuppressWarnings("deprecation")
            protected void flush0() {
                if (inFlush0) {
                    // 避免重复刷新
                    return;
                }
    
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                // 当前写缓冲区没有数据,那么直接返回
                if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                    return;
                }
    
                // 避免重复刷新
                inFlush0 = true;
    
                // 如果通道处于非活动状态,则将所有挂起的写请求标记为失败。
                if (!isActive()) {
                    try {
                        // Check if we need to generate the exception at all.
                        if (!outboundBuffer.isEmpty()) {
                            if (isOpen()) {
                                outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                            } else {
                                // Do not trigger channelWritabilityChanged because the channel is closed already.
                                outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                            }
                        }
                    } finally {
                        // 刷新操作完成,将 inFlush0重新设置为 false,以便下次刷新。
                        inFlush0 = false;
                    }
                    return;
                }
    
                try {
                    // 将给定缓冲区的内容刷新到远端
                    doWrite(outboundBuffer);
                } catch (Throwable t) {
                    handleWriteError(t);
                } finally {
                    // 刷新操作完成,将 inFlush0重新设置为 false,以便下次刷新。
                    inFlush0 = false;
                }
            }
    
    • 通过 inFlush0 成员属性,来避免重复刷新。
    • 如果通道处于非活动状态,则将所有挂起的写请求标记为失败。
    • 通过 AbstractChanneldoWrite(outboundBuffer) 方法,将缓冲区的内容刷新到远端。

    2.12 小结

    对比 Unsafe 的方法,你会发现 AbstractUnsafe 中没有实现 connect(...) 连接方法。

    对比发送入站IO事件:

    1. ChannelRegisteredChannelUnregistered

      • register 方法会发送 ChannelRegistered 事件。
      • deregister 方法只有在通道之前已经注册之后,才会发送 ChannelUnregistered 事件。
    2. ChannelActiveChannelInactive

      • 一般都是通道Channel从不活跃变成活跃,要发送 ChannelActive 事件;可能引起这个变化的操作有 bindconnect 操作。
      • 通道Channel从活跃变成不活跃,就要发送 ChannelInactive 事件;可能引起这个变化的操作有 disconnect,closeshutdown
      • 最后如果第一次注册时,且当前通道是活跃状态,也会发送 ChannelActive 事件。

    三. ChannelOutboundBuffer

    AbstractChannel.Unsafe 中看到用户调用write(...) 方法写的数据,会先添加到写缓冲区 ChannelOutboundBuffer 中,然后调用 flush() 方法,才将写缓冲区中的数据发送到远端。

    3.1 重要成员属性

        // 在链表结构中第一个被刷新的节点
        private Entry flushedEntry;
    
        // 在链表结构中第一个未刷新的节点
        private Entry unflushedEntry;
    
        // 表示链表中最后一个节点
        private Entry tailEntry;
        // 等待刷新节点的数量
        private int flushed;
    

    写缓冲区通过链表来储存数据(依靠 Entry.next 来实现链表),链表形式 Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)

    • flushedEntry 表示第一个被刷新的节点,在链表头,当然也是通过 addFlush() 方法设置的。
    • unflushedEntry 表示第一个未刷新的节点,表示还没有被标记刷新的第一个节点。
    • tailEntry 最后一个节点。
    • flushed 刷新节点的数量,这个属性很重要,靠它来标记刷新节点,也就是说从 flushedEntry 开始, flushed 数量的节点都被标记为刷新节点了。

    3.2 重要方法

    3.2.1 添加数据

    这个方法一般在 AbstractChannel.AbstractUnsafewrite(...) 方法中调用。

     /**
         * 将给定的消息 msg 添加到ChannelOutboundBuffer中。
         * 一旦消息写入,给定的ChannelPromise将被通知。
         */
        public void addMessage(Object msg, int size, ChannelPromise promise) {
            // 将给定消息封装成一个节点
            Entry entry = Entry.newInstance(msg, size, total(msg), promise);
            if (tailEntry == null) {
                flushedEntry = null;
            } else {
                // 将新消息节点添加到队列尾
                Entry tail = tailEntry;
                tail.next = entry;
            }
            tailEntry = entry;
            if (unflushedEntry == null) {
                // 如果未刷新节点为空,说明队列节点都变成刷新节点了,
                // 那么这个新添加的节点,就是未刷新节点的头了。
                unflushedEntry = entry;
            }
    
            // See https://github.com/netty/netty/issues/1619
            // 向未刷新的数组添加消息后,增加挂起的字节数。
            incrementPendingOutboundBytes(entry.pendingSize, false);
        }
    
    • 先将数据 msg 封装成一个节点 entry,并将节点添加到链表尾。
    • 如果 unflushedEntrynull,那么这个节点就是第一个未刷新节点。
    • incrementPendingOutboundBytes(...) 方法,增加挂起的字节数,看是否需要改变通道的 可写属性。

    3.2.2 标记刷新

    这个方法一般在 AbstractChannel.AbstractUnsafeflush() 方法中调用。

      /**
         * 向此ChannelOutboundBuffer添加刷新。
         * 这意味着所有以前添加的消息都被标记为刷新,因此您将能够处理它们。
         */
        public void addFlush() {
            // There is no need to process all entries if there was already a flush before and no new messages
            // where added in the meantime.
            //
            // See https://github.com/netty/netty/issues/2577
            // 未刷新节点后面的链表示新添加的节点列表,都是要加入到刷新中
            Entry entry = unflushedEntry;
            if (entry != null) {
                if (flushedEntry == null) {
                    // there is no flushedEntry yet, so start with the entry
                    flushedEntry = entry;
                }
                do {
                    flushed ++;
                    // 将所有要刷新的节点变成不可取消的
                    if (!entry.promise.setUncancellable()) {
                        // Was cancelled so make sure we free up memory and notify about the freed bytes
                        // 挂起消息被取消,所以确保我们释放内存并通知释放的字节
                        int pending = entry.cancel();
                        decrementPendingOutboundBytes(pending, false, true);
                    }
                    entry = entry.next;
                } while (entry != null);
    
                // 节点都变成已刷新的了,未刷新节点就设置为 null
                unflushedEntry = null;
            }
        }
    
    • 将从 unflushedEntry 未刷新节点开始到链表尾的所有节点都标记为刷新。通过 flushed++ 来增加刷新节点数量。
    • 调用 setUncancellable(...) 要写入的节点是不可取消的,如果设置失败,就要取消挂起数据,并调用 decrementPendingOutboundBytes(...) 减少挂起字节数,看是否需要改变通道的 可写属性。

    3.2.3 删除节点

    
        /**
         * 将删除当前消息,将其ChannelPromise标记为success并返回true。
         * 如果在调用此方法时不存在刷新的消息,则返回false,表示没有准备好处理的消息。
         */
        public boolean remove() {
            Entry e = flushedEntry;
            if (e == null) {
                clearNioBuffers();
                return false;
            }
            Object msg = e.msg;
    
            ChannelPromise promise = e.promise;
            int size = e.pendingSize;
    
            removeEntry(e);
    
            if (!e.cancelled) {
                // only release message, notify and decrement if it was not canceled before.
                ReferenceCountUtil.safeRelease(msg);
                safeSuccess(promise);
                decrementPendingOutboundBytes(size, false, true);
            }
    
            // recycle the entry
            e.recycle();
    
            return true;
        }
    
        private void removeEntry(Entry e) {
            if (-- flushed == 0) {
                // flushed == 0, 表示所有刷新节点都被处理了
                flushedEntry = null;
                if (e == tailEntry) {
                    tailEntry = null;
                    unflushedEntry = null;
                }
            } else {
                // 将下一个节点变成刷新节点
                flushedEntry = e.next;
            }
        }
    

    当缓存区当前刷新节点数据被写入到远端了,那么调用这个 remove() 方法,移除当前节点,得到下一个刷新节点。

    相关文章

      网友评论

          本文标题:Netty源码_AbstractChannel和ChannelO

          本文链接:https://www.haomeiwen.com/subject/tbfzaltx.html