美文网首页
Netty源码六 Pipeline

Netty源码六 Pipeline

作者: 横渡 | 来源:发表于2019-08-01 08:07 被阅读0次

    1. pipeline概述

    pipeline可以说是Netty的大动脉,主要负责读写事件传播。我们首先提出以下问题:

    1. netty是如何判断channelHandler类型的?比如一个channelHandler是属于inbound类型还是outbound类型。
    2. 添加ChannelHandler时应该遵循什么样的顺序?
      inboud事件的传播,跟添加pipeline时的顺讯正相关;outbound事件的传播,跟添加pipeline的顺序逆相关。
    3. 用户手动触发传播事件,不同的触发方式有什么区别?比如channelHandlerContext.writeAndFlush与channelHandlerContext.channel.writeAndFlush有什么不同。

    本篇文章包含以下内容:

    • pipeline的初始化
    • 添加删除ChannelHandler(netty如何动态编织业务逻辑处理器)
    • 事件和异常传播(读写类事件如何在pipeline中传播,异常如何传播)

    2. pipeline的初始化

    本小节主要内容:

    • pipeline在创建Channel的时候被创建
    • Pipeline节点数据结构:ChannelHandlerContext
    • Pipeline中的两大哨兵:head和tail

    pipeline的创建

    不管是服务端channel还是服务端channel,在创建时都会调用AbstractChannel的构造函数,netty正是在该构造函数中创建pipeline的。由此也可以看出channel和pipeline是一一对应的关系

        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    
        protected DefaultChannelPipeline newChannelPipeline() {
            return new DefaultChannelPipeline(this);
        }
    

    然后来到DefaultChannelPipeline构造函数,这里是重点:

        protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
          // 默认创建两个节点
            tail = new TailContext(this);
            head = new HeadContext(this);
    
          // 把这两个节点变成双向链表数据结构
            head.next = tail;
            tail.prev = head;
        }
    

    Pipeline节点数据结构:ChannelHandlerContext

    每个Pipeline中都维护着一个数据结构ChannelHandlerContext。我们来看它的接口声明:

    public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
        // 记录当前节点属于哪个channel
        Channel channel();
    
        // 记录与之相关的EventLoop会执行到这个节点
        EventExecutor executor();
    
        // 业务逻辑处理器名称
        String name();
    
        // 业务逻辑处理器
        ChannelHandler handler();
        
        // 移除当前节点
        boolean isRemoved();
    
        // 事件传播的方法
        @Override
        ChannelHandlerContext fireChannelRegistered();
    
        @Override
        ChannelHandlerContext fireChannelUnregistered();
    
        @Override
        ChannelHandlerContext fireChannelActive();
    
        @Override
        ChannelHandlerContext fireChannelInactive();
    
        @Override
        ChannelHandlerContext fireExceptionCaught(Throwable cause);
    
        @Override
        ChannelHandlerContext fireUserEventTriggered(Object evt);
    
        @Override
        ChannelHandlerContext fireChannelRead(Object msg);
    
        @Override
        ChannelHandlerContext fireChannelReadComplete();
    
        @Override
        ChannelHandlerContext fireChannelWritabilityChanged();
    
        @Override
        ChannelHandlerContext read();
    
        @Override
        ChannelHandlerContext flush();
    
        // 当前的pipeline
        ChannelPipeline pipeline();
    
        // 内存的分配器:当前节点上如果有数据读写,我要分配ByteBuf的时候,使用哪个内存分配器去分配
        ByteBufAllocator alloc();
    
        /**
          数据存储的能力
         * @deprecated Use {@link Channel#attr(AttributeKey)}
         */
        @Deprecated
        @Override
        <T> Attribute<T> attr(AttributeKey<T> key);
    
        /**
         * @deprecated Use {@link Channel#hasAttr(AttributeKey)}
         */
        @Deprecated
        @Override
        <T> boolean hasAttr(AttributeKey<T> key);
    

    首先它继承自AttributeMap,表明它能存储自定义属性;ChannelInboundInvoker定义了inbound事件的传播接口,inboud事件包括读事件、注册事件、active事件,也就是说ChannelHandlerContext具有传播读事件的功能;ChannelOutboundInvoker有传播写事件、绑定端口、建立连接的功能。

    ChannelHandlerContext的默认实现是AbstractChannelHandlerContext。

    Pipeline的两大哨兵:head和tail

    来看TailContext,首先它继承自AbstractChannelHandlerContext,它也是一个节点;它实现了ChannelInboundHandler,说明它会传播Inbound事件:

        // A special catch-all handler that handles both bytes and messages.
        final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    
            TailContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, TAIL_NAME, true, false);
                // 设置节点为已经添加
                setAddComplete();
            }
            // 说明tail节点的业务逻辑处理器(handler)就是它自己
            public ChannelHandler handler() {
                return this;
            }
            
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                onUnhandledInboundException(cause);
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                onUnhandledInboundMessage(msg);
            }
          // ...
        }
    

    TailContext的构造函数调用了父类的构造函数:

        AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                      boolean inbound, boolean outbound) {
            this.name = ObjectUtil.checkNotNull(name, "name");
            this.pipeline = pipeline;
            this.executor = executor;
            // 记录是处理Inbound事件还是处理Outbound事件
            this.inbound = inbound;
            this.outbound = outbound;
            // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
            ordered = executor == null || executor instanceof OrderedEventExecutor;
        }
    

    我们来看异常处理exceptionCaught,如果异常没有处理,就会传播到tail里边,netty会提示你在最后一个业务逻辑处理器中进行捕获:

        protected void onUnhandledInboundException(Throwable cause) {
            try {
                logger.warn(
                        "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                                "It usually means the last handler in the pipeline did not handle the exception.",
                        cause);
            } finally {
                ReferenceCountUtil.release(cause);
            }
        }
    

    我们来看channelRead方法,如果一个inbound事件传进来了,但最终你没有处理,这个inbound事件就会传到TailContext中,netty会提示你检查下pipeline的配置。

        protected void onUnhandledInboundMessage(Object msg) {
            try {
                logger.debug(
                        "Discarded inbound message {} that reached at the tail of the pipeline. " +
                                "Please check your pipeline configuration.", msg);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    

    由以上两个例子可以看出TailContext主要做一些收尾的事情。
    接下来我们来看HeadContext,它里边有个unsafe主要处理底层数据的读写。HeadContext是一个OutBound事件,跟我们的直觉刚好相反;我们的直觉是head是inboud,tail是outbound。

        final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    
            private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                // 它是一个outBound节点
                super(pipeline, null, HEAD_NAME, true, true);
                unsafe = pipeline.channel().unsafe();
                setAddComplete();
            }
    
            // head节点对应的节点处理器(handler)也是它自己
            @Override
            public ChannelHandler handler() {
                return this;
            }
    
            // 连接刚建立成功,会调用该方法
            @Override
            public void channelActive(ChannelHandlerContext ctx) {
                ctx.fireChannelActive();
                // 默认情况下去注册一个读事件
                readIfIsAutoRead();
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) {
                ctx.fireChannelReadComplete();
                // 跟channelActive一样,也是默认注册一个读事件
               // 后续selector会轮询到该读事件
                readIfIsAutoRead();
            }
            // ...
            @Override
            public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
                unsafe.disconnect(promise);
            }
    
            @Override
            public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
                unsafe.close(promise);
            }
    
            @Override
            public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
                unsafe.deregister(promise);
            }
    
            @Override
            public void read(ChannelHandlerContext ctx) {
                unsafe.beginRead();
            }
    
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                unsafe.write(msg, promise);
            }
    
            @Override
            public void flush(ChannelHandlerContext ctx) {
                unsafe.flush();
            }
    }
    

    由上面的分析,可以看出HeadContext的作用是把outboud事件原模原样的进行向下传播,后面可以看到netty传播读写事件都是从head开始;再一个就是最终进行读写操作时,最终会委托给unsafe进行操作

    3. 添加ChannelHandler

    首先来看用户代码中如何添加ChannelHandler:

    ServerBootstrap bootstrap = new ServerBootstrap();
    // 配置启动辅助类
    bootstrap.group(bossGroup, workerGroup);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter());
        }
     });
    

    pipeline中addLast过程

    通过跟源码可以看到addLast的核心方法是DefaultChannelPipeline的addLast(EventExecutorGroup group, String name, ChannelHandler handler)方法,该方法做的主要工作是:(1)判断handler是否重复添加;(2)创建handlerContext并添加至链表;(3)回调用户代码。

        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                // 判断hanlder是否重复
                checkMultiplicity(handler);
                
                // 创建handlerContext
                newCtx = newContext(group, filterName(name, handler), handler);
                // handlerContext添加到pipeline内部链表
                addLast0(newCtx);
    
                // If the registered is false it means that the channel was not registered on an eventLoop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {
                    callHandlerAddedInEventLoop(newCtx, executor);
                    return this;
                }
            }
            // 回调用户代码
            callHandlerAdded0(newCtx);
            return this;
        }
    

    判断handler是否重复
    checkMultiplicity方法做这件事,handler非共享(@Sharable)并且被添加过就会抛出一个异常。
    @Sharable 表示这个handler可以共享,可以多次添加。

        private static void checkMultiplicity(ChannelHandler handler) {
            if (handler instanceof ChannelHandlerAdapter) {
                ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
                if (!h.isSharable() && h.added) {
                    throw new ChannelPipelineException(
                            h.getClass().getName() +
                            " is not a @Sharable handler, so can't be added or removed multiple times.");
                }
                // 如果校验通过,是否添加标志置为true
                h.added = true;
            }
        }
    

    isSharable() 方法是看添加的hanler有没有@Sharable注解。

    public boolean isSharable() {
            /**
             * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
             * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
             * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
             * {@link Thread}s are quite limited anyway.
             *
             * See <a href="https://github.com/netty/netty/issues/2289">#2289</a>.
             */
            Class<?> clazz = getClass();
            Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
            Boolean sharable = cache.get(clazz);
            if (sharable == null) {
                sharable = clazz.isAnnotationPresent(Sharable.class);
                cache.put(clazz, sharable);
            }
            return sharable;
        }
    

    创建节点并添加至链表

    // filterName的作用是给handler起名字,并确保名字不重复
    newCtx = newContext(group, filterName(name, handler), handler);
    

    filterName是给handler起名字,进入newContext方法:

        private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
            return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
        }
    

    这里是通过EventExecutorGroup,name,handler三个参数构建handlerContext--DefaultChannelHandlerContext。
    attLast0比较简单,就是节点入队列:

        private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
    

    回调添加完成事件
    callHandlerAdded0

    ChannelInitializer:handlerAdded --> initChannel--> 判断有没有执行过,没有执行过就initChannel。也就是说ChannelInitializer添加完成后会调用到用户代码 initChannel。

    4. 删除ChannelHandler

    最常见的场景就是权限的校验。
    删除ChannelHandler的过程:

    1. 找到节点
    2. 链表的删除
    3. 回调删除handler事件

    5. inBound事件的传播

    1. 何为inBound事件以及ChannelInboundHandler
      inBound事件(主要是事件触发机制,被动)主要包含registered,active,read这些事件。
      ChannelInboundHandler事件被添加到pipeline的时候,netty会创建handlerContext,这时候通过instanceof关键词来判断是否是inBound事件
      channelHandler接口继承关系
    2. ChannelRead事件的传播
      依次按照添加pipeline时的顺序,按顺序进行传播。通过pipeline传播一个read事件,会通过head节点往下传播;通过节点handlerContext调用fireChannelRead进行传播,那么它会从当前节点进行传播;最终如果传播的节点到达了tail节点,那么tail节点会释放对象。
    3. SimpleInboundHandler处理器
      我们继承channelRead0方法,它就会自动释放ByteBuf对象。

    6. outBound事件的传播

    1. 何为outBound事件以及ChannelOutBoundHandler
      outBound事件(主要是用户主动发起的操作)主要包含bind,connect,write,deregister。
    1. write()事件的传播

    如果调用channel的write方法【ctx.channel().write("hello world")】,会将write委托到pipeline,最终write事件会从pipeline的tail节点开始往后传播,最后传播到head节点,调用unsafe.write方法。
    如果调用某个节点的wirte方法【ctx.write("hello world")】,它会从当前节点往前传播。

    7. 异常的传播

    • 异常触发链
      异常的传播,跟handler是inBound节点还是outBound节点无关;只要handler中抛出的异常,就会从当前节点依次向下传播,直到tail节点,如果异常到达tail节点还没处理,tail节点会打印出提示信息。
    • 异常处理的最佳实践
      在pipeline链的最后添加一个ExceptionCaughtHandler

    相关文章

      网友评论

          本文标题:Netty源码六 Pipeline

          本文链接:https://www.haomeiwen.com/subject/bfsfdctx.html