Netty Source Code Analysis (18): Pipeline Initialization

Author: Jorgezhong | Published: 2019-02-27 10:07

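    This section begins the study of the pipeline source code. Recall from the earlier chapters that every channel, whether server-side or client-side, creates its pipeline when it is initialized. The entry point for pipeline initialization is therefore the channel creation process, in the AbstractChannel constructor.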

        protected AbstractChannel(Channel parent) {
            // parent is the server channel that created this client channel (the server channel itself is created via reflection during server startup)
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            // Initialize the pipeline
            pipeline = newChannelPipeline();
        }
    

    During initialization the newly created channel is passed in and a DefaultChannelPipeline is instantiated.

        protected DefaultChannelPipeline newChannelPipeline() {
            // this is the channel
            return new DefaultChannelPipeline(this);
        }
    

    Instantiating DefaultChannelPipeline involves three main steps:

    1. Save the channel
    2. Create the head node (HeadContext) and the tail node (TailContext)
    3. Link them into a doubly linked list
        protected DefaultChannelPipeline(Channel channel) {
            // Save the channel
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            // Create the head node and the tail node
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            // Build the doubly linked list
            head.next = tail;
            tail.prev = head;
        }
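
The three steps above can be pictured with a small model. The sketch below is not Netty code: `MiniPipeline`, `Node`, and this `addLast` are simplified, hypothetical stand-ins that assume only what the constructor shows (two sentinel nodes linked to each other). It illustrates why starting with head and tail already linked turns a later insertion into a fixed set of pointer updates with no null checks.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a pipeline's linked list (not Netty's classes).
final class MiniPipeline {
    // A node plays the role of a context: it knows its neighbours and its name.
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    MiniPipeline() {
        // Same shape as the constructor above: head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    // Insert a new node just before the tail sentinel.
    MiniPipeline addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walk from head to tail and collect the node names in order.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }
}
```

For example, `new MiniPipeline().addLast("decoder").addLast("handler").names()` lists head, decoder, handler, tail in that order: user nodes always sit between the two sentinels.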
    
    • The data node: ChannelHandlerContext
      It extends AttributeMap to store attributes, and extends ChannelInboundInvoker and ChannelOutboundInvoker so that it can propagate events.
    public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
        // Component accessors: channel, executor (eventLoop), handler, pipeline, and the context itself
        /**
         * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
         */
        Channel channel();
    
        /**
         * Returns the {@link EventExecutor} which is used to execute an arbitrary task.
         */
        EventExecutor executor();
    
        /**
         * The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
         * was added to the {@link ChannelPipeline}. This name can also be used to access the registered
         * {@link ChannelHandler} from the {@link ChannelPipeline}.
         */
        String name();
    
        /**
         * The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}.
         */
        ChannelHandler handler();
    
        /**
         * Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed
         * from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
         * {@link EventLoop}.
         */
        boolean isRemoved();
    
        // Event propagation methods
    
        @Override
        ChannelHandlerContext fireChannelRegistered();
    
        @Override
        ChannelHandlerContext fireChannelUnregistered();
    
        @Override
        ChannelHandlerContext fireChannelActive();
    
        @Override
        ChannelHandlerContext fireChannelInactive();
    
        @Override
        ChannelHandlerContext fireExceptionCaught(Throwable cause);
    
        @Override
        ChannelHandlerContext fireUserEventTriggered(Object evt);
    
        @Override
        ChannelHandlerContext fireChannelRead(Object msg);
    
        @Override
        ChannelHandlerContext fireChannelReadComplete();
    
        @Override
        ChannelHandlerContext fireChannelWritabilityChanged();
    
        @Override
        ChannelHandlerContext read();
    
        @Override
        ChannelHandlerContext flush();
    
        /**
         * Return the assigned {@link ChannelPipeline}
         */
        ChannelPipeline pipeline();
        
        // Memory allocation
        /**
         * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
         */
        ByteBufAllocator alloc();
        
        // Attribute storage methods
        /**
         * @deprecated Use {@link Channel#attr(AttributeKey)}
         */
        @Deprecated
        @Override
        <T> Attribute<T> attr(AttributeKey<T> key);
    
        /**
         * @deprecated Use {@link Channel#hasAttr(AttributeKey)}
         */
        @Deprecated
        @Override
        <T> boolean hasAttr(AttributeKey<T> key);
    }
    

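    This data node has an abstract implementation, AbstractChannelHandlerContext, which has several important member variables. Its constructor initializes the components below, except for the linked-list pointers.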

        // Used to implement the linked-list structure
        volatile AbstractChannelHandlerContext next;
        volatile AbstractChannelHandlerContext prev;
        // Mark the handler type
        private final boolean inbound;
        private final boolean outbound;
        // Hold the key components
        private final DefaultChannelPipeline pipeline;
        final EventExecutor executor;
    
        AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                      boolean inbound, boolean outbound) {
            this.name = ObjectUtil.checkNotNull(name, "name");
            this.pipeline = pipeline;
            this.executor = executor;
            this.inbound = inbound;
            this.outbound = outbound;
            // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
            ordered = executor == null || executor instanceof OrderedEventExecutor;
        }
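
One way to see why the inbound and outbound flags matter: when an event propagates, the pipeline has to skip nodes that cannot handle that direction. The following is a minimal sketch under that assumption. `Ctx`, `link`, `findNextInbound`, and `findPrevOutbound` are hypothetical names, not Netty's own, and the lookup loops are only a plausible way such flags can be used.

```java
// Hypothetical stand-in for a context node; only the fields relevant here.
final class Ctx {
    final String name;
    final boolean inbound;
    final boolean outbound;
    Ctx prev;
    Ctx next;

    Ctx(String name, boolean inbound, boolean outbound) {
        this.name = name;
        this.inbound = inbound;
        this.outbound = outbound;
    }

    // Link the given nodes into a doubly linked list, in order.
    static void link(Ctx... nodes) {
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
            nodes[i + 1].prev = nodes[i];
        }
    }

    // Inbound events travel forward: find the next node flagged inbound.
    Ctx findNextInbound() {
        Ctx ctx = this;
        do {
            ctx = ctx.next;
        } while (ctx != null && !ctx.inbound);
        return ctx;
    }

    // Outbound operations travel backward: find the previous node flagged outbound.
    Ctx findPrevOutbound() {
        Ctx ctx = this;
        do {
            ctx = ctx.prev;
        } while (ctx != null && !ctx.outbound);
        return ctx;
    }
}
```

With the list head(in, out), encoder(out), decoder(in), tail(in), an inbound event leaving head skips the encoder and lands on the decoder, while an outbound operation starting at tail skips the decoder and lands on the encoder.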
    
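    • The tail node: DefaultChannelPipeline.TailContext
      TailContext is a ChannelInboundHandler, so it can handle inbound events, most of which are read-related. Unusually, it is also a data node itself (an AbstractChannelHandlerContext). Its handler methods are either empty or delegate to the pipeline's hooks whose names start with onUnhandled.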
        final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    
            TailContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, TAIL_NAME, true, false);
                setAddComplete();
            }
    
            @Override
            public ChannelHandler handler() {
                return this;
            }
    
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) { }
    
            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) { }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) {
                onUnhandledInboundChannelActive();
            }
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                onUnhandledInboundChannelInactive();
            }
    
            @Override
            public void channelWritabilityChanged(ChannelHandlerContext ctx) {
                onUnhandledChannelWritabilityChanged();
            }
    
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) { }
    
            @Override
            public void handlerRemoved(ChannelHandlerContext ctx) { }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                onUnhandledInboundUserEventTriggered(evt);
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                onUnhandledInboundException(cause);
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                onUnhandledInboundMessage(msg);
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) {
                onUnhandledInboundChannelReadComplete();
            }
        }
    

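    Following the implementations of exceptionCaught and channelRead shows what happens to events nobody handled. If a msg arrives (a read event triggered channelRead) and no earlier InboundHandler consumes it, it reaches TailContext#channelRead(), which logs it at debug level and releases it.
    exceptionCaught works the same way, except that the unhandled exception is logged at warn level.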

        protected void onUnhandledInboundException(Throwable cause) {
            try {
                logger.warn(
                        "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                                "It usually means the last handler in the pipeline did not handle the exception.",
                        cause);
            } finally {
                ReferenceCountUtil.release(cause);
            }
        }
    
        protected void onUnhandledInboundMessage(Object msg) {
            try {
                logger.debug(
                        "Discarded inbound message {} that reached at the tail of the pipeline. " +
                                "Please check your pipeline configuration.", msg);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
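
The reason for the try/finally shape is that a reference-counted message nobody consumed would otherwise never be released. The sketch below models that idea only. `TailModel`, `CountedMessage`, and `tailChannelRead` are hypothetical names, and the counting logic is a simplified assumption rather than Netty's `ReferenceCountUtil`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the tail's "log, then release" behaviour (not Netty code).
final class TailModel {
    // Minimal reference-counted message: starts at 1, freed when it reaches 0.
    static final class CountedMessage {
        private int refCnt = 1;
        final String payload;

        CountedMessage(String payload) {
            this.payload = payload;
        }

        int refCnt() {
            return refCnt;
        }

        // Returns true when this call dropped the count to zero.
        boolean release() {
            if (refCnt <= 0) {
                throw new IllegalStateException("already released");
            }
            return --refCnt == 0;
        }
    }

    // Collected log lines, standing in for logger.debug(...).
    final List<String> log = new ArrayList<>();

    // What the end of the pipeline does with a message nobody handled.
    void tailChannelRead(CountedMessage msg) {
        try {
            log.add("Discarded inbound message " + msg.payload);
        } finally {
            // Always release, even if logging throws, so the message cannot leak.
            msg.release();
        }
    }
}
```

In this model, a message that reaches `tailChannelRead` with a count of 1 leaves it with a count of 0, which is the property the tail node exists to guarantee.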
    
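    • The head node: DefaultChannelPipeline.HeadContext
      This node is both a ChannelOutboundHandler and a ChannelInboundHandler, and it is also a data node (an AbstractChannelHandlerContext).
      It holds unsafe, which performs the actual I/O operations such as read, write, connect, and bind.
      Recall the earlier chapter on handling read events of a client channel: those calls all end up in the methods here.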
        final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    
            private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, true, true);
                unsafe = pipeline.channel().unsafe();
                setAddComplete();
            }
    
            @Override
            public ChannelHandler handler() {
                return this;
            }
    
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) {
                // NOOP
            }
    
            @Override
            public void handlerRemoved(ChannelHandlerContext ctx) {
                // NOOP
            }
    
            @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
                unsafe.bind(localAddress, promise);
            }
    
            @Override
            public void connect(
                    ChannelHandlerContext ctx,
                    SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) {
                unsafe.connect(remoteAddress, localAddress, promise);
            }
    
            @Override
            public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
                unsafe.disconnect(promise);
            }
    
            @Override
            public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
                unsafe.close(promise);
            }
    
            @Override
            public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
                unsafe.deregister(promise);
            }
    
            @Override
            public void read(ChannelHandlerContext ctx) {
                unsafe.beginRead();
            }
    
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                unsafe.write(msg, promise);
            }
    
            @Override
            public void flush(ChannelHandlerContext ctx) {
                unsafe.flush();
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                ctx.fireExceptionCaught(cause);
            }
    
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) {
                invokeHandlerAddedIfNeeded();
                ctx.fireChannelRegistered();
            }
    
            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) {
                ctx.fireChannelUnregistered();
    
                // Remove all handlers sequentially if channel is closed and unregistered.
                if (!channel.isOpen()) {
                    destroy();
                }
            }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) {
                ctx.fireChannelActive();
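                // If autoRead is enabled, call channel.read(); the call ends in unsafe.beginRead(),
                // which registers read interest with the Selector so that the event loop polls for it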
                readIfIsAutoRead();
            }
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                ctx.fireChannelInactive();
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ctx.fireChannelRead(msg);
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) {
                ctx.fireChannelReadComplete();
    
                readIfIsAutoRead();
            }
    
            private void readIfIsAutoRead() {
                if (channel.config().isAutoRead()) {
                    channel.read();
                }
            }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                ctx.fireUserEventTriggered(evt);
            }
    
            @Override
            public void channelWritabilityChanged(ChannelHandlerContext ctx) {
                ctx.fireChannelWritabilityChanged();
            }
        }
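
The outbound methods of HeadContext are thin: each one hands the operation to unsafe. A minimal sketch of that delegation follows, with a hypothetical `Transport` interface standing in for `Unsafe` and a `HeadModel` class standing in for `HeadContext`. The names and signatures are illustrative assumptions, and promises are left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "head delegates outbound operations to the transport".
final class HeadModel {
    // Stand-in for the low-level I/O object the head holds (unsafe in the article).
    interface Transport {
        void write(Object msg);
        void flush();
    }

    // Records calls instead of touching a socket, so the flow can be observed.
    static final class RecordingTransport implements Transport {
        final List<String> calls = new ArrayList<>();

        @Override
        public void write(Object msg) {
            calls.add("write:" + msg);
        }

        @Override
        public void flush() {
            calls.add("flush");
        }
    }

    private final Transport transport;

    HeadModel(Transport transport) {
        this.transport = transport;
    }

    // Outbound operations end here: no further node, so hand off to the transport.
    void write(Object msg) {
        transport.write(msg);
    }

    void flush() {
        transport.flush();
    }

    // Convenience combining both steps, in the order a caller would issue them.
    void writeAndFlush(Object msg) {
        write(msg);
        flush();
    }
}
```

Swapping `RecordingTransport` for a real I/O implementation would not change `HeadModel` at all, which mirrors why the head node can stay generic while unsafe carries the transport-specific work.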
    
    
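    Summary: the pipeline maintains a doubly linked list whose nodes are ChannelHandlerContext data nodes. Each data node exposes a handler() method, so each node wraps a concrete handler. The head node (HeadContext) created at initialization is a data node and also both an inbound and an outbound handler, while the tail node (TailContext) is a data node and also an inbound handler.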
