美文网首页Java 杂谈java
第十五节 netty源码分析之客户端源码分析02

第十五节 netty源码分析之客户端源码分析02

作者: 勃列日涅夫 | 来源:发表于2019-01-28 16:32 被阅读0次

    handler调用

    我们已经知道ChannelInitializer的添加过程其实,是会创建一个DefaultChannelHandlerContext然后添加到ChannelHandlerContext双线链表tail的前面
    而我们自定义的handler就在ChannelInitializer的initChannel方法中,那么initChannel何时调用将我们的handler添加到链表中( p.addLast),下面我们开始分析
    自定义 ChannelHandler 的添加过程, 发生在 AbstractUnsafe.register0 中, 在这个方法中调用了 pipeline.fireChannelRegistered()

    • 首先继续分析之前AbstractBootstrap的initAndRegister() 方法中的方法 init(channel); init的实现再子类bootstrip中
    @Override
        @SuppressWarnings("unchecked")
        void init(Channel channel) throws Exception {
            ChannelPipeline p = channel.pipeline();
            // config使用bootstrap.handler(),就是最初ChannelInitializer,可参考上面分析
            p.addLast(config.handler());
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                //设置channel类型
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
            }
        }
    

    继续查看addLast方法

     @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
            //检查是否重复添加
                checkMultiplicity(handler);
            //创建DefaultChannelHandlerContext对象包装handler,注意构造参数中
                newCtx = newContext(group, filterName(name, handler), handler);
            //将生成的newCtx插入handlercontex链表中
                addLast0(newCtx);
    
                // If the registered is false it means that the channel was not registered on an eventloop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {
                    newCtx.setAddPending();
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerAdded0(newCtx);
                        }
                    });
                    return this;
                }
            }
            callHandlerAdded0(newCtx);
            return this;
        }
    

    执行完后pipeline中已包含handler ChannelInitializer

    接着根据initAndRegister方法中的ChannelFuture regFuture = config().group().register(channel);//注册
    根据上一节的分析:register—>SingleThreadEventLoop#register->AbstractChannel #register->AbstractChannel #register0->AbstractChannel #register0#pipeline.fireChannelRegistered();
    fireChannelRegistered方法源码

     @Override
        public final ChannelPipeline fireChannelRegistered() {
            AbstractChannelHandlerContext.invokeChannelRegistered(head);
            return this;
        }
    

    附上代码

     static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
            // 由上面的方法此处调用head.invokeChannelRegistered() 方法
                next.invokeChannelRegistered();
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRegistered();
                    }
                });
            }
        }
    

    查看invokeChannelRegistered方法

     private void invokeChannelRegistered() {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRegistered(this);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRegistered();
            }
        }
    

    上面的方法handler()其实是fireChannelRegistered方法入参head,在head.fireChannelRegistered 其实是调用的 AbstractChannelHandlerContext.fireChannelRegistered
    在head中channelRegistered方法中

     @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                invokeHandlerAddedIfNeeded();
                ctx.fireChannelRegistered();
            }
    
    

    channelRegistered

      @Override
       public ChannelHandlerContext fireChannelRegistered() {
           invokeChannelRegistered(findContextInbound());
           return this;
       }
    

    findContextInbound方法:

    private AbstractChannelHandlerContext findContextInbound() {
      AbstractChannelHandlerContext ctx = this;
      do {
          ctx = ctx.next;
      } while (!ctx.inbound);
      return ctx;
    }
    

    从 head 开始遍历 Pipeline 的双向链表, 然后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例. 接着掉ChannelInitializer 的channelRegistered方法

       public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
            // the handler.
            if (initChannel(ctx)) {
                // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
                // miss an event.
                ctx.pipeline().fireChannelRegistered();
    
                // We are done with init the Channel, removing all the state for the Channel now.
                removeState(ctx);
            } else {
                // Called initChannel(...) before which is the expected behavior, so just forward the event.
                ctx.fireChannelRegistered();
            }
        }
    
    和之前分析一致,initChannel  它就是我们在初始化 Bootstrap 时, 调用 handler 方法传入的匿名内部类所实现的方法:
    
       xxx.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            p.addLast(new EchoClientHandler());
        }
    });
    
    因此当调用了这个方法后, 我们自定义的 ChannelHandler 就插入到 Pipeline 了, 接着removeState(ctx);移除ChannelInitializer这个handler。自定义handler就添加到了pipeline中

    相关文章

      网友评论

        本文标题:第十五节 netty源码分析之客户端源码分析02

        本文链接:https://www.haomeiwen.com/subject/wnthjqtx.html