美文网首页
netty系列之(五)——ChannelPipeline与Cha

netty系列之(五)——ChannelPipeline与Cha

作者: 康康不遛猫 | 来源:发表于2018-12-20 19:25 被阅读0次

    一、pipeline初始化

    • Pipeline在创建Channel的时候被创建
      调用newChannelPipeline()方法创建Channel对应的Pipeline,创建tail节点和head节点通过prev/next组成双向链表
    • Pipeline节点数据结构:ChannelHandlerContext[存储自定义属性,Inbound/Outbound事件传播]
    • Pipeline中的两大哨兵:head和tail
      tail节点传播Inbound事件,用于异常未处理警告/Msg未处理建议处理收尾
      head节点传播Outbound事件,用于传播事件/读写事件委托Unsafe操作

    无论是服务端还是客户端Channel都会调用父类AbstractChannel构造方法

    protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    

    创建Pipeline时,创建head、tail两个ChannelHandlerContext,形成双向链表结构。


    ChannelPipeline
    protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }
    

    二、ChannelHandlerContext

    ChannelHandlerContext .png
    public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
    
        /**
         * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
         */
        Channel channel();
    
        /**
         * Returns the {@link EventExecutor} which is used to execute an arbitrary task.
         */
        EventExecutor executor();
    
        /**
         * The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
         * was added to the {@link ChannelPipeline}. This name can also be used to access the registered
         * {@link ChannelHandler} from the {@link ChannelPipeline}.
         */
        String name();
    
        /**
         * The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}.
         */
        ChannelHandler handler();//ChannelHandler 处理方法
    
        /**
         * Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed
         * from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
         * {@link EventLoop}.
         */
        boolean isRemoved();
    
        @Override
        ChannelHandlerContext fireChannelRegistered();
    
        @Override
        ChannelHandlerContext fireChannelUnregistered();
    
        @Override
        ChannelHandlerContext fireChannelActive();
    
        @Override
        ChannelHandlerContext fireChannelInactive();
    
        @Override
        ChannelHandlerContext fireExceptionCaught(Throwable cause);
    
        @Override
        ChannelHandlerContext fireUserEventTriggered(Object evt);
    
        @Override
        ChannelHandlerContext fireChannelRead(Object msg);
    
        @Override
        ChannelHandlerContext fireChannelReadComplete();
    
        @Override
        ChannelHandlerContext fireChannelWritabilityChanged();
    
        @Override
        ChannelHandlerContext read();
    
        @Override
        ChannelHandlerContext flush();
    
        /**
         * Return the assigned {@link ChannelPipeline}
         */
        ChannelPipeline pipeline();//当前pipeline
    
        /**
         * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
         */
        ByteBufAllocator alloc();
    
        /**
         * @deprecated Use {@link Channel#attr(AttributeKey)}
         */
        @Deprecated
        @Override
        <T> Attribute<T> attr(AttributeKey<T> key);
    
        /**
         * @deprecated Use {@link Channel#hasAttr(AttributeKey)}
         */
        @Deprecated
        @Override
        <T> boolean hasAttr(AttributeKey<T> key);
    }
    
    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
            implements ChannelHandlerContext, ResourceLeakHint {
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
        volatile AbstractChannelHandlerContext next;
        volatile AbstractChannelHandlerContext prev;
    ...
    }
    
    

    DefaultChannelHandlerContext

    final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    
        private final ChannelHandler handler;
    
        DefaultChannelHandlerContext(
                DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
            super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
            if (handler == null) {
                throw new NullPointerException("handler");
            }
            this.handler = handler;
        }
    
        @Override
        public ChannelHandler handler() {
            return handler;
        }
    
        private static boolean isInbound(ChannelHandler handler) {
            return handler instanceof ChannelInboundHandler;//判断isInbound
        }
    
        private static boolean isOutbound(ChannelHandler handler) {
            return handler instanceof ChannelOutboundHandler;//判断isOutbound
        }
    }
    

    TailContext,主要做收尾的工作,如异常没处理或者消息没处理进行警告

    TailContext.png
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    
            TailContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, TAIL_NAME, true, false);//inbound传true,outbound传false
                setAddComplete();//设置为已经添加
            }
    
            @Override
            public ChannelHandler handler() {
                return this;
            }
    
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
    
            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception { }
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
    
            @Override
            public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
    
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
    
            @Override
            public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                // This may not be a configuration error and so don't log anything.
                // The event may be superfluous for the current pipeline configuration.
                ReferenceCountUtil.release(evt);
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                onUnhandledInboundException(cause);
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                onUnhandledInboundMessage(msg);
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
        }
    
    
    protected void onUnhandledInboundException(Throwable cause) {
            try {
                //捕获未处理的异常信息
                logger.warn(
                        "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                                "It usually means the last handler in the pipeline did not handle the exception.",
                        cause);
            } finally {
                ReferenceCountUtil.release(cause);//进行释放,防止内存泄漏
            }
        }
    
    protected void onUnhandledInboundMessage(Object msg) {
            try {
                //未处理的msg
                logger.debug(
                        "Discarded inbound message {} that reached at the tail of the pipeline. " +
                                "Please check your pipeline configuration.", msg);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    

    HeadContext,主要进行事件传播、委托unsafe 进行读写操作。

    HeadContext.png
    final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    
            private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, false, true);//inbound设置为false,outbound设置为true
                unsafe = pipeline.channel().unsafe();//进行读写操作
                setAddComplete();
            }
    
            @Override
            public ChannelHandler handler() {
                return this;
            }
    
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                // NOOP
            }
    
            @Override
            public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                // NOOP
            }
    
            @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                    throws Exception {
                unsafe.bind(localAddress, promise);
            }
    
            @Override
            public void connect(
                    ChannelHandlerContext ctx,
                    SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) throws Exception {
                unsafe.connect(remoteAddress, localAddress, promise);
            }
    
            @Override
            public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
                unsafe.disconnect(promise);
            }
    
            @Override
            public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
                unsafe.close(promise);
            }
    
            @Override
            public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
                unsafe.deregister(promise);
            }
    
            @Override
            public void read(ChannelHandlerContext ctx) {
                unsafe.beginRead();
            }
    
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                unsafe.write(msg, promise);
            }
    
            @Override
            public void flush(ChannelHandlerContext ctx) throws Exception {
                unsafe.flush();
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                ctx.fireExceptionCaught(cause);
            }
    
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                invokeHandlerAddedIfNeeded();
                ctx.fireChannelRegistered();
            }
    
            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelUnregistered();
    
                // Remove all handlers sequentially if channel is closed and unregistered.
                if (!channel.isOpen()) {
                    destroy();
                }
            }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelActive();
    
                readIfIsAutoRead();
            }
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelInactive();
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ctx.fireChannelRead(msg);
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelReadComplete();
    
                readIfIsAutoRead();
            }
    
            private void readIfIsAutoRead() {
                if (channel.config().isAutoRead()) {
                    channel.read();
                }
            }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                ctx.fireUserEventTriggered(evt);
            }
    
            @Override
            public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelWritabilityChanged();
            }
        }
    

    三、添加ChannelHandler

    添加ChannelHandler->addLast():

    • 判断是否重复添加
      通过checkMultiplicity()方法强制转换ChannelHandler判断是否可共享&是否被添加过
    • 创建节点并添加至链表
      通过filterName()检查重复名称调用newContext()封装ChannelHandler创建节点
      调用addLast0()方法通过链表的方式添加到Channel的Pipeline
    • 回调添加完成事件
      调用callHandlerAdded0()方法添加ChannelInitializer执行handlerAdded()方法回调自定义的添加完成事件删除自身节点
    EventLoopGroup boss = new NioEventLoopGroup();//继承线程池ScheduledExecutorService
    EventLoopGroup worker = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(boss, worker);
    bootstrap.channel(NioServerSocketChannel.class);//利用反射构造NioServerSocketChannel实例
    bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    bootstrap.handler(new LoggingServerHandler());//handler与childHandler不同
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//ChannelInitializer,添加自定义hander
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new MyChannelHandler1());
            ch.pipeline().addLast(new MyChannelHandler2());
            ch.pipeline().addLast(new MyChannelHandler3());
        }
    });
    ChannelFuture f = bootstrap.bind(port).sync();//bind方法实现
    f.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    //启动成功
                }
    });
    f.channel().closeFuture().sync();
    

    addLast方法

    @Override
        public final ChannelPipeline addLast(ChannelHandler... handlers) {
            return addLast(null, handlers);
        }
    
        @Override
        public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
            if (handlers == null) {
                throw new NullPointerException("handlers");
            }
    
            for (ChannelHandler h: handlers) {
                if (h == null) {
                    break;
                }
                addLast(executor, null, h);
            }
    
            return this;
        }
    
    @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                checkMultiplicity(handler);//判断是否被重复添加
    
                newCtx = newContext(group, filterName(name, handler), handler);//创建一个节点
    
                addLast0(newCtx);//添加到链表
    
                // If the registered is false it means that the channel was not registered on an eventloop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {//如果不在当前EventLoop中,则添加到任务队列,异步执行
                    newCtx.setAddPending();
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerAdded0(newCtx);
                        }
                    });
                    return this;
                }
            }
            callHandlerAdded0(newCtx);//如果在当前EventLoop中,执行执行
            return this;
        }
    
    private static void checkMultiplicity(ChannelHandler handler) {
            if (handler instanceof ChannelHandlerAdapter) {
                ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
                if (!h.isSharable() && h.added) {//非共享(@Sharable注解)并且已经添加过,则抛出重复添加的异常
                    throw new ChannelPipelineException(
                            h.getClass().getName() +
                            " is not a @Sharable handler, so can't be added or removed multiple times.");
                }
                h.added = true;
            }
        }
    
    
    private String filterName(String name, ChannelHandler handler) {
            if (name == null) {
                return generateName(handler);
            }
            checkDuplicateName(name);//检查名字是否重复
            return name;
     }
    
    private void checkDuplicateName(String name) {
            if (context0(name) != null) {
                throw new IllegalArgumentException("Duplicate handler name: " + name);
            }
    }
    
    private AbstractChannelHandlerContext context0(String name) {
            AbstractChannelHandlerContext context = head.next;
            while (context != tail) {
                if (context.name().equals(name)) {
                    return context;
                }
                context = context.next;
            }
            return null;
        }
    
    //向双向链表中添加hander
    private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
    
    //回调用户方法
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
            try {
                ctx.handler().handlerAdded(ctx);//调用用户方法(服务端启动时ChannelInitializer中initChannel)
                ctx.setAddComplete();
            } catch (Throwable t) {
                boolean removed = false;
                try {
                    remove0(ctx);
                    try {
                        ctx.handler().handlerRemoved(ctx);
                    } finally {
                        ctx.setRemoved();
                    }
                    removed = true;
                } catch (Throwable t2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                    }
                }
    
                if (removed) {
                    fireExceptionCaught(new ChannelPipelineException(
                            ctx.handler().getClass().getName() +
                            ".handlerAdded() has thrown an exception; removed.", t));
                } else {
                    fireExceptionCaught(new ChannelPipelineException(
                            ctx.handler().getClass().getName() +
                            ".handlerAdded() has thrown an exception; also failed to remove.", t));
                }
            }
        }
    
    //ChannelInitializer中handlerAdded方法
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            if (ctx.channel().isRegistered()) {
                // This should always be true with our current DefaultChannelPipeline implementation.
                // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
                // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
                // will be added in the expected order.
                initChannel(ctx);
            }
        }
    
    //ChannelInitializer中initChannel方法
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
            if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
                try {
                    initChannel((C) ctx.channel());//执行initChannel
                } catch (Throwable cause) {
                    // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                    // We do so to prevent multiple calls to initChannel(...).
                    exceptionCaught(ctx, cause);
                } finally {
                    remove(ctx);//将自身节点删除
                }
                return true;
            }
            return false;
        }
    
    
    final void setAddComplete() {
            for (;;) {
                int oldState = handlerState;
                //自旋,compareAndSet设置handlerState
                if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                    return;
                }
            }
        }
    

    四、删除ChannelHander

    删除ChannelHander的使用场景:
    AuthHandler 权限控制demo

    public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf password) throws Exception {
            if (paas(password)) {
                ctx.pipeline().remove(this);//通过密码验证删除自身,否则关闭连接
            } else {
                ctx.close();
            }
        }
    
        private boolean paas(ByteBuf password) {
            return false;
        }
    }
    

    删除ChannelHandler过程:

    • 找到节点
      通过getContextOrDie()方法使用context()方法从head的next遍历获取封装的ChannelHandlerContext节点
    • 链表的删除
      调用remove()方法删除节点使用remove0()方法获取当前节点的prev和next,prev的next置为当前节点的next,next的prev置为当前节点的prev
    • 回调删除Handler事件
      调用callHandlerRemoved0()方法获取当前节点的ChannelHandler使用handlerRemove()方法回调删除Handler事件
    #DefaultChannelPipeline类
    @Override
    public final ChannelPipeline remove(ChannelHandler handler) {
            remove(getContextOrDie(handler));
            return this;
     }
    
    #DefaultChannelPipeline类
    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
            AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
            if (ctx == null) {
                throw new NoSuchElementException(handler.getClass().getName());
            } else {
                return ctx;
            }
        }
    
    #DefaultChannelPipeline类
    public final ChannelHandlerContext context(ChannelHandler handler) {
            if (handler == null) {
                throw new NullPointerException("handler");
            }
            AbstractChannelHandlerContext ctx = head.next;
            for (;;) {
                if (ctx == null) {
                    return null;
                }
                if (ctx.handler() == handler) {//找到该hander节点
                    return ctx;
                }
                ctx = ctx.next;
            }
        }
    
    //删除方法
    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
            assert ctx != head && ctx != tail;//当前节点不是head也不是tail
    
            synchronized (this) {
                remove0(ctx);
                // If the registered is false it means that the channel was not registered on an eventloop yet.
                // In this case we remove the context from the pipeline and add a task that will call
                // ChannelHandler.handlerRemoved(...) once the channel is registered.
                if (!registered) {
                    callHandlerCallbackLater(ctx, false);
                    return ctx;
                }
    
                EventExecutor executor = ctx.executor();
                if (!executor.inEventLoop()) {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerRemoved0(ctx);
                        }
                    });
                    return ctx;
                }
            }
            callHandlerRemoved0(ctx);//回调删除hander方法
            return ctx;
        }
    
    //双向链表的删除
    private static void remove0(AbstractChannelHandlerContext ctx) {
            AbstractChannelHandlerContext prev = ctx.prev;
            AbstractChannelHandlerContext next = ctx.next;
            prev.next = next;
            next.prev = prev;
        }
    
    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
            // Notify the complete removal.
            try {
                try {
                    ctx.handler().handlerRemoved(ctx);//回调用户定义的删除ChannelHander事件
                } finally {
                    ctx.setRemoved();
                }
            } catch (Throwable t) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
            }
        }
    

    五、 inBound事件的传播

    • 何为inBound事件以及ChannelInboundHandler
      inBound事件包括Registered事件、Active事件、Read事件回调
      ChannelInboundHandler添加到Pipeline通过instanceof判断当前ChannelHandler类型设置inBound为true标识为ChannelInboundHandler处理inBound事件
    • ChannelRead事件的传播
      按照添加ChannelRead事件顺序正序[添加ChannelInboundHandler顺序正序]传播[从head节点/当前节点事件传播],最终传播到tail节点释放对象
    • SimpleInboundHandler处理器
      channelRead()方法调用ReferenceCountUtil的release()方法自动释放对象
    图片.png
    public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    
        /**
         * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
         * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
         *
         * Sub-classes may override this method to change behavior.
         */
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelRegistered();
        }
    
        /**
         * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
         * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
         *
         * Sub-classes may override this method to change behavior.
         */
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelUnregistered();
        }
    
        /**
         * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
         * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
         *
         * Sub-classes may override this method to change behavior.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
        }
    
        /**
         * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
         * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
         *
         * Sub-classes may override this method to change behavior.
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelInactive();
        }
    
        /**
         * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward
         * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
         *
         * Sub-classes may override this method to change behavior.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }
    
        /**
         * Calls {@link ChannelHandlerContext#fireChannelReadComplete()} to forward
         * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
         *
         * Sub-classes may override this method to change behavior.
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelReadComplete();
        }
    
        /**
         * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
         * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
         *
         * Sub-classes may override this method to change behavior.
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            ctx.fireUserEventTriggered(evt);
        }
    
        /**
         * Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward
         * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
         *
         * Sub-classes may override this method to change behavior.
         */
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelWritabilityChanged();
        }
    
        /**
         * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
         * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
         *
         * Sub-classes may override this method to change behavior.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            ctx.fireExceptionCaught(cause);
        }
    }
    

    测试demo

    public final class Server {
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast(new InBoundHandlerA());
                                ch.pipeline().addLast(new InBoundHandlerB());
                                ch.pipeline().addLast(new InBoundHandlerC());
                            }
                        });
    
                ChannelFuture f = b.bind(8888).sync();
    
                f.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("InBoundHandlerB: " + msg);
            ctx.fireChannelRead(msg);//向后传播
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {//链接建立时
            ctx.channel().pipeline().fireChannelRead("hello world");//触发fireChannelRead
        }
    }
    

    ChannelInboundHandlerAdapter 执行顺序,按添加顺序


    InBoundHandler测试输出.png
    inBound事件传播.png
    inBound事件传播.png

    注意:
    直接调用pipeline().fireChannelRead("hello world");是从head节点往后传播;
    而ChannelInboundHandlerAdapter中channelRead调用ctx.fireChannelRead,是从当前节点往后传播。

    #AbstractChannelHandlerContext
    @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(), msg);//findContextInbound寻找next Inbound
            return this;
        }
    
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
    
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {//判断执行handler还是向后传递
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    
    private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);//直到下一个inbound
            return ctx;
    }
    

    SimpleChannelInboundHandler
    在没有调用fireChannelRead继续往后传播直到tail节点时,需要手动释放资源。而SimpleChannelInboundHandler可以帮助自动释放。

    public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
        
       @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //
        }
    
        //在channelRead0中实现业务逻辑
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf password) throws Exception {
            if (paas(password)) {
                ctx.pipeline().remove(this);
            } else {
                ctx.close();
            }
        }
    
        private boolean paas(ByteBuf password) {
            return false;
        }
    }
    

    SimpleChannelInboundHandler的channelRead方法

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            boolean release = true;
            try {
                if (acceptInboundMessage(msg)) {
                    @SuppressWarnings("unchecked")
                    I imsg = (I) msg;
                    channelRead0(ctx, imsg);
                } else {
                    release = false;
                    ctx.fireChannelRead(msg);
                }
            } finally {
                if (autoRelease && release) {
                    ReferenceCountUtil.release(msg);//释放资源
                }
            }
        }
    

    六、outBound事件的传播

    • 何为outBound事件以及ChannelOutboundHandler
      outBound事件,主要是服务端主动向客户端发起,如[发起事件]包括Bind事件[端口绑定]、Connect事件[连接/断连]、Close事件[关闭]、取消注册事件、Write事件以及Flush事件回调
      ChannelOutboundHandler添加到Pipeline通过instanceof判断当前ChannelHandler类型设置outBound为true标识为ChannelOutboundHandler处理outBound事件
    • write()事件的传播
      通过write()方法按照添加Write事件顺序进行倒序[添加ChannelOutboundHandler倒序倒序]传播,从tail节点事件传播[ChannelPipeline#write()]/从当前节点事件传播[ChannelHandlerContext#write()],最终传播到head节点调用Unsafe的write()方法
      outBound.png
    package io.netty.channel;
    
    import java.net.SocketAddress;
    
    /**
     * {@link ChannelHandler} which will get notified for IO-outbound-operations.
     */
    public interface ChannelOutboundHandler extends ChannelHandler {
        /**
         * Called once a bind operation is made.
         *
         * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
         * @param localAddress  the {@link SocketAddress} to which it should bound
         * @param promise       the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception    thrown if an error occurs
         */
        void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    
        /**
         * Called once a connect operation is made.
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
         * @param remoteAddress     the {@link SocketAddress} to which it should connect
         * @param localAddress      the {@link SocketAddress} which is used as source on connect
         * @param promise           the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception        thrown if an error occurs
         */
        void connect(
                ChannelHandlerContext ctx, SocketAddress remoteAddress,
                SocketAddress localAddress, ChannelPromise promise) throws Exception;
    
        /**
         * Called once a disconnect operation is made.
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
         * @param promise           the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception        thrown if an error occurs
         */
        void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
        /**
         * Called once a close operation is made.
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
         * @param promise           the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception        thrown if an error occurs
         */
        void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
        /**
         * Called once a deregister operation is made from the current registered {@link EventLoop}.
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
         * @param promise           the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception        thrown if an error occurs
         */
        void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
        /**
         * Intercepts {@link ChannelHandlerContext#read()}.
         */
        void read(ChannelHandlerContext ctx) throws Exception;
    
        /**
        * Called once a write operation is made. The write operation will write the messages through the
         * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
         * {@link Channel#flush()} is called
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
         * @param msg               the message to write
         * @param promise           the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception        thrown if an error occurs
         */
        void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
    
        /**
         * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
         * that are pending.
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
         * @throws Exception        thrown if an error occurs
         */
        void flush(ChannelHandlerContext ctx) throws Exception;
    }
    
    

    write事件传播

    public final class Server {
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast(new OutBoundHandlerA());
                                ch.pipeline().addLast(new OutBoundHandlerB());
                                ch.pipeline().addLast(new OutBoundHandlerC());
                            }
                        });
    
                ChannelFuture f = b.bind(8888).sync();
    
                f.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutBoundHandlerB: " + msg);
            ctx.write(msg, promise);//从当前节点往前传播
        }
    
    
        @Override
        public void handlerAdded(final ChannelHandlerContext ctx) {
            ctx.executor().schedule(() -> {//定时写数据,模拟给客户端的相应
                ctx.channel().write("hello world");//从tail节点往前传播
                //ctx.write("hello world");//从当前节点往前传播
            }, 3, TimeUnit.SECONDS);
        }
    }
    

    注意:
    ctx.channel().write("hello world");//从tail节点往前传播
    ctx.write("hello world");//从当前节点往前传播

    输出.png
    输出与添加hander的顺序相反。即outBound事件传播顺序和添加顺序相反。
    outBound事件传播.png
    outBound事件传播.png
    @Override
    public ChannelFuture write(Object msg, ChannelPromise promise) {
            return pipeline.write(msg, promise);
      }
    
    #AbstractChannelHandlerContext类
    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
            if (msg == null) {
                throw new NullPointerException("msg");
            }
            try {
                if (isNotValidPromise(promise, true)) {
                    ReferenceCountUtil.release(msg);
                    // cancelled
                    return promise;
                }
            } catch (RuntimeException e) {
                ReferenceCountUtil.release(msg);
                throw e;
            }
            write(msg, false, promise);
    
            return promise;
        }
    
    #AbstractChannelHandlerContext类
    private void write(Object msg, boolean flush, ChannelPromise promise) {
            AbstractChannelHandlerContext next = findContextOutbound();//向前寻找outbound
            final Object m = pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                AbstractWriteTask task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, m, promise);
                }  else {
                    task = WriteTask.newInstance(next, m, promise);
                }
                safeExecute(executor, task, promise, m);
            }
        }
    
    private AbstractChannelHandlerContext findContextOutbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.prev;//向前寻找outbound
            } while (!ctx.outbound);
            return ctx;
        }
    
    private void invokeWrite(Object msg, ChannelPromise promise) {
            if (invokeHandler()) {
                invokeWrite0(msg, promise);
            } else {
                write(msg, promise);
            }
        }
    
        private void invokeWrite0(Object msg, ChannelPromise promise) {
            try {
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);//调用自定义hander的write方法
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }
    

    最终调用HeadContext的write,调用unsafe写数据给客户端

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception 
    {
                unsafe.write(msg, promise);
    }
    

    七、异常的传播

    • 异常的触发链
      调用channelRead()方法抛异常使用notifyHandlerException()方法发起Exception事件,通过fireExceptionCaught()方法按照添加事件顺序正序[添加ChannelHandler顺序正序]从当前节点触发传播到后置节点next,最终传播到tail节点调用onUnhandledInboundException()方法打印Pipeline未处理异常告警。
      只按添加顺序,与是Inbound节点还是OutBound节点无关。
    • 异常处理的最佳实践
      Pipeline添加ChannelHandler最后添加ExceptionCaughtHandler异常捕获处理器,所有异常都归异常捕获处理器按照异常类型处理
      demo
    package com.imooc.netty.ch6.exceptionspread;
    
    import com.sun.corba.se.impl.presentation.rmi.ExceptionHandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.util.AttributeKey;
    
    /**
     * @author
     */
    public final class Server {
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast(new InBoundHandlerA());
                                ch.pipeline().addLast(new InBoundHandlerB());
                                ch.pipeline().addLast(new InBoundHandlerC());
                                ch.pipeline().addLast(new OutBoundHandlerA());
                                ch.pipeline().addLast(new OutBoundHandlerB());
                                ch.pipeline().addLast(new OutBoundHandlerC());
                                ch.pipeline().addLast(new ExceptionCaughtHandler());//异常处理器
                            }
                        });
    
                ChannelFuture f = b.bind(8888).sync();
    
                f.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            throw new BusinessException("from InBoundHandlerB");//模拟异常触发
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("InBoundHandlerB.exceptionCaught()");
            ctx.fireExceptionCaught(cause);//传递异常
        }
    }
    
    public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("OutBoundHandlerA.exceptionCaught()");
            ctx.fireExceptionCaught(cause);//传递异常
        }
    }
    
    public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // ..
            if (cause instanceof BusinessException) {
                System.out.println("BusinessException");
            }
        }
    }
    
    输出.png

    按添加顺序,这里InBoundHandlerA不输出


    异常传播.png
    异常传播.png
    异常传播.png
    #AbstractChannelHandlerContext类
    private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    
    private void notifyHandlerException(Throwable cause) {
            if (inExceptionCaught(cause)) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "An exception was thrown by a user handler " +
                                    "while handling an exceptionCaught event", cause);
                }
                return;
            }
            invokeExceptionCaught(cause);
        }
    
        private static boolean inExceptionCaught(Throwable cause) {
            do {
                StackTraceElement[] trace = cause.getStackTrace();
                if (trace != null) {
                    for (StackTraceElement t : trace) {
                        if (t == null) {
                            break;
                        }
                        if ("exceptionCaught".equals(t.getMethodName())) {
                            return true;
                        }
                    }
                }
                cause = cause.getCause();
            } while (cause != null);
            return false;
        }
    
    private void invokeExceptionCaught(final Throwable cause) {
            if (invokeHandler()) {
                try {
                    handler().exceptionCaught(this, cause);//调用ChannelHander中定义的exceptionCaught方法
                } catch (Throwable error) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "An exception {}" +
                            "was thrown by a user handler's exceptionCaught() " +
                            "method while handling the following exception:",
                            ThrowableUtil.stackTraceToString(error), cause);
                    } else if (logger.isWarnEnabled()) {
                        logger.warn(
                            "An exception '{}' [enable DEBUG level for full stacktrace] " +
                            "was thrown by a user handler's exceptionCaught() " +
                            "method while handling the following exception:", error, cause);
                    }
                }
            } else {
                fireExceptionCaught(cause);//
            }
        }
    
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
            invokeExceptionCaught(next, cause);
            return this;
        }
    
    //next,传播到下一个
    static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
            ObjectUtil.checkNotNull(cause, "cause");
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeExceptionCaught(cause);
            } else {
                try {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            next.invokeExceptionCaught(cause);
                        }
                    });
                } catch (Throwable t) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Failed to submit an exceptionCaught() event.", t);
                        logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                    }
                }
            }
        }
    
    
    //一直到tail节点
    protected void onUnhandledInboundException(Throwable cause) {
            try {
                //捕获未处理的异常信息
                logger.warn(
                        "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                                "It usually means the last handler in the pipeline did not handle the exception.",
                        cause);
            } finally {
                ReferenceCountUtil.release(cause);//进行释放,防止内存泄漏
            }
        }
    

    相关文章

      网友评论

          本文标题:netty系列之(五)——ChannelPipeline与Cha

          本文链接:https://www.haomeiwen.com/subject/tjhciftx.html