美文网首页
我为 Netty 贡献源码 | 且看 Netty 如何应对 TC

我为 Netty 贡献源码 | 且看 Netty 如何应对 TC

作者: bin的技术小屋 | 来源:发表于2022-06-15 16:00 被阅读0次

    本系列Netty源码解析文章基于 4.1.56.Final版本

    我们接着上篇文章 我为 Netty 贡献源码 | 且看 Netty 如何应对 TCP 连接的正常关闭,异常关闭,半关闭场景(上) 继续讲解关于 TCP 连接半关闭的内容~~~

    5. TCP 连接半关闭 HalfClosure

    TCP 是一个全双工的传输层通信协议,那么我们在关闭 TCP 连接的时候就需要考虑读写这两个通道的关闭。

    image.png

    之前介绍的关闭流程是主动关闭方调用 close 方法也就是 JDK NIO 中 SocketChannel#Close 方法来发送 FIN 关闭连接。但是 close 方法是同时将读写两个通道全部关闭,也就是说主动关闭方在调用 close 方法以后既不能接收对端的数据也不能向对端发送数据了。

    close关闭但FIN_WAIT2接收数据 RST.png

    比如:主动关闭方调用 close 方法发出 FIN 开始关闭流程之后,如果在 FIN_WAIT2 状态下收到对端发送过来的数据,那么就会直接丢弃,并发送 RST 给对端强制关闭连接。

    那么有没有一种更优雅的关闭方式就是只关闭读写通道其中一个,关闭了写通道就不能发送数据给对端,但是还可以接受对端发送过来的数据。

    关闭了读通道,就不能读取对端发送过来的数据,但是还可以向对端写数据。当连接上遗留的数据全部处理完毕后,主动关闭方和被动关闭方在先后调用 close 方法关闭连接释放资源。

    这种更加优雅的关闭方式就是本小节我们要讨论的 TCP 连接的半关闭 HalfClosure 。

    操作系统内核为我们提供了 shutdown 这样一个系统调用来实现 TCP 连接的半关闭,shutdown 函数可以控制只关闭连接的某一个方向,或者全部关闭。

    int shutdown(int sockfd, int how)
    

    参数 sockfd 为将要关闭 Socket 的文件描述符,参数 how 表示关闭连接的哪个方向 ( 关闭读 or 关闭写 or 全部关闭 )。

    • SHUT_RD:表示关闭读通道,如果此时 Socket 接收缓冲区有已接收的数据,则会将这些数据统统丢弃。如果后续再收到新的数据,虽然也会对这些数据进行 ACK 确认,但是会悄悄丢弃掉。所以在这种情况下,对端虽然收到了 ACK 确认,但是这些以发送的数据可能已经被悄悄丢弃了。

    关闭读通道的方法在 JDK NIO 中对应于 SocketChannel#shutdownInput() ,这里需要注意的是此方法并不会发送 FIN。

    • SHUT_WR:关闭写通道,这就是本小节的重点,调用该方法发起 TCP 连接的半关闭流程。此时如果 Socket 发送缓冲区还有未发送的数据,则会立即发送出去,并发送一个 FIN 给对端。关闭写通道的方法在 JDK NIO 中对应于 SocketChannel#shutdownOutput()。

    • SHUTRDWR : 同时关闭连接读写两个通道。

    在介绍完了 TCP 连接半关闭的系统调用之后,我们接下来看下 TCP 连接半关闭的流程:

    TCP连接半关闭.png
    • 首先客户端会调用 shutdownOutput 方法发起半关闭流程,关闭客户端连接的写通道,然后发送 FIN 给服务端。

    • 和我们在《1. 正常 TCP 连接关闭》小节里介绍的流程一样,服务端的内核协议栈在接收到客户端发来的 FIN 后,会自动向客户端回复 ACK 确认,随后内核会将文件结束符 EOF 插入到 Socket 的接收缓冲区中,此时 OP_READ 事件活跃,再一次进入到 AbstractNioByteChannel.NioByteUnsafe#read 方法处理 OP_READ 事件,此时客户端的连接状态为 FIN_WAIT2 ,服务端的连接状态为 CLOSE_WAIT 。

    • 服务端在收到连接半关闭请求后,会立马调用 shutdownInput 关闭自己的读通道。随后在 pipeline 中触发 ChannelInputShutdownEvent 事件,用户可以在该事件中处理遗留的数据,处于 CLOSE_WAIT 状态的服务端可以继续向处于 FIN_WAIT2 状态的客户端继续发送数据。

    • 当 TCP 连接处于半关闭状态的时候,JDK NIO Selector 会不断的通知 OP_READ 事件活跃直到 TCP 连接真正的关闭,所以用户在处理完 ChannelInputShutdownEvent 事件之后,又会立马收到处理 OP_READ 事件的通知,在这次通知中触发 ChannelInputShutdownReadComplete 事件,表示遗留数据已经处理完毕,用户可以在这个事件响应中调用 close 来彻底关闭连接。 此后服务端结束 CLOSE_WAIT 状态进入 LAST_ACK 状态。

    • 客户端收到服务端发送过来的 FIN 后,调用 close 方法注销 Channel ,关闭连接。结束 FIN_WAIT2 状态进入 TIME_WAIT 状态。

    6. 主动关闭方发起 TCP 半关闭流程

    在 TCP 半关闭的场景下,主动关闭方需要调用 shutdownOutput 方法向被动关闭方发送 FIN 开始 TCP 半关闭流程。

    在本小节的示例中,客户端可以在自己的 ChannelHandler 中调用 Channel 的 shutdownOutput 方法来发起 TCP 半关闭流程。

            SocketChannel sc = (SocketChannel) ctx.channel();     
            sc.shutdownOutput();
    

    下面我们就来分析下在 netty 中对于 shutdownOutput 的实现。

    public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    
        @Override
        public ChannelFuture shutdownOutput() {
            return shutdownOutput(newPromise());
        }
    
        @Override
        public ChannelFuture shutdownOutput(final ChannelPromise promise) {
            final EventLoop loop = eventLoop();
            if (loop.inEventLoop()) {
                ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
                    }
                });
            }
            return promise;
        }
    
    }
    

    从如上代码中,我们可以看出对于 shutdownOutput 的操作也是必须在 Reactor 线程中完成。

    这里大家可以发现 shutdownOutput 半关闭的流程其实和 close 的流程非常的相似。

          private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
                if (!promise.setUncancellable()) {
                    return;
                }
    
                //如果Channel已经close了,直接返回
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    promise.setFailure(new ClosedChannelException());
                    return;
                }
                
                //半关闭状态下,不允许继续写入数据到Socket
                this.outboundBuffer = null; 
    
                final Throwable shutdownCause = cause == null ?
                        new ChannelOutputShutdownException("Channel output shutdown") :
                        new ChannelOutputShutdownException("Channel output shutdown", cause);
    
                Executor closeExecutor = prepareToClose();
                if (closeExecutor != null) {
                    closeExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {           
                                // 将jdk nio 底层的Socket shutdown
                                doShutdownOutput();
                                promise.setSuccess();
                            } catch (Throwable err) {
                                promise.setFailure(err);
                            } finally {
                                // Dispatch to the EventLoop
                                eventLoop().execute(new Runnable() {
                                    @Override
                                    public void run() {
                                        //清理ChannelOutboundBuffer,并触发ChannelOutputShutdownEvent事件
                                        closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                                    }
                                });
                            }
                        }
                    });
                } else {
                    try {
                        // 在 Reactor 线程中执行
                        doShutdownOutput();
                        promise.setSuccess();
                    } catch (Throwable err) {
                        promise.setFailure(err);
                    } finally {
                        closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                    }
                }
            }
    

    一开始都需要通过 ChannelOutboundBuffer 是否为 null 来判断当前 Channel 是否已经关闭了,如果已经关闭,则停止执行后续半关闭流程。

    当 shutdownOutput 方法调用之后,主动关闭方连接的写通道就被关闭了,所以在这个状态下是不允许用户继续向 Channel 写入数据的, 所以这里会将 Channel 对应的写入缓冲队列 ChannelOutboundBuffer 设置为 null 。

    和前边我们介绍调用 close 方法发起 TCP 连接的正常关闭流程一样,这里也会调用 prepareToClose() 方法来处理设置 SO_LINGER 选项的情况。

         @Override
            protected Executor prepareToClose() {
                try {
                    if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                        doDeregister();
                        return GlobalEventExecutor.INSTANCE;
                    }
                } catch (Throwable ignore) {
    
                }
                return null;
            }
    

    如果 Socket 设置了 SO_LINGER 选项则需要首先将 Channel 注销,后续的半关闭流程需要在 GlobalEventExecutor 线程中执行。否则继续在 Reactor 线程中执行。

    关于 prepareToClose() 方法的详细介绍,大家可以回看本文中的《 2.1.3 针对 SO_LINGER 选项的处理》小节

    接下来就会调用 doShutdownOutput() 方法关闭底层 JDK NIO SocketChannel 的写通道。此时内核协议栈会向对端发送 FIN 发起 TCP 半关闭流程。

    public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    
        protected final void doShutdownOutput() throws Exception {
            if (PlatformDependent.javaVersion() >= 7) {
                javaChannel().shutdownOutput();
            } else {
                javaChannel().socket().shutdownOutput();
            }
        }
    
    }
    
    TCP连接半关闭.png

    当半关闭流程发起之后,ShutdownOutput 的核心任务就算结束了,此时就需要设置用户持有的 shutdownOutputPromise 成功,随后用户就会得到通知。

    最后在 Reactor 线程中清理 ChannelOutboundBuffer 中的待发送数据,并在 pipeline 中传播 ChannelOutputShutdownEvent 事件。相关的清理细节笔者已经在本文前边相关的章节中详细介绍过了,这里不在重复。

        private void closeOutboundBufferForShutdown(
                    ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
                //shutdownOutput半关闭后需要清理channelOutboundBuffer中的待发送数据flushedEntry
                buffer.failFlushed(cause, false);
                //循环清理channelOutboundBuffer中的unflushedEntry
                buffer.close(cause, true);
                pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
            }
    

    ChannelOutputShutdownEvent 是一种 UserEventTriggered 事件,它是 netty 提供的一种事件扩展机制可以允许用户自定义异步事件,这样可以使得用户能够灵活的定义各种复杂场景的处理机制。

    UserEventTriggered 也是一种 Inbound 类事件,在 pipeline 中的传播反向也是从前向后传播。

    image.png

    我们可以在 ChannelHandler 中这样捕获 ChannelOutputShutdownEvent 写通道关闭事件:

    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (ChannelOutputShutdownEvent.INSTANCE == evt) {
                  .......处理写通道关闭事件.........
            }
        }
    
    }
    

    此时主动关闭方已经关闭了写通道,进入 FIN_WAIT2 状态。因为现在读通道还没有关闭,所以在 FIN_WAIT2 状态下还是可以继续接受并处理对端发来的数据的。

    理想很美好,现实却很骨感,在本小节中主动关闭方在 FIN_WAIT2 状态下真的可以接收来自对端的数据吗??

    大家先可以结合笔者在 《 2.1.3 针对 SO_LINGER 选项的处理》小节中介绍的内容以及本小节介绍的 TCP 写通道关闭流程,对照下面这副图认真思考下这个问题。

    TCP连接半关闭.png

    7. 啊哈!!Bug !!

    在为大家解释这个 Bug 之前,笔者先再次带大家回顾下本文《 2.1.3 针对 SO_LINGER 选项的处理》小节中 prepareToClose 方法的逻辑,它有两个关键点:

    • 当使用了 SO_LINGER 选项后,调用 Socket 的 close 方法会阻塞关闭流程,所以需要将 Socket 的关闭动作放在 GlobalEventExecutor 中执行。

    • 当使用了 SO_LINGER 选项后,为了防止在延迟关闭期间继续处理读写事件,产生不必要的 CPU 资源浪费,所以需要调用 doDeregister() 方法将 Channel 从 Reactor 中注销掉

         @Override
            protected Executor prepareToClose() {
                try {
                    if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                        doDeregister();
                        return GlobalEventExecutor.INSTANCE;
                    }
                } catch (Throwable ignore) {
    
                }
                return null;
            }
    

    这些逻辑在 close 的关闭场景中是合理的,但是在 shutdownOutput 半关闭场景就出问题了。

    假设用户在开启了 SO_LINGER 选项的情况下,调用 shutdownOutput 半关闭 TCP 连接,那么用户的本意是只关闭写通道,但是仍然希望在 FIN_WAIT2 状态下接收来自服务端发送过来的数据,实现优雅关闭。

    但实际上 netty 在 shutdownOutput 方法中调用了 prepareToClose() 方法从而间接导致了 doDeregister() 方法的调用,Channel 从 Reactor 中注销掉,也就是说从此以后无法在产生 OP_READ 活跃事件无法接收并且处理服务端发送过来的数据。

    由于以上原因,如下如图所示,主动关闭方在 FIN_WAIT2 状态下是无法接收到数据的,因为此时 Channel 已经从 Reactor 上注销了。

    SO_LINGERHalfClosureBug.png

    另外还有一点就是,无论 SO_LINGER 选项是否设置,shutdown 系统调用函数均不会阻塞,这里和 close 系统调用不同。所以这里也并不需要用一个 GlobalEventExecutor 去执行 shutdownOutput 任务,直接在 Reactor 线程中执行即可。

    所以综合以上两点原因,在 shutdownOutput 中是不需要调用 prepareToClose() 方法的。

    现在我们知道了 Bug 产生的原因,那么修复过程就变的非常简单了~~~

    8. 提交 PR ,修复 Bug

    笔者首先向 Netty 社区提交了一个 Issue,在 Issue 中详细为社区人员描述了这个 Bug 产生的原因。也就是上一小节中的内容。

    Issue : https://github.com/netty/netty/issues/11981

    image.png SO_LINGERHalfClosureBug.png

    随后笔者按照《7. 啊哈!!Bug !!》小节中介绍的修复思路为这个 Issue 提交了修复 PR ,

    PR :https://github.com/netty/netty/pull/11982

    PR.png

    笔者修复后的 ShutdownOutput 流程逻辑如下:

    image.png

    编写单元测试,然后信心满满地等待 PR 被 Merged。

    public class SocketHalfClosedTest extends AbstractSocketTest {
    
        @Test
        @Timeout(value = 5000, unit = MILLISECONDS)
        public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
            run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
                @Override
                public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
                    testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
                }
            });
        }
    
        private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
                throws Throwable {
            Channel serverChannel = null;
            Channel clientChannel = null;
    
            final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
            try {
                sb.childOption(ChannelOption.SO_LINGER, 1)
                  .childHandler(new ChannelInitializer<Channel>() {
    
                      @Override
                      protected void initChannel(Channel ch) throws Exception {
                          ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    
                                @Override
                                public void channelActive(final ChannelHandlerContext ctx) {
                                    SocketChannel channel = (SocketChannel) ctx.channel();
                                    channel.shutdownOutput();
                                }
    
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                    ReferenceCountUtil.release(msg);
                                    waitHalfClosureDone.countDown();
                                }
                            });
                      }
                  });
    
                cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
                  .handler(new ChannelInitializer<Channel>() {
                      @Override
                      protected void initChannel(Channel ch) throws Exception {
                          ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    
                                @Override
                                public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                                    if (ChannelInputShutdownEvent.INSTANCE == evt) {
                                        ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
                                    }
    
                                    if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
                                        ctx.close();
                                    }
                                }
                            });
                      }
                  });
    
                serverChannel = sb.bind().sync().channel();
                clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
                waitHalfClosureDone.await();
            } finally {
                if (clientChannel != null) {
                    clientChannel.close().sync();
                }
    
                if (serverChannel != null) {
                    serverChannel.close().sync();
                }
            }
        }
    }
    

    还是那句话 “理想很丰满,现实很骨感”,Netty 作为一个世界知名的高性能开源框架,必定有着非常严格的代码规范。比如:

    • 代码书写规范:函数与函数之间的空行个数,单行代码的长度,函数命名的长度, .... 等。

    • 注释的规范:单行注释的长度,字符与字符之间的空格,...... 等。

    • 单元测试规范。

    PR 提交过去也是出现了很多规范类的 CheckStyle 错误,也是经过了多轮 Review 和多轮修改最终通过了 Netty 的 CI 流程被 Merged 进主干分支。并在 Netty 的 4.1.73.Final 中发布。

    image.png image.png

    在 4.1.73.Final 版本发布之后,笔者第一时间拉下来最新的代码,看到 Git 记录中出现了自己的名字,想象着自己的代码跑在了各大知名框架中,还是很有成就感的一件事。

    文章封面.png

    9. 被动关闭方处理TCP半关闭流程

    TCP连接半关闭.png

    当主动关闭方调用 shutdownOutput 后,内核会检查此时 Socket 发送缓冲区是否还有数据,如果有就将数据发送出去,并关闭 Socket 的写通道,随后发送 FIN 给对端。

    接下来的流程和《1. 正常 TCP 连接关闭》小节中的流程一样,服务端 OP_READ 事件活跃,Reactor 线程开始处理 OP_READ 事件。

    public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    
            @Override
            public final void read() {
                final ChannelConfig config = config();
    
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
    
                ..........省略获取allocHandle过程.......
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        //记录本次读取了多少字节数
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        //如果本次没有读取到任何字节,则退出循环 进行下一轮事件轮询
                        // -1 表示客户端主动关闭了连接close或者shutdownOutput 这里均会返回-1
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            //当客户端主动关闭连接时(客户端发送fin1),会触发read就绪事件,这里从channel读取的数据会是-1
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    
                        .........省略.............
    
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        //此时客户端发送fin1(fi_wait_1状态)主动关闭连接,服务端接收到fin,并回复ack进入close_wait状态                    
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                     ............省略...............
                } finally {
                     ............省略...............
                }
            }
        }
    
    }
    

    这里通过 doReadBytes 方法从 Channel 中读取数据依然返回 -1 。随后又会进入 closeOnRead 方法处理半关闭逻辑。

    9.1 closeOnRead

            private void closeOnRead(ChannelPipeline pipeline) {    
                if (!isInputShutdown0()) {
                    if (isAllowHalfClosure(config())) {             
                        shutdownInput();
                        pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
                    } else {
                           .....省略正常关闭....
                    }
                } else {
                    .....省略....
                }
            }
    

    首先会调用 isInputShutdown0 方法判断服务端 Channel 的读通道是否已经关闭,现在客户端 Channel 的写通道已经关闭,但此时服务端才刚开始处理半关闭,所以现在服务端 Channel 读写通道都还没有关闭。

        @Override
        public boolean isInputShutdown() {
            return javaChannel().socket().isInputShutdown() || !isActive();
        }
    

    随后判断服务端是否支持半关闭 isAllowHalfClosure。

       private static boolean isAllowHalfClosure(ChannelConfig config) {
            return config instanceof SocketChannelConfig &&
                    ((SocketChannelConfig) config).isAllowHalfClosure();
        }
    

    可通过如下配置开启半关闭的支持:

        ServerBootstrap sb = new ServerBootstrap();
        sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, true)                        
    

    如果服务端开启了半关闭的支持 isAllowHalfClosure == true ,下面就正式进入了半关闭的处理流程:

    1. 调用 shutdownInput 方法关闭服务端 Channel 的读通道,如果此时 Socket 接收缓冲区还有数据,则会将这些数据统统丢弃。注意关闭读通道并不会向对端发送 FIN ,此时服务端连接依然处于 CLOSE_WAIT 状态。
        private void shutdownInput0() throws Exception {
            if (PlatformDependent.javaVersion() >= 7) {
                //调用底层JDK socketChannel关闭接收方向的通道
                javaChannel().shutdownInput();
            } else {
                javaChannel().socket().shutdownInput();
            }
        }
    
    1. 在 pipeline 中触发 ChannelInputShutdownEvent 事件,我们可以在 ChannelInputShutdownEvent 事件的回调方法中,向客户端发送遗留的数据,做到真正的优雅关闭。这里就是图中处于 CLOSE_WAIT 状态下的服务端在半关闭场景下可以继续向处于 FIN_WAIT2 状态下的客户端发送数据的地方
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (ChannelInputShutdownEvent.INSTANCE == evt) {
                //在close_wait状态下,发送数据给对端
                ctx.writeAndFlush(message);
            }
        }
    
    }
    

    在连接半关闭的情况下,JDK NIO Selector 会不停的通知 OP_READ 事件活跃,所以 read loop 会一直不停的执行,当 Reactor 处理完 ChannelInputShutdownEvent 之后,由于 Selector 又会通知 OP_READ 事件活跃,所以半关闭流程再一次来到了 closeOnRead 方法。

            //表示Input已经shutdown了,再次对channel进行读取返回-1  设置该标志
            private boolean inputClosedSeenErrorOnRead;
    
            private void closeOnRead(ChannelPipeline pipeline) {    
                if (!isInputShutdown0()) {
                    if (isAllowHalfClosure(config())) {             
                           .....省略半关闭.....
                    } else {
                           .....省略正常关闭....
                    }
                } else {
                    inputClosedSeenErrorOnRead = true;
                    pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
                }
            }
    

    那么此时服务端的读通道已经关闭了 isInputShutdown0 == true 。所以流程来到 else 分支。

    • 设置 inputClosedSeenErrorOnRead = true 表示此时 Channel 的读通道已经关闭了,不能再继续响应 OP_READ 事件,因为半关闭状态下,Selector 会不停的通知 OP_READ 事件,如果不停无脑响应的话,会造成极大的 CPU 资源的浪费。

    不过 JDK 这样处理也是合理的,毕竟半关闭状态连接并没有完全关闭,只要连接没有完全关闭,就不停的通知你,直到关闭连接为止。

    • 在 pipeline 中触发 ChannelInputShutdownReadComplete 事件,此事件的触发标志着服务端在 CLOSE_WAIT 状态下已经将所有遗留的数据发送给了客户端,服务端可以在该事件的回调中关闭 Channel ,结束 CLOSE_WAIT 进入 LAST_ACK 状态。
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (ChannelInputShutdownReadComplete.INSTANCE == evt) {      
                ctx.close();
            }
        }
    

    因为半关闭的状态下,在没有调用 close 方法关闭 Channel 之前,JDK NIO Selector 会一直不停的通知 OP_READ 事件,所以流程马上又会回到 OP_READ 事件的处理方法中。

    public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    
            @Override
            public final void read() {
                final ChannelConfig config = config();
    
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
    
                ..........省略获取allocHandle过程.......
    
                try {
                    do {
                              .........省略.............
    
                    } while (allocHandle.continueReading());
    
                   .........省略.............
                } catch (Throwable t) {
                     ............省略...............
                } finally {
                     ............省略...............
                }
            }
        }
    
    }
    

    那么这次我们就不能在响应 OP_READ 事件了,需要调用 clearReadPending 方法将读事件从 Reactor 中取消掉,停止对 OP_READ 事件的监听。否则 Reactor 线程就会在半关闭期间内一直在这里空转,导致 CPU 100%。

    这里的 shouldBreakReadReady 方法就是用来判断在半关闭期间是否取消 OP_READ 事件的监听。这里的 inputClosedSeenErrorOnRead 已经设置为 true 了。

       final boolean shouldBreakReadReady(ChannelConfig config) {
            return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
        }
    

    到这里为止,netty 关于连接关闭所要面对的所有处理场景,笔者就为大家一一介绍完了。


    总结

    本文我们介绍了 netty 在面对 TCP 连接关闭时的三种处理场景时的处理逻辑和过程。

    这三种处理场景分别是:TCP 连接的正常关闭,TCP 连接的异常关闭,以及用于优雅关闭的 TCP 连接的半关闭。同时我们也发现了 netty 关于半关闭处理时的一个 BUG 。

    BUG :https://github.com/netty/netty/issues/11981

    这个 Bug 导致主动关闭方在 FIN_WAIT2 状态下无法接受到来自被动关闭方在 CLOSE_WAIT 状态下发送的数据。随后又详细分析了这个 Bug 的整个修复过程。

    其中我们还穿插介绍了 SO_LINGER 选项对于 TCP 连接关闭行为的影响,以及 netty 针对 SO_LINGER 选项的处理过程。

    同时笔者还为大家列举了关于导致 TCP 连接异常关闭的 7 种场景:

    1. 半连接队列 SYN-Queue 已满

    2. 全连接队列 ACCEPT-Queue 已满

    3. 连接未被监听的端口

    4. 服务端程序崩溃

    5. 开启 SO_LINGER 选项设置 l_linger = 0

    6. 主动关闭方在关闭时 Socket 接收缓冲区还有未处理数据

    7. 主动关闭方 close 关闭但在 FIN_WAIT2 状态接收数据

    以及 Netty 对 RST 包的处理流程。最后笔者还介绍了用于连接半关闭的系统调用 shutdown 的使用方法,以及 netty 对连接半关闭的流程处理逻辑。

    其中笔者还详细对比了 shutdown 系统调用和 close 系统调用的区别与联系。它们在调用之后都会向对端发送 FIN 包。但是在设置 SO_LINGER 选项的时候 close 系统调用会阻塞,shutdown 系统调用则不会阻塞。

    TCP连接半关闭.png

    最后笔者需要特别强调的是在我们使用 shutdown 进行 TCP 连接的半关闭时,作为连接的被动关闭方,在最后一定要记得调用 close 方法来彻底关闭连接,并释放连接相关资源。否则被动关闭方就会一直停留在 CLOSE_WAIT 状态。

    而作为主动关闭方在 FIN_WAIT2 状态下接收到来自被动关闭方在 CLOSE_WAIT 状态下发送的 FIN 之后,记得要释放客户端的资源。

    好了,本文的内容就到这里,感谢大家收看到这里,我们下篇文章见~~~

    相关文章

      网友评论

          本文标题:我为 Netty 贡献源码 | 且看 Netty 如何应对 TC

          本文链接:https://www.haomeiwen.com/subject/hatfvrtx.html