美文网首页javaJava 杂谈
第十八节 netty源码分析之 pipleline和handl

第十八节 netty源码分析之 pipleline和handl

作者: 勃列日涅夫 | 来源:发表于2019-02-02 10:25 被阅读2次

    补充InBond事件

    通过在上一篇分析outbound事件已经两者的关系,在来分析inbound就比较简单了,作为outbound的镜像事件它是怎么运行的呢?
    Inbound 的特点是它传播方向是 head -> customContext -> tail
    方法中的代码片段中我们只分析了doConnect这个方法,而且也已经知道doConnect方法是最后是通过java nio中socket与服务端建立链接,那么再想想,链接成功后呢,是不是要通知客户端
    已经链接成功了呢,所以这里返回true表示链接成功,然后通过fulfillConnectPromise传递链接成功的事件,再结合官网的pipeline流转图我们推测这里应该是将连接成功事件绑定到inbound
    上进行传播,那么是不是这样的呢?我们结合代码来

    AbstractNioChannel中connect方法片段

        //doConnect这里的实现在子类NioSocketChannel中
                    if (doConnect(remoteAddress, localAddress)) {
                    //fulfillConnectPromise方法再链接后通知事件
                        fulfillConnectPromise(promise, wasActive);
                    } else {
    //略
                    }
    

    下面是AbstractNioChannel的fulfillConnectPromise具体如下,

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
                if (promise == null) {
                    // Closed via cancellation and the promise has been notified already.
                    return;
                }
    
                // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
                // We still need to ensure we call fireChannelActive() in this case.
                boolean active = isActive();
    
                // trySuccess() will return false if a user cancelled the connection attempt.
                boolean promiseSet = promise.trySuccess();
    
                // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
                // because what happened is what happened.
                //翻译九四无论连接尝试是否被取消,都应触发channelActive()事件,因为发生的事情就是发生了什么。
                if (!wasActive && active) {
                    pipeline().fireChannelActive();
                }
    
                // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
                if (!promiseSet) {
                    close(voidPromise());
                }
            }
    

    其中关键就在pipeline().fireChannelActive();这个方法中,

    继续fireChannelActive实现,根据前面的文章分析,再创建channel的时候会创建一个DefaultChannelPipeline,所以这里的fireChannelActive就在DefaultChannelPipeline中

    @Override
        public final ChannelPipeline fireChannelActive() {
            AbstractChannelHandlerContext.invokeChannelActive(head);
            return this;
        }
    
    • 注意到invokeChannelActive的入参为head,所以从侧面也印证了inbound事件从head开始
      继续查看源码
     static void invokeChannelActive(final AbstractChannelHandlerContext next) {
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelActive();
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelActive();
                    }
                });
            }
        }
    

    这个方法很熟悉有木有,和之前分析outbound一样,不过这里的next就是head了。且invokeChannelActive的实现在AbstractChannelHandlerContext中

    private void invokeChannelActive() {
        //判断是否已添加haddler
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelActive(this);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelActive();
            }
        }
    
    • 这里就和之前outbound分析相同了,唯一不同就是获取的handler()方法获取返回的是head

    那么我们分析head的channelActive(其实else中的fireChannelActive也会涉及),我们从HeadContext找到该方法

      @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelActive();
    
                readIfIsAutoRead();
     }
    
    • 看到这ctx.fireChannelActive();这里fireChannelActive();和上面else里的是同一个均属于AbstractChannelHandlerContext类下(.channelActive(this)中的this入参就是AbstractChannelHandlerContext本身)
      所以分析AbstractChannelHandlerContext方法下的fireChannelActive就可以了
      @Override
        public ChannelHandlerContext fireChannelActive() {
            invokeChannelActive(findContextInbound());
            return this;
        }
    
    • 看到invokeChannelActive(findContextInbound());这个方法和之前outbound类似,不过这里区别还是有的,findContextInbound找到一个可用的inbound handler.这里就是
     private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    

    继续追踪会看执行会从.channelActive(this)->HeadContext的channelActive(ChannelHandlerContext ctx)->AbstractChannelHandlerContext的ChannelHandlerContext fireChannelActive()
    ->>AbstractChannelHandlerContext的invokeChannelActive(final AbstractChannelHandlerContext next)->再次回到AbstractChannelHandlerContext的 invokeChannelActive()
    总而言之就是inbound事件过来从headContext(inboud和outbound接口均实现)沿着inboud流向自定义inboundHandler依次执行channelActive通知。最后到TailContext(实现的是一个空方法channelActive)。

    所以我们写个inbound的handler并重写channelActive。当和服务端建立链接成功后,写入数据到服务端。下面的代码是不是就很清楚了

    public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    
        private final ByteBuf firstMessage;
    
        /**
         * Creates a client-side handler.
         */
        public EchoClientHandler() {
            firstMessage = Unpooled.buffer(EchoClient.SIZE);
            for (int i = 0; i < firstMessage.capacity(); i ++) {
                firstMessage.writeByte((byte) i);
            }
        }
    //当和服务端建立链接成功后,写入数据到pipeline(包含这ChannelHandlerContext的链表)
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.writeAndFlush(firstMessage);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.write(msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
           ctx.flush();
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // Close the connection when an exception is raised.
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    • 如果你怀疑这个writeAndFlush是否传递的是outbound事件,其实可以追踪源码,会找到AbstractChannelHandlerContext next = findContextOutbound();最后是nio的write

    就此其他的事件同理可自行分析
    最后借用网友的总结两种事件:

    对于 Outbound事件:
    
        Outbound 事件是请求事件(由 Connect 发起一个请求, 并最终由 unsafe 处理这个请求)
    
        Outbound 事件的发起者是 Channel
    
        Outbound 事件的处理者是 unsafe
    
        Outbound 事件在 Pipeline 中的传输方向是 tail -> head.
    
        在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Hnalder, 则需要调用 ctx.xxx (例如 ctx.connect) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.
    
        Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT
    
    对于 Inbound 事件:
    
        Inbound 事件是通知事件, 当某件事情已经就绪后, 通知上层.
    
        Inbound 事件发起者是 unsafe
    
        Inbound 事件的处理者是 Channel, 如果用户没有实现自定义的处理方法, 那么Inbound 事件默认的处理者是 TailContext, 并且其处理方法是空实现.
    
        Inbound 事件在 Pipeline 中传输方向是 head -> tail
    
        在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Hnalder, 则需要调用 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.
    
        Inbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT
    

    相关文章

      网友评论

        本文标题:第十八节 netty源码分析之 pipleline和handl

        本文链接:https://www.haomeiwen.com/subject/ritjjqtx.html