Netty ChannelHandler与ChannelPipe

作者: 良辰美景TT | 来源:发表于2018-07-24 09:30 被阅读8次

    ChannelHandler

      ChannelHandler基本上是我们第一次接触Netty就会碰到的对象,我们自定义的各种ChannelHandler主要用于处理我们系统的各种业务逻辑,比如发生了active事件后的处理逻辑,发生了读事件的处理逻辑,下面先来看一下ChannelHandler的类继承图:


    image.png

      ChannelHandler被分为两部分,分别为ChannelOutboundHandler与ChannelInboundHandler。其中ChannelInboundHandler提供了一些方法再接收数据或Channel状态改变时被调用,ChannelOutboundHandler则提供了与网络I/O相关的方法。
    同时Netty也提供了相应的Adapter,主要是为了我们编码的方便,我们可以通过继承Adapter,这样ChannelHandler里便只需要关注需要重写的方法。而不是实现所有接口的方法。

    StringDecoder源码

      我们来关注一下StringDecoder这个类,StringDecoder用于对读入的数据根据指定的字符编码进行转换。StringDecoder继承MessageToMessageDecoder,而MessageToMessageDecoder继承ChannelInboundHandlerAdapter。StringDecoder便是一个典型的ChannelInboundHandler啦,先来看看MessageToMessageDecoder里都有那些内容,源码如下:

    public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {
    
    //matcher用于检验是否对msg进行Decoder
        private final TypeParameterMatcher matcher;
    
        /**
         * Create a new instance which will try to detect the types to match out of the type parameter of the class.
         */
        protected MessageToMessageDecoder() {
            matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
        }
    
        /**
         * Create a new instance
         *
         * @param inboundMessageType    The type of messages to match and so decode
         */
        protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
            matcher = TypeParameterMatcher.get(inboundMessageType);
        }
    
        /**
         * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
         * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
         */
        public boolean acceptInboundMessage(Object msg) throws Exception {
            return matcher.match(msg);
        }
    
    //可以看出MessageToMessageDecoder只对 channelRead进行了重写,这就是Adapter提供的好处
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //这里的out是个list对象
            CodecOutputList out = CodecOutputList.newInstance();
            try {
    //acceptInboundMessage判断是否对msg进行解析
                if (acceptInboundMessage(msg)) {
                    @SuppressWarnings("unchecked")
                    I cast = (I) msg;
                    try {
    //这是个留给子类实现的方法啦, 也就是我们的StringDecoder里会实现的方法啦
                        decode(ctx, cast, out);
                    } finally {
                        ReferenceCountUtil.release(cast);
                    }
                } else {
                    out.add(msg);
                }
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                int size = out.size();
    //对out里的对象触发fireChannelRead,让其它的channelhandler处理
                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.getUnsafe(i));
                }
                out.recycle();
            }
        }
    
    
        protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
    }
    
    

    MessageToMessageDecoder方法只做了两件事:1:判断当前个对象是否需要调用decode方法,2:将decode结果的对象调用fireChannelRead方法交给其它的ChannelHandler处理。StringDecoder类里的方法就更简单了,源码如下:

    @Sharable
    public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
    
        // TODO Use CharsetDecoder instead.
    //传入字节码
        private final Charset charset;
    
        /**
         * Creates a new instance with the current system character set.
         */
        public StringDecoder() {
            this(Charset.defaultCharset());
        }
    
        /**
         * Creates a new instance with the specified character set.
         */
        public StringDecoder(Charset charset) {
            if (charset == null) {
                throw new NullPointerException("charset");
            }
            this.charset = charset;
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
    //这里对msg进行处理
            out.add(msg.toString(charset));
        }
    }
    

    StringEncoder源码

      我们再来关注一下StringEncoder的处理流程,StringEncoder用于对需要写的数据进行字符编码,StringEncoder继承自MessageToMessageEncoder,而MessageToMessageEncoder又继承ChannelOutboundHandlerAdapter。下面是MessageToMessageEncoder的源码:

    public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
    
        private final TypeParameterMatcher matcher;
    
        /**
         * Create a new instance which will try to detect the types to match out of the type parameter of the class.
         */
        protected MessageToMessageEncoder() {
            matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
        }
    
        /**
         * Create a new instance
         *
         * @param outboundMessageType   The type of messages to match and so encode
         */
        protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
            matcher = TypeParameterMatcher.get(outboundMessageType);
        }
    
        /**
         * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
         * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
         */
        public boolean acceptOutboundMessage(Object msg) throws Exception {
            return matcher.match(msg);
        }
    
    //只需要关注这个方法啦,这里会对面要写的数据进行encode
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            CodecOutputList out = null;
            try {
    //跟上面decode一样,需要验证msg能不能处理
                if (acceptOutboundMessage(msg)) {
                    out = CodecOutputList.newInstance();
                    @SuppressWarnings("unchecked")
                    I cast = (I) msg;
                    try {
    //具体的encode留给子类处理
                        encode(ctx, cast, out);
                    } finally {
                        ReferenceCountUtil.release(cast);
                    }
    
                    if (out.isEmpty()) {
                        out.recycle();
                        out = null;
                        throw new EncoderException(
                                StringUtil.simpleClassName(this) + " must produce at least one message.");
                    }
                } else {
                    ctx.write(msg, promise);
                }
            } catch (EncoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new EncoderException(t);
            } finally {
    //out不为空的话,就会调用ctx的witer方法触发写数据的逻辑啦
                if (out != null) {
                    final int sizeMinusOne = out.size() - 1;
                    if (sizeMinusOne == 0) {
                        ctx.write(out.get(0), promise);
                    } else if (sizeMinusOne > 0) {
                        // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
                        // See https://github.com/netty/netty/issues/2525
                        ChannelPromise voidPromise = ctx.voidPromise();
                        boolean isVoidPromise = promise == voidPromise;
                        for (int i = 0; i < sizeMinusOne; i ++) {
                            ChannelPromise p;
                            if (isVoidPromise) {
                                p = voidPromise;
                            } else {
                                p = ctx.newPromise();
                            }
                            ctx.write(out.getUnsafe(i), p);
                        }
                        ctx.write(out.getUnsafe(sizeMinusOne), promise);
                    }
                    out.recycle();
                }
            }
        }
    
       
        protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
    }
    

      MessageToMessageEncoder类里也只做了三件事:1:判断当前的对象是否需要进行encoder。2:调用子类encoder方法对对象进行encoder。3:将encoder好了的对象调用发送逻辑。下面是StringEncoder源码:

    public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
    
        // TODO Use CharsetEncoder instead.
        private final Charset charset;
    
        /**
         * Creates a new instance with the current system character set.
         */
        public StringEncoder() {
            this(Charset.defaultCharset());
        }
    
        /**
         * Creates a new instance with the specified character set.
         */
        public StringEncoder(Charset charset) {
            if (charset == null) {
                throw new NullPointerException("charset");
            }
            this.charset = charset;
        }
    
        @Override
        protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
            if (msg.length() == 0) {
                return;
            }
    //根据 charset将String转成ByteBuf对象
            out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
        }
    }
    

    ChannelPipeline

      ChannelPipeline用于组织ChannelHandlerContext(内部含有ChannelHandler),在Netty里采用的是双端链表的方式来管理ChannelHandlerContext。在ChannelPipeline里提供了各种对双端链表处理的方法,同时也提供了各种触发ChannelHandlerContext的方法,比如:fireChannelActive方法,下面是部分源码:

    public class DefaultChannelPipeline implements ChannelPipeline {
    
    //双端链表的head对象
        final AbstractChannelHandlerContext head;
    //双端链表的tail对象
        final AbstractChannelHandlerContext tail;
    //持用的channel对象
        private final Channel channel;
    
        private final ChannelFuture succeededFuture;
        private final VoidChannelPromise voidPromise;
        private final boolean touch = ResourceLeakDetector.isEnabled();
    
        private Map<EventExecutorGroup, EventExecutor> childExecutors;
        private volatile MessageSizeEstimator.Handle estimatorHandle;
        private boolean firstRegistration = true;
    
    //的链表的未位增加一个ChannelHandler 
        public final ChannelPipeline addLast(ChannelHandler handler) {
            return addLast(null, handler);
        }
    
    //的链表的未位增加一个ChannelHandler ,需要传入这个ChannelHandler的名称
        @Override
        public final ChannelPipeline addLast(String name, ChannelHandler handler) {
            return addLast(null, name, handler);
        }
    
    //最终会调用到这个方法来对channelHandler处理
        @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
    //这是一个同步方法,需要锁住这个pipeline对象
            synchronized (this) {
    //参数合法性验证
                checkMultiplicity(handler);
    //这里会将ChannelHandler 包装成ChannelHandlerContext对象,这也就是为什么双端链表里存的是ChannelHandlerContext啦其中filterName会对为null的name生成一个名称
                newCtx = newContext(group, filterName(name, handler), handler);
    //这里才是具体处理链表的方法啦
                addLast0(newCtx);
    
                // If the registered is false it means that the channel was not registered on an eventloop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
    
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {
                    newCtx.setAddPending();
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerAdded0(newCtx);
                        }
                    });
                    return this;
                }
            }
            callHandlerAdded0(newCtx);
            return this;
        }
    //下面的方法是对链表进行操作的代码
        private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
    
    //pipeline里提供了类似fireChannelActive方法,这些方法最络会调用到channelHandler对应的方法上
        @Override
        public final ChannelPipeline fireChannelActive() {
            AbstractChannelHandlerContext.invokeChannelActive(head);
            return this;
        }
    }
    

    ChannelHandlerContext

      ChannelHandlerContext对于连接ChannelHandler与ChannelPipeline。
    ChannelHandlerContext内部持有ChannelHandler对象,同时又是ChannelPipeline链表里的节点,串起了ChannelPipeline的整个逻辑,下面来看看ChannelHandlerContext最重要的类AbstractChannelHandlerContext源码:

    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
            implements ChannelHandlerContext, ResourceLeakHint {
    //当前ChannelHandlerContext指向的下一个ChannelHandlerContext
        volatile AbstractChannelHandlerContext next;
    //当前ChannelHandlerContext指向的前一个ChannelHandlerContext
        volatile AbstractChannelHandlerContext prev;
    //用于标识channelHanlder是否为inbound
        private final boolean inbound;
    //用于标识channelHanlder是否为outbound
        private final boolean outbound;
    //同时也持胡pipeline对象
        private final DefaultChannelPipeline pipeline;
    //channelHandler取的名称
        private final String name;
    //是否需要排序
        private final boolean ordered;
    
    //构造方法如下
        AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                      boolean inbound, boolean outbound) {
            this.name = ObjectUtil.checkNotNull(name, "name");
            this.pipeline = pipeline;
            this.executor = executor;
            this.inbound = inbound;
            this.outbound = outbound;
            // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
            ordered = executor == null || executor instanceof OrderedEventExecutor;
        }
    
    //这个方法是个static方法,用于给pipeline对象调用,
        static void invokeChannelActive(final AbstractChannelHandlerContext next) {
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
    //会触发ChannelHandlerContext的invokeChannelActive方法
                next.invokeChannelActive();
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelActive();
                    }
                });
            }
        }
    
    //active的逻辑会调用到这个方法里
        private void invokeChannelActive() {
    //确认当前channelhandler的状态 
            if (invokeHandler()) {
                try {
    //最络会调用到channelhandler的channelActive方法,其中handler()方法是留给子类实现的可以看DefaultChannelHandlerContext源码部分
                    ((ChannelInboundHandler) handler()).channelActive(this);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelActive();
            }
        }
    
    }
    

    DefaultChannelHandlerContext源码就很简单了,提供了一个handler方法用于得到当前的ChannelHandler和判断当前ChannelHandler的类型。代码如下:

    package io.netty.channel;
    
    import io.netty.util.concurrent.EventExecutor;
    
    final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    
        private final ChannelHandler handler;
    
        DefaultChannelHandlerContext(
                DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
            super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
            if (handler == null) {
                throw new NullPointerException("handler");
            }
            this.handler = handler;
        }
    
        @Override
        public ChannelHandler handler() {
            return handler;
        }
    
        private static boolean isInbound(ChannelHandler handler) {
            return handler instanceof ChannelInboundHandler;
        }
    
        private static boolean isOutbound(ChannelHandler handler) {
            return handler instanceof ChannelOutboundHandler;
        }
    }
    
    

    ChannelHandlerContext里作为ChannelPipeline的链表节点,决定着事件是否进行向下流转,如果想让事件向下流转,只需要通过ChannelHandlerContext调用相应的fire方法就行了

    相关文章

      网友评论

        本文标题:Netty ChannelHandler与ChannelPipe

        本文链接:https://www.haomeiwen.com/subject/qzjjmftx.html