Pipeline

作者: Pillar_Zhong | 来源:发表于2019-08-01 16:54 被阅读0次

    ChannelPipeline

    1563778578642.png 1563778661420.png

    首先一个pipeline对应一个channel,从方法列表上能看出,基本上pipeline的主要功能是维护其自身的双向链表的一个责任链。其二,是用来对外提供IO事件或IO处理,其内部最终还是会去调用节点的ChannelHandler,相当于就是handler的一个代理。

    1563777726937.png

    初始化

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        // 双向链表,有头有尾
        tail = new TailContext(this);
        head = new HeadContext(this);
    
        head.next = tail;
        tail.prev = head;
    }
    

    ChannelHandlerContext

    1563777639800.png

    ChannelSocket 和 ChannelPipeline 是一对一的关联关系,而 pipeline 内部的多个 Context 形成了链表,Context 只是对 Handler 的封装。

    1563779627805.png

    TailContext

    从tailcontext的继承体系来看,他只是作为入站的处理器来对待,作为pipeline的最后一环,来对IO事件做收尾工作,比如捕获异常啦,提醒有消息没有处理被丢弃什么的。

    1563779342046.png
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    
        TailContext(DefaultChannelPipeline pipeline) {
            // inbound为true,outbound为false,说明tailContext主要用来处理READ,入站处理器
            super(pipeline, null, TAIL_NAME, true, false);
            // 设置该节点已经处理完毕
            setAddComplete();
        }
    
        // 说明TailContext不光是个标记位,同时也是pipeline的处理节点
        @Override
        public ChannelHandler handler() {
            return this;
        }
    
        ...
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 当读取过程中有异常是,进行捕获
            // An exceptionCaught() event was fired, and it reached at the tail of the               // pipeline. It usually means the last handler in the pipeline did not handle the         // exception."
            onUnhandledInboundException(cause);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 如果有消息进来,调用到这里,说明你需要检查你的pipeline配置,因为没有对应的节点来处理读取事件
            // 调用到这里,说明该消息即将会被丢弃掉
            // Discarded inbound message {} that reached at the tail of the pipeline. " +
            // "Please check your pipeline configuration.
            onUnhandledInboundMessage(msg);
        }
    
    }
    

    HeadContext

    而HeadContext的继承体系看起来明显不同,更全面,负责的工作更多,基本上是全才,入站出战全搞定,不难理解。这里是pipeline的起始点,当然不一样。比如读写,连接,bind什么的,必定都在tail前都执行完了,当然不用流转到tail了。其二,还有一个很重要的职责是负责将收到的IO事件不条件的往下传播。

    1563779399734.png
    // 而HeadContext的工作则繁重得多,不管是读写,连接,绑定等等,还要负责将事件向tail的方向进行传播
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
    
        private final Unsafe unsafe;
    
        HeadContext(DefaultChannelPipeline pipeline) {
            // inbound为false,outbound为true,说明HeadContext主要用来处理write,出站处理器
            super(pipeline, null, HEAD_NAME, false, true);
            // 而handler的工作最终是需要unsafe来托管
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
    
        ...
    }
    

    ChannelHandler

    而pipeline上真正对事件进行处理的就是ChannelHandler,可以看到ChannelInboundHandler 主要是处理IO的事件的,而ChannelOutboundHandler主要处理

    1563779797126.png

    添加

    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
        
        // 首先需要注意的是,addLast一次可以加入多个handler
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }
    
        return this;
    }
    
    
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 判断重复添加
            checkMultiplicity(handler);
            // 创建context节点,主要是绑定ctx和handler
            newCtx = newContext(group, filterName(name, handler), handler);
    
            addLast0(newCtx);
    
            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
    
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    

    判断重复添加

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            // 如果你向一个handler实例要多个channel共享,那么你需要标注Sharable。
            // 否则,如果已经添加到了一个channel,不能再重复添加到其他channel
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }
    

    创建节点

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        // 这里会判断当前的handler是inbound还是outbound
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }
    
    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }
    
    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
    

    添加至列表

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    

    回调完成事件

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            // 而在初始化的时候,这里会调用到ChannelInitializer的handlerAdded方法
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
        } catch (Throwable t) {
            ...
        }
    }
    
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // 而最终会调回到用户定义的initChannel方法,对pipeline进行初始化
            initChannel(ctx);
        }
    }
    
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                // 而最终会调回到用户定义的initChannel方法,对pipeline进行初始化
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
              ...
            } finally {
                // 而上面执行完pipeline配置后,会移除ChannelInitializer,毕竟配置设置一次就好。
                remove(ctx);
            }
            return true;
        }
        return false;
    }
    
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 配置pipeline
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler());
    }
    

    删除

    场景

    假如需要在首次连接的时候做一次权限校验,通过,则以后畅通无阻,失败,则寸步难行。那么很容易想到,我们可以add一个AuthHandler,然后通过后,再remove这个handler。免得每次都需要做权限验证。

    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }
    

    查找节点

    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        // 去ctx查找handler,如果找不到报NoSuchElementException
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
    
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
    
        // 从head开始往后查找传入的handler
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {
    
            if (ctx == null) {
                return null;
            }
    
            if (ctx.handler() == handler) {
                return ctx;
            }
    
            ctx = ctx.next;
        }
    }
    

    链表删除

    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        // head和tail不能删除
        assert ctx != head && ctx != tail;
    
        synchronized (this) {
            remove0(ctx);
    
            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we remove the context from the pipeline and add a task that will call
            // ChannelHandler.handlerRemoved(...) once the channel is registered.
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }
    
            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }
    
    // 链表删除
    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
    

    回调删除事件

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        // Notify the complete removal.
        try {
            try {
                // 回调该handler的handlerRemoved方法
                ctx.handler().handlerRemoved(ctx);
            } finally {
                // 设置该handler绑定的ctx的状态为REMOVE_COMPLETE
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            // 如果有异常,传递ExceptionCaught事件
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }
    

    inBound事件传播

    img

    head节点

    public final ChannelPipeline fireChannelRead(Object msg) {
        // 从head开始,往后传播,调用head的next的channelRead
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    

    当前节点

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        // 调用下一个节点的channelRead
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
    
    // 获取当前节点的下一个节点
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
            // 传播的下一个handler必须是inbound
        } while (!ctx.inbound);
        return ctx;
    }
    

    tail节点

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 如果前面消息传播过程中没人节点处理完消息并返回,那么这里提醒下该消息会被丢弃
        onUnhandledInboundMessage(msg);
    }
    

    节点筛选

    截获事件并往下传播的过程需要用户自己来触发,并不会自动往下传播,那么inBound事件需要筛选出下一个是否是inBound流程节点,怎么判断呢?其实在前面创建节点的部分,已经提前做好了判断。

    SimpleChannelInboundHandler

    1563787372972.png

    首先,使用这个的好处是处理message的时候可以直接使用你指定的类型。而不用转来转去。另外它可以自动释放你在pipeline中传递的资源。比如ByteBuf。

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            // 这里主要是验证msg的类型是否是你指定的泛型类型
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                // 强转
                I imsg = (I) msg;
                // 调用用户自定义的channelRead0,并会将message传进去,你可以选择是否要释放资源
                channelRead0(ctx, imsg);
            } else {
                // 如果不是指定的类型,直接传播给下一个handler
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            // 最终会释放掉message持有的资源
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
    

    outBound事件传播

    img

    tail节点

    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }
    

    当前节点

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        // 找到当前节点的prev前一个节点,往前传播
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }
    

    head节点

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 最终由unsafe来托管进行write,且不再继续传播事件
        unsafe.write(msg, promise);
    }
    

    节点筛选

    // 这里说明直接调用ctx,也就是节点的write的时候,会找当前节点的prev进行传播
    // 而调用ctx.channel()的write,会直接从tail开始找
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
            // 必须是outbound类型的节点,跟inbound同理
        } while (!ctx.outbound);
        return ctx;
    }
    

    异常事件传播

    触发

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                // 当处理channelRead的时候报错,会触发异常事件传播
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
    
    private void notifyHandlerException(Throwable cause) {
        // 是否用户在自定义的handler中处理异常失败,被再次抛出?
        // 那么说明该异常事件不可控
        if (inExceptionCaught(cause)) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "An exception was thrown by a user handler " +
                                "while handling an exceptionCaught event", cause);
            }
            return;
        }
        
        invokeExceptionCaught(cause);
    }
    

    当前节点

    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                // 调用当前handler的exceptionCaught方法
                handler().exceptionCaught方法(this, cause);
            } catch (Throwable error) {
                ...
            }
        } else {
            fireExceptionCaught(cause);
        }
    }
    
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
        // 异常事件并不像inbound或outbound一样去从head或从tail开始,按照各自的顺序去查找同类的节点去触发
        // 异常只会按照节点add的时候的顺序来往下传播,并不case是inbound还是outbound
        invokeExceptionCaught(next, cause);
        return this;
    }
    
    static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
        ObjectUtil.checkNotNull(cause, "cause");
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 调用handler的exceptionCaught方法
            next.invokeExceptionCaught(cause);
        } else {
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeExceptionCaught(cause);
                    }
                });
            } catch (Throwable t) {
               ...
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Pipeline

          本文链接:https://www.haomeiwen.com/subject/dgdcdctx.html