Netty Source Code Analysis (23): Exception Propagation

Author: Jorgezhong | Published: 2019-03-01 14:57

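    The previous section studied how outbound events propagate: in the reverse of the order in which the ChannelOutboundHandlers were added. When an outbound propagation method is called on the pipeline or the channel, the event starts at TailContext and travels toward HeadContext, where the head node finally uses unsafe to carry out the actual operation.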

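    This section looks at exception propagation. The pipeline is given inbound and outbound handlers in the order iA, iB, iC, oA, oB, oC, each overriding the exceptionCaught callback. InboundHandlerB throws a custom exception from channelRead, and we trace where it goes.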

    class DataServerInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
    
            ch.pipeline().addLast(
                    new InboundHandlerA(),
                    new InboundHandlerB(),
                    new InboundHandlerC(),
                    new OutboundHandlerA(),
                    new OutboundHandlerB(),
                    new OutboundHandlerC()
            );
        }
    }
    
    
    class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("OutboundHandlerA.exceptionCaught()");
            ctx.fireExceptionCaught(cause);
        }
    }
    
    class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("OutboundHandlerB.exceptionCaught()");
            ctx.fireExceptionCaught(cause);
        }
    }
    
    class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("OutboundHandlerC.exceptionCaught()");
            ctx.fireExceptionCaught(cause);
        }
    }
    
    
    class InboundHandlerA extends ChannelInboundHandlerAdapter {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("InboundHandlerA.exceptionCaught()");
            ctx.fireExceptionCaught(cause);
        }
    }
    
    class InboundHandlerB extends ChannelInboundHandlerAdapter {
    
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            throw new BusinessException("from InboundHandlerB");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("InboundHandlerB.exceptionCaught()");
            ctx.fireExceptionCaught(cause);
        }
    }
    
    class InboundHandlerC extends ChannelInboundHandlerAdapter {
    
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("InboundHandlerC.exceptionCaught()");
            ctx.fireExceptionCaught(cause);
        }
    
    }
    

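    Judging from the printed output, exceptions appear to propagate in the order in which the handlers were added, regardless of whether a handler is inbound or outbound.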


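    • Setting a breakpoint and tracing where the exception is caught shows that it passes through notifyHandlerException and invokeExceptionCaught, and that handler().exceptionCaught(this, cause); finally invokes the user's exception callback, leaving the handling to the user.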
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    // Get the handler for this context and call its channelRead method
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    // Trigger exception handling
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    
    
        private void notifyHandlerException(Throwable cause) {
            if (inExceptionCaught(cause)) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "An exception was thrown by a user handler " +
                                    "while handling an exceptionCaught event", cause);
                }
                return;
            }
            // Handle the exception
            invokeExceptionCaught(cause);
        }
    
        private void invokeExceptionCaught(final Throwable cause) {
            if (invokeHandler()) {
                try {
                    // Invoke the handler's exceptionCaught callback
                    handler().exceptionCaught(this, cause);
                } catch (Throwable error) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "An exception {}" +
                            "was thrown by a user handler's exceptionCaught() " +
                            "method while handling the following exception:",
                            ThrowableUtil.stackTraceToString(error), cause);
                    } else if (logger.isWarnEnabled()) {
                        logger.warn(
                            "An exception '{}' [enable DEBUG level for full stacktrace] " +
                            "was thrown by a user handler's exceptionCaught() " +
                            "method while handling the following exception:", error, cause);
                    }
                }
            } else {
                fireExceptionCaught(cause);
            }
        }
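
The guard at the top of notifyHandlerException is there so that an exception thrown from inside a user's exceptionCaught callback is only logged, rather than being fed back into exceptionCaught again. The body of inExceptionCaught is not shown above. The sketch below assumes it works by scanning the stack traces of the cause chain for a frame named exceptionCaught; ReentryGuard and its helper methods are made-up names for illustration, not Netty code.

```java
// Sketch of a re-entry check; the class and helper names are hypothetical.
final class ReentryGuard {

    /** True if the cause, or any nested cause, has a stack frame named exceptionCaught. */
    static boolean inExceptionCaught(Throwable cause) {
        for (Throwable t = cause; t != null; t = t.getCause()) {
            for (StackTraceElement frame : t.getStackTrace()) {
                if ("exceptionCaught".equals(frame.getMethodName())) {
                    return true;
                }
            }
        }
        return false;
    }

    // Hypothetical user callback that itself fails while handling an exception.
    static void exceptionCaught(Throwable original) {
        throw new IllegalStateException("handler failed", original);
    }

    /** Runs the failing callback and returns what it threw. */
    static Throwable failureFromCallback() {
        try {
            exceptionCaught(new RuntimeException("original"));
            throw new AssertionError("unreachable");
        } catch (IllegalStateException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        // Exception created inside a method named exceptionCaught
        System.out.println(inExceptionCaught(failureFromCallback()));
        // Exception created elsewhere
        System.out.println(inExceptionCaught(new RuntimeException("plain")));
    }
}
```

Under that assumption, a failure raised inside the callback is recognized and only logged, while an exception raised anywhere else goes on to invokeExceptionCaught as usual.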
    
    
    • Next, trace ctx.fireExceptionCaught(cause);, the method used to pass an exception on. It directly invokes the node after the current one; from there the process is the same as above.
        @Override
        public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
            // Use the next node in the linked list directly
            invokeExceptionCaught(next, cause);
            return this;
        }
    
        static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
            ObjectUtil.checkNotNull(cause, "cause");
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                // Run the exception callback
                next.invokeExceptionCaught(cause);
            } else {
                try {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            next.invokeExceptionCaught(cause);
                        }
                    });
                } catch (Throwable t) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Failed to submit an exceptionCaught() event.", t);
                        logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                    }
                }
            }
        }
    

    Note: ctx.pipeline().fireExceptionCaught(cause); starts propagation from the head node.

        ctx.pipeline().fireExceptionCaught(cause);
    
        @Override
        public final ChannelPipeline fireExceptionCaught(Throwable cause) {
            // Start propagating from the head node
            AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
            return this;
        }
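
The two entry points can be compared with a small plain-Java model. This is only a sketch: MiniPipeline and Node are hypothetical stand-ins for the pipeline and its contexts, the event loop dispatch is left out, and the head node is assumed to forward the cause to its successor.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a handler pipeline; NOT Netty's real classes.
final class MiniPipeline {

    /** One node of the chain; only the `next` link matters for exceptions here. */
    static final class Node {
        final String name;
        final boolean forwards; // does exceptionCaught pass the cause on?
        Node next;

        Node(String name, boolean forwards) {
            this.name = name;
            this.forwards = forwards;
        }

        // Same shape as ctx.fireExceptionCaught(cause): hand the cause to `next`.
        void fireExceptionCaught(Throwable cause, List<String> visited) {
            if (next != null) {
                next.exceptionCaught(cause, visited);
            }
        }

        // Stand-in for the user callback: record the visit, then maybe forward.
        void exceptionCaught(Throwable cause, List<String> visited) {
            visited.add(name);
            if (forwards) {
                fireExceptionCaught(cause, visited);
            }
        }
    }

    final Node head = new Node("head", true); // assumption: head forwards
    private Node last = head;

    Node addLast(String name, boolean forwards) {
        Node node = new Node(name, forwards);
        last.next = node;
        last = node;
        return node;
    }

    /** Builds head -> iA iB iC oA oB oC -> tail and returns the iB node. */
    static Node build(MiniPipeline p) {
        p.addLast("iA", true);
        Node iB = p.addLast("iB", true);
        for (String n : new String[] {"iC", "oA", "oB", "oC"}) {
            p.addLast(n, true);
        }
        p.addLast("tail", false); // the tail only logs; it never forwards
        return iB;
    }

    /** A handler failed in iB: its own exceptionCaught runs first. */
    static List<String> fromHandlerB() {
        List<String> visited = new ArrayList<>();
        build(new MiniPipeline()).exceptionCaught(new RuntimeException("from iB"), visited);
        return visited;
    }

    /** Counterpart of pipeline.fireExceptionCaught(cause): start at the head. */
    static List<String> fromHead() {
        MiniPipeline p = new MiniPipeline();
        build(p);
        List<String> visited = new ArrayList<>();
        p.head.exceptionCaught(new RuntimeException("from pipeline"), visited);
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(fromHandlerB()); // [iB, iC, oA, oB, oC, tail]
        System.out.println(fromHead());     // [head, iA, iB, iC, oA, oB, oC, tail]
    }
}
```

In this model an exception raised in iB never visits iA, whereas starting from the head visits every node in insertion order.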
    
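    • If the exception keeps being passed along, it eventually reaches the tail node, which logs a warning and then releases the cause through ReferenceCountUtil.release(cause).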
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            onUnhandledInboundException(cause);
        }
    
    
        protected void onUnhandledInboundException(Throwable cause) {
            try {
                logger.warn(
                        "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                                "It usually means the last handler in the pipeline did not handle the exception.",
                        cause);
            } finally {
                ReferenceCountUtil.release(cause);
            }
        }
    
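    • Exception handling advice
      Given how exceptions propagate, add one exception handler at the end of the pipeline to handle all exceptions in one place; once it has handled an exception, it does not pass it any further.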
    class DataServerInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
    
            ch.pipeline().addLast(
                    new InboundHandlerA(),
                    new InboundHandlerB(),
                    new InboundHandlerC(),
                    new OutboundHandlerA(),
                    new OutboundHandlerB(),
                    new OutboundHandlerC(),
                    new ExceptionCaughtHandler()
            );
            
        }
    }
    
    
    class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter{
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (cause instanceof BusinessException) {
                System.out.println("is BusinessException");
            }
        }
    }
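
The effect of this advice can be illustrated without Netty. The following is a minimal sketch in which every name (CatchAllDemo, ExceptionHandler, the local BusinessException) is hypothetical: a handler returns true to pass the cause on and false to stop it, and the cause only reaches the tail when nothing stops it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the advice; every name here is hypothetical, not Netty API.
final class CatchAllDemo {

    /** Hypothetical stand-in for the article's BusinessException. */
    static final class BusinessException extends RuntimeException {
        BusinessException(String message) {
            super(message);
        }
    }

    /** A handler returns true to pass the cause on, false to stop it. */
    interface ExceptionHandler {
        boolean onException(Throwable cause, List<String> log);
    }

    // Forwards everything, like a callback that calls ctx.fireExceptionCaught(cause).
    static final ExceptionHandler FORWARDING = (cause, log) -> true;

    // Catch-all placed last: records what it saw and does not forward.
    static final ExceptionHandler CATCH_ALL = (cause, log) -> {
        log.add(cause instanceof BusinessException ? "business" : "unexpected");
        return false;
    };

    /** Walks the handlers in order; logs "tail" if nothing stopped the cause. */
    static List<String> propagate(List<ExceptionHandler> handlers, Throwable cause) {
        List<String> log = new ArrayList<>();
        for (ExceptionHandler h : handlers) {
            if (!h.onException(cause, log)) {
                return log;
            }
        }
        log.add("tail");
        return log;
    }

    public static void main(String[] args) {
        Throwable cause = new BusinessException("from InboundHandlerB");
        // Without a catch-all the cause falls through to the tail.
        System.out.println(propagate(Arrays.asList(FORWARDING, FORWARDING), cause));
        // With the catch-all last, the tail is never reached.
        System.out.println(propagate(Arrays.asList(FORWARDING, FORWARDING, CATCH_ALL), cause));
    }
}
```

One design point the sketch makes explicit: a catch-all that stops propagation should probably do something for exceptions that are not business exceptions as well, since otherwise they disappear without even the tail node's warning.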
    
