美文网首页
Netty Pipeline源码分析(2)

Netty Pipeline源码分析(2)

作者: wangwei_hz | 来源:发表于2018-11-01 17:47 被阅读13次

    原文:https://wangwei.one/posts/netty-pipeline-source-analyse-2.html

    前面 ,我们分析了Netty Pipeline的初始化及节点添加与删除逻辑。接下来,我们将来分析Pipeline的事件传播机制。

    Netty版本:4.1.30

    inBound事件传播

    示例

    我们通过下面这个例子来演示Netty Pipeline的事件传播机制。

    public class NettyPipelineInboundExample {
    
        public static void main(String[] args) {
            EventLoopGroup group = new NioEventLoopGroup(1);
            ServerBootstrap strap = new ServerBootstrap();
            strap.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(8888))
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new InboundHandlerA());
                            ch.pipeline().addLast(new InboundHandlerB());
                            ch.pipeline().addLast(new InboundHandlerC());
                        }
                    });
            try {
                ChannelFuture future = strap.bind().sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    class InboundHandlerA extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("InboundHandler A : " + msg);
            // 传播read事件到下一个channelhandler
            ctx.fireChannelRead(msg);
        }
    
    }
    
    class InboundHandlerB extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("InboundHandler B : " + msg);
            // 传播read事件到下一个channelhandler
            ctx.fireChannelRead(msg);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // channel激活,触发channelRead事件,从pipeline的heandContext节点开始往下传播
            ctx.channel().pipeline().fireChannelRead("Hello world");
        }
    }
    
    class InboundHandlerC extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("InboundHandler C : " + msg);
            // 传播read事件到下一个channelhandler
            ctx.fireChannelRead(msg);
        }
    }
    
    

    源码

    通过 telnet 来连接上面启动好的netty服务,触发channel active事件:

    $ telnet 127.0.0.1 8888
    

    按照InboundHandlerA、InboundHandlerB、InboundHandlerC的添加顺序,控制台输出如下信息:

    InboundHandler A : Hello world
    InboundHandler B : Hello world
    InboundHandler C : Hello world
    

    若是调用它们的添加顺序,则会输出对应顺序的信息,e.g:

    ...
    
    ch.pipeline().addLast(new InboundHandlerB());
    ch.pipeline().addLast(new InboundHandlerA());
    ch.pipeline().addLast(new InboundHandlerC());
    
    ...
    

    输出如下信息:

    InboundHandler B : Hello world
    InboundHandler A : Hello world
    InboundHandler C : Hello world
    

    源码分析

    强烈建议 下面的流程,自己通过IDE的Debug模式来分析

    待netty启动成功,通过telnet连接到netty,然后通过telnet终端输入任意字符(这一步才开启Debug模式),进入Debug模式。

    触发channel read事件,从下面的入口开始调用

    public class DefaultChannelPipeline implements ChannelPipeline {
        
        ...
        
        // 出发channel read事件
        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            // 从head节点开始往下传播read事件
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
        
        ...
    
    }
    

    调用 AbstractChannelHandlerContext 中的 invokeChannelRead(head, msg) 接口:

    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
            implements ChannelHandlerContext, ResourceLeakHint {
    
        ...
    
        // 调用channel read
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            // 获取消息
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            // 获取 EventExecutor
            EventExecutor executor = next.executor();
            // true
            if (executor.inEventLoop()) {
                // 调用下面的invokeChannelRead接口:invokeChannelRead(Object msg)
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
        
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    // handler():获取当前遍历到的channelHandler,第一个为HeandContext,最后为TailContext
                    // 调用channel handler的channelRead接口
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
        
        ...
        
        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            // 调回到上面的 invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }
        
        ...
        
        // 遍历出下一个ChannelHandler
        private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                //获取下一个inbound类型的节点
                ctx = ctx.next;
                // 必须为inbound类型
            } while (!ctx.inbound);
            return ctx;
        }
        
        ...
    }    
    
    

    Pipeline中的第一个节点为HeadContext,它对于channelRead事件的处理,是直接往下传播,代码如下:

    final class HeadContext extends AbstractChannelHandlerContext
        
        ...
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // HeadContext往下传播channelRead事件,
            // 调用HeandlerContext中的接口:fireChannelRead(final Object msg)
            ctx.fireChannelRead(msg);
        }
        
        ...
    }
    

    就这样一直循环下去,依次会调用到 InboundHandlerA、InboundHandlerB、InboundHandlerC 中的 channelRead(ChannelHandlerContext ctx, Object msg) 接口。

    到最后一个TailContext节点,它对channelRead事件的处理如下:

    public class DefaultChannelPipeline implements ChannelPipeline {
            
        final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
            
            ...
                    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                // 调用onUnhandledInboundMessage接口
                onUnhandledInboundMessage(msg);
            }      
    
            ...
    
        }
        
        ...
        
        // 对未处理inbound消息做最后的处理
        protected void onUnhandledInboundMessage(Object msg) {
            try {
                logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
            } finally {
                // 对msg对象的引用数减1,当msg对象的引用数为0时,释放该对象的内存
                ReferenceCountUtil.release(msg);
            }
        }
        
        ...
       
    }
    

    以上就是pipeline对inBound消息的处理流程。

    SimpleChannelInboundHandler

    在前面的例子中,假如中间有一个ChannelHandler未对channelRead事件进行传播,就会导致消息对象无法得到释放,最终导致内存泄露。

    我们还可以继承 SimpleChannelInboundHandler 来自定义ChannelHandler,它的channelRead方法,对消息对象做了msg处理,防止内存泄露。

    public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
        ...
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            boolean release = true;
            try {
                if (acceptInboundMessage(msg)) {
                    @SuppressWarnings("unchecked")
                    I imsg = (I) msg;
                    channelRead0(ctx, imsg);
                } else {
                    release = false;
                    ctx.fireChannelRead(msg);
                }
            } finally {
                if (autoRelease && release) {
                    // 对msg对象的引用数减1,当msg对象的引用数为0时,释放该对象的内存
                    ReferenceCountUtil.release(msg);
                }
            }
        }
        
        ...
        
    }    
    

    outBound事件传播

    接下来,我们来分析Pipeline的outBound事件传播机制。代码示例如下:

    示例

    public class NettyPipelineOutboundExample {
    
        public static void main(String[] args) {
            EventLoopGroup group = new NioEventLoopGroup(1);
            ServerBootstrap strap = new ServerBootstrap();
            strap.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(8888))
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new OutboundHandlerA());
                            ch.pipeline().addLast(new OutboundHandlerB());
                            ch.pipeline().addLast(new OutboundHandlerC());
                        }
                    });
            try {
                ChannelFuture future = strap.bind().sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // 输出消息
            System.out.println("OutboundHandlerA: " + msg);
            // 传播write事件到下一个节点
            ctx.write(msg, promise);
        }
    }
    
    class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // 输出消息
            System.out.println("OutboundHandlerB: " + msg);
            // 传播write事件到下一个节点
            ctx.write(msg, promise);
        }
        
        
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            // 待handlerAdded事件触发3s后,模拟触发一个
            ctx.executor().schedule(() -> {
    //            ctx.write("Hello world ! ");
                ctx.channel().write("Hello world ! ");
            }, 3, TimeUnit.SECONDS);
        }
    }
    
    class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // 输出消息
            System.out.println("OutboundHandlerC: " + msg);
            // 传播write事件到下一个节点
            ctx.write(msg, promise);
        }
    }
    

    源码

    通过 telnet 来连接上面启动好的netty服务,触发channel added事件:

    $ telnet 127.0.0.1 8888
    

    按照OutboundHandlerA、OutboundHandlerB、OutboundHandlerC的添加顺序,控制台输出如下信息:

    OutboundHandlerC: Hello world ! 
    OutboundHandlerB: Hello world ! 
    OutboundHandlerA: Hello world ! 
    

    输出的顺序正好与ChannelHandler的添加顺序相反。

    若是调用它们的添加顺序,则会输出对应顺序的信息,e.g:

    ...
    
    ch.pipeline().addLast(new InboundHandlerB());
    ch.pipeline().addLast(new InboundHandlerA());
    ch.pipeline().addLast(new InboundHandlerC());
    
    ...
    

    输出如下信息:

    OutboundHandlerC: Hello world ! 
    OutboundHandlerA: Hello world ! 
    OutboundHandlerB: Hello world ! 
    

    源码分析

    强烈建议 下面的流程,自己通过IDE的Debug模式来分析

    从channel的write方法开始,往下传播write事件:

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    
        ...
    
        @Override
        public ChannelFuture write(Object msg) {
            // 调用pipeline往下传播wirte事件
            return pipeline.write(msg);
        }
        
        ...
    
    }
    

    接着来看看Pipeline中的write接口:

    public class DefaultChannelPipeline implements ChannelPipeline {
        
        ...
        
        @Override
        public final ChannelFuture write(Object msg) {
            // 从tail节点开始传播
            return tail.write(msg);
        }
        
        ...    
    
    }
    

    调用ChannelHandlerContext中的write接口:

    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
            implements ChannelHandlerContext, ResourceLeakHint {
            
        ...
        
        @Override
        public ChannelFuture write(Object msg) {
            // 往下调用write接口
            return write(msg, newPromise());
        }
    
        @Override
        public ChannelFuture write(final Object msg, final ChannelPromise promise) {
            if (msg == null) {
                throw new NullPointerException("msg");
            }
    
            try {
                if (isNotValidPromise(promise, true)) {
                    ReferenceCountUtil.release(msg);
                    // cancelled
                    return promise;
                }
            } catch (RuntimeException e) {
                ReferenceCountUtil.release(msg);
                throw e;
            }
            // 往下调用write接口
            write(msg, false, promise);
    
            return promise;
        }    
         
        ...
        
        private void write(Object msg, boolean flush, ChannelPromise promise) {
            // 寻找下一个outbound类型的channelHandlerContext
            AbstractChannelHandlerContext next = findContextOutbound();
            final Object m = pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    // 调用接口 invokeWrite(Object msg, ChannelPromise promise)
                    next.invokeWrite(m, promise);
                }
            } else {
                AbstractWriteTask task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, m, promise);
                }  else {
                    task = WriteTask.newInstance(next, m, promise);
                }
                safeExecute(executor, task, promise, m);
            }
        }
        
        // 寻找下一个outbound类型的channelHandlerContext
        private AbstractChannelHandlerContext findContextOutbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
        
        private void invokeWrite(Object msg, ChannelPromise promise) {
            if (invokeHandler()) {
                // 继续往下调用
                invokeWrite0(msg, promise);
            } else {
                write(msg, promise);
            }
        }
        
        private void invokeWrite0(Object msg, ChannelPromise promise) {
            try {
                // 获取当前的channelHandler,调用其write接口
                // handler()依次会返回 OutboundHandlerC OutboundHandlerB OutboundHandlerA
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }
        
        ...    
            
    }        
    

    最终会调用到HeadContext的write接口:

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 调用unsafe进行写数据操作
        unsafe.write(msg, promise);
    }
    

    异常传播

    了解了Pipeline的入站与出站事件的机制之后,我们再来看看Pipeline的异常处理机制。

    示例

    public class NettyPipelineExceptionCaughtExample {
    
        public static void main(String[] args) {
            EventLoopGroup group = new NioEventLoopGroup(1);
            ServerBootstrap strap = new ServerBootstrap();
            strap.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(8888))
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new InboundHandlerA());
                            ch.pipeline().addLast(new InboundHandlerB());
                            ch.pipeline().addLast(new InboundHandlerC());
                            ch.pipeline().addLast(new OutboundHandlerA());
                            ch.pipeline().addLast(new OutboundHandlerB());
                            ch.pipeline().addLast(new OutboundHandlerC());
                        }
                    });
            try {
                ChannelFuture future = strap.bind().sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        static class InboundHandlerA extends ChannelInboundHandlerAdapter {
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("InboundHandlerA.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }
    
        static class InboundHandlerB extends ChannelInboundHandlerAdapter {
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                throw new Exception("ERROR !!!");
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("InboundHandlerB.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }
    
        static class InboundHandlerC extends ChannelInboundHandlerAdapter {
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("InboundHandlerC.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }
    
    
        static class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("OutboundHandlerA.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
    
        }
    
        static class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("OutboundHandlerB.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }
    
        static class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println("OutboundHandlerC.exceptionCaught:" + cause.getMessage());
                ctx.fireExceptionCaught(cause);
            }
        }
    
    }
    
    

    源码

    通过 telnet 来连接上面启动好的netty服务,并在控制台发送任意字符:

    $ telnet 127.0.0.1 8888
    

    触发channel read事件并抛出异常,控制台输出如下信息:

    InboundHandlerB.exceptionCaught:ERROR !!!
    InboundHandlerC.exceptionCaught:ERROR !!!
    OutboundHandlerA.exceptionCaught:ERROR !!!
    OutboundHandlerB.exceptionCaught:ERROR !!!
    OutboundHandlerC.exceptionCaught:ERROR !!!
    

    可以看到异常的捕获与我们添加的ChannelHandler顺序相同。

    源码分析

    在我们的示例中,InboundHandlerB的ChannelRead接口抛出异常,导致从InboundHandlerA将ChannelRead事件传播到InboundHandlerB的过程中出现异常,异常被捕获。

    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
            implements ChannelHandlerContext, ResourceLeakHint {
        
        ...
        
        @Override
        public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
            //调用invokeExceptionCaught接口
            invokeExceptionCaught(next, cause);
            return this;
        }
    
        static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
            ObjectUtil.checkNotNull(cause, "cause");
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                // 调用下一个节点的invokeExceptionCaught接口
                next.invokeExceptionCaught(cause);
            } else {
                try {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            next.invokeExceptionCaught(cause);
                        }
                    });
                } catch (Throwable t) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Failed to submit an exceptionCaught() event.", t);
                        logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                    }
                }
            }
        }
        
        ...
        
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    // 抛出异常
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    // 异常捕获,往下传播
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
        
        // 通知Handler发生异常事件
        private void notifyHandlerException(Throwable cause) {
            if (inExceptionCaught(cause)) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                        "An exception was thrown by a user handler " +
                        "while handling an exceptionCaught event", cause);
                }
                return;
            }
            // 往下调用invokeExceptionCaught接口
            invokeExceptionCaught(cause);
        }
    
        
        private void invokeExceptionCaught(final Throwable cause) {
            if (invokeHandler()) {
                try {
                    // 调用当前ChannelHandler的exceptionCaught接口
                    // 在我们的案例中,依次会调用InboundHandlerB、InboundHandlerC、
                    // OutboundHandlerA、OutboundHandlerB、OutboundHandlC
                    handler().exceptionCaught(this, cause);
                } catch (Throwable error) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "An exception {}" +
                            "was thrown by a user handler's exceptionCaught() " +
                            "method while handling the following exception:",
                            ThrowableUtil.stackTraceToString(error), cause);
                    } else if (logger.isWarnEnabled()) {
                        logger.warn(
                            "An exception '{}' [enable DEBUG level for full stacktrace] " +
                            "was thrown by a user handler's exceptionCaught() " +
                            "method while handling the following exception:", error, cause);
                    }
                }
            } else {
                fireExceptionCaught(cause);
            }
        }
        
        ...
            
    }
    

    最终会调用到TailContext节点的exceptionCaught接口,如果我们中途没有对异常进行拦截处理,做会打印出一段警告信息!

    public class DefaultChannelPipeline implements ChannelPipeline {
    
        ...
        
        final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
            
            ...
                
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                onUnhandledInboundException(cause);
            }
            
            ...
            
            protected void onUnhandledInboundException(Throwable cause) {
                try {
                    logger.warn(
                        "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                        "It usually means the last handler in the pipeline did not handle the exception.",
                        cause);
                } finally {
                    ReferenceCountUtil.release(cause);
                }
            }
        }    
        
        ...
        
    }
    
    

    在实际的应用中,一般会定一个ChannelHandler,放置Pipeline末尾,专门用来处理中途出现的各种异常。

    最佳异常处理实践

    单独定义ExceptionCaughtHandler来处理异常:

    
    ...
       
    class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (cause instanceof Exception) {
                // TODO
                System.out.println("Successfully caught exception ! ");
            } else {
                // TODO
            }
        }
    }
    
    ...
        
    ch.pipeline().addLast(new ExceptionCaughtHandler());
    
    ...
    

    输出:

    InboundHandlerB.exceptionCaught:ERROR !!!
    InboundHandlerC.exceptionCaught:ERROR !!!
    OutboundHandlerA.exceptionCaught:ERROR !!!
    OutboundHandlerB.exceptionCaught:ERROR !!!
    OutboundHandlerC.exceptionCaught:ERROR !!!
    Successfully caught exception !  // 成功捕获日志
    

    Pipeline回顾与总结

    至此,我们对Pipeline的原理的解析就完成了。

    • Pipeline是在什么时候创建的?
    • Pipeline添加与删除节点的逻辑是怎么样的?
    • netty是如何判断ChannelHandler类型的?
    • 如何处理ChannelHandler中抛出的异常?
    • 对于ChannelHandler的添加应遵循什么样的顺序?

    参考资料

    相关文章

      网友评论

          本文标题:Netty Pipeline源码分析(2)

          本文链接:https://www.haomeiwen.com/subject/npnxxqtx.html