美文网首页
【Netty】ChannelPipeline和ChannelHa

【Netty】ChannelPipeline和ChannelHa

作者: xbmchina | 来源:发表于2019-06-13 23:51 被阅读0次

    欢迎关注公众号:【爱编码
    如果有需要后台回复2019赠送1T的学习资料哦!!

    简介

    前文学习Netty的ByteBuf数据容器。本文开始学习ChannelPipeline和ChannelHandler,它们的角色非常类似于流水线以及工人。

    Channel的生命周期

    Channel接口定义了一个简单但是强大的状态模型,该模型与ChannelInboundHandler API紧密联系。

    ChannelInboundHandler源码如下

    public interface ChannelInboundHandler extends ChannelHandler {
    
        /**
         * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
         */
        void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
         */
        void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * The {@link Channel} of the {@link ChannelHandlerContext} is now active
         */
        void channelActive(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
         * end of lifetime.
         */
        void channelInactive(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * Invoked when the current {@link Channel} has read a message from the peer.
         */
        void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    
        /**
         * Invoked when the last message read by the current read operation has been consumed by
         * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
         * attempt to read an inbound data from the current {@link Channel} will be made until
         * {@link ChannelHandlerContext#read()} is called.
         */
        void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * Gets called if an user event was triggered.
         */
        void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    
        /**
         * Gets called once the writable state of a {@link Channel} changed. You can check the state with
         * {@link Channel#isWritable()}.
         */
        void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * Gets called if a {@link Throwable} was thrown.
         */
        @Override
        @SuppressWarnings("deprecation")
        void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    }
    
    

    其中标记Channel生命周期状态的方法如下:

    以下列出Channel的4个状态:

    ChannelUnregistered:Channel已经被创建,但还未注册到EventLoop
    ChannelRegistered:Channel已经被注册到了EventLoop
    ChannelActive:Channel处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
    ChannelInactive:Channel没有连接到远程节点

    Channel的正常生命周期如下图所示

    当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline中的ChannelHandler,其可以随后对它们做出响应。

    ChannelHandler的生命周期

    ChannelHandler被添加到ChannelPipeline中或者被从ChannelPipeline中移除时会调用这些操作,这些方法中的每一个都接受一个ChannelHandlerContext参数。

    ChannelHandler源码如下:

    public interface ChannelHandler {
    
        /**
         * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
         */
        void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
         * anymore.
         */
        void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * Gets called if a {@link Throwable} was thrown.
         *
         * @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
         * implement the method there.
         */
        @Deprecated
        void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    
        /**
         * Indicates that the same instance of the annotated {@link ChannelHandler}
         * can be added to one or more {@link ChannelPipeline}s multiple times
         * without a race condition.
         * <p>
         * If this annotation is not specified, you have to create a new handler
         * instance every time you add it to a pipeline because it has unshared
         * state such as member variables.
         * <p>
         * This annotation is provided for documentation purpose, just like
         * <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
         */
        @Inherited
        @Documented
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
        @interface Sharable {
            // no value
        }
    }
    
    

    ChannelHandler的生命周期方法解析:

    handlerAdded:当把ChannelHandler添加到ChannelPipeline中时被调用
    handlerRemoved:当从ChannelPipeline中移除ChannelHandler时被调用
    exceptionCaught:当处理过程中在ChannelPipeline中有错误产生时被调用

    Netty定义了下面两个重要的ChannelHandler子接口:

    ChannelInboundHandler——处理入站数据以及各种状态变化
    ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作

    ChannelHandler之ChannelInboundHandler

    ChannelInboundHandler接口生命周期中的方法,当接受到数据或者其对应的Channel的状态发生变化则会调用方法

    ChannelInboundHandler源码如下:

    public interface ChannelInboundHandler extends ChannelHandler {
    
        /**
         * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
         */
        void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
         */
        void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * The {@link Channel} of the {@link ChannelHandlerContext} is now active
         */
        void channelActive(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
         * end of lifetime.
         */
        void channelInactive(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * Invoked when the current {@link Channel} has read a message from the peer.
         */
        void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    
        /**
         * Invoked when the last message read by the current read operation has been consumed by
         * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
         * attempt to read an inbound data from the current {@link Channel} will be made until
         * {@link ChannelHandlerContext#read()} is called.
         */
        void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * Gets called if an user event was triggered.
         */
        void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    
        /**
         * Gets called once the writable state of a {@link Channel} changed. You can check the state with
         * {@link Channel#isWritable()}.
         */
        void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
    
        /**
         * Gets called if a {@link Throwable} was thrown.
         */
        @Override
        @SuppressWarnings("deprecation")
        void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    }
    
    

    当某个ChannelInboundHandler的实现重写channelRead()方法时,它将负责显式地释放与池化ByteBuf实例相关的内存,Netty为此提供了一个实用方法ReferenceCountUtil.release()

    @ChannelHandler.Sharable
    //扩展了ChannelInboundHandlerAdapter
    public class DiscardHandler extends ChannelInboundHandlerAdapter{
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //丢弃已接收的消息
            ReferenceCountUtil.release(msg);
        }
    }
    

    当你不想处理这些释放资源等操作的话,Netty提供了SimpleChannelInboundHandler会自动释放资源,因此无需显式释放,代码如下:

    public class MySimpleHandler
        extends SimpleChannelInboundHandler<Object> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,
            Object msg) {
                // No need to do anything special
        }
    }
    

    为了更好地理解,我们看看SimpleChannelInboundHandler的源码,从中可以看到,它已经帮我们释放资源 了的,我们只需要实现channelRead0方法,在channelRead0()处理我们的业务逻辑即可。

    public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
    //省略了很多代码。。。。。。
      @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            boolean release = true;
            try {
                if (acceptInboundMessage(msg)) {
                    @SuppressWarnings("unchecked")
                    I imsg = (I) msg;
                    channelRead0(ctx, imsg);
                } else {
                    release = false;
                    ctx.fireChannelRead(msg);
                }
            } finally {
                if (autoRelease && release) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    //省略了很多代码。。。。。。
    
        protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
    
    }
    

    ChannelHandler之ChannelOutboundHandler

    出站操作和数据将由ChannelOutboundHandler处理,它的方法将被ChannelChannelPipeline以及ChannelHandlerContext调用。

    ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷并在稍后继续。

    ChannelPromise与ChannelFuture :
    ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。ChannelPromiseChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()setFailure(),从而使ChannelFuture不可变。

    ChannelOutboundHandler源码如下:

    
    /**
     * {@link ChannelHandler} which will get notified for IO-outbound-operations.
     */
    public interface ChannelOutboundHandler extends ChannelHandler {
        /**
         * Called once a bind operation is made.
         *
         * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
         * @param localAddress  the {@link SocketAddress} to which it should bound
         * @param promise       the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception    thrown if an error occurs
         */
        void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    
        /**
         * Called once a connect operation is made.
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
         * @param remoteAddress     the {@link SocketAddress} to which it should connect
         * @param localAddress      the {@link SocketAddress} which is used as source on connect
         * @param promise           the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception        thrown if an error occurs
         */
        void connect(
                ChannelHandlerContext ctx, SocketAddress remoteAddress,
                SocketAddress localAddress, ChannelPromise promise) throws Exception;
    
        /**
         * Called once a disconnect operation is made.
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
         * @param promise           the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception        thrown if an error occurs
         */
        void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
        /**
         * Called once a close operation is made.
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
         * @param promise           the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception        thrown if an error occurs
         */
        void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
        /**
         * Called once a deregister operation is made from the current registered {@link EventLoop}.
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
         * @param promise           the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception        thrown if an error occurs
         */
        void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
        /**
         * Intercepts {@link ChannelHandlerContext#read()}.
         */
        void read(ChannelHandlerContext ctx) throws Exception;
    
        /**
        * Called once a write operation is made. The write operation will write the messages through the
         * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
         * {@link Channel#flush()} is called
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
         * @param msg               the message to write
         * @param promise           the {@link ChannelPromise} to notify once the operation completes
         * @throws Exception        thrown if an error occurs
         */
        void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
    
        /**
         * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
         * that are pending.
         *
         * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
         * @throws Exception        thrown if an error occurs
         */
        void flush(ChannelHandlerContext ctx) throws Exception;
    }
    

    ChannelPipeline接口

    如果将ChannelPipeline视为ChannelHandler实例链,可拦截流经通道的入站和出站事件,即可明白ChannelHandler之间的交互是如何构成应用程序数据和事件处理逻辑的核心的。当创建一个新的Channel时,都会分配了一个新的ChannelPipeline,该关联是永久的,该通道既不能附加另一个ChannelPipeline也不能分离当前的ChannelPipeline

    一个事件要么被ChannelInboundHander处理,要么被ChannelOutboundHandler处理,随后,它将通过调用ChannelHandlerContext的实现来将事件转发至同一超类型的下一个处理器。ChannelHandlerContext允许ChannelHandler与其ChannelPipeline和其他ChannelHandler进行交互,一个处理器可以通知ChannelPipeline中的下一个处理器,甚至可以修改器隶属于的ChannelPipeline

    下图展示了ChannelHandlerPipelineChannelInboundHandlerChannelOutboundHandler之间的关系

    可以看到ChannelPipeline是由一系列ChannelHandlers组成,其还提供了通过自身传播事件的方法,当进站事件触发时,其从ChannelPipeline的头部传递到尾部,而出站事件会从右边传递到左边。

    当管道传播事件时,其会确定下一个ChannelHandler的类型是否与移动方向匹配,若不匹配,则会跳过并寻找下一个,直至找到相匹配的ChannelHandler(一个处理器可以会同时实现ChannelInboundHandlerChannelOutboundHandler)。

    ChannelHandlerContext接口

    ChannelHandlerContext代表了ChannelHandlerChannelPipeline之间的关联,当ChannelHandler被添加至ChannelPipeline中时其被创建,ChannelHandlerContext的主要功能是管理相关ChannelHandler与同一ChannelPipeline中的其他ChannelHandler的交互。

    ChannelHandlerContext中存在很多方法,其中一些也存在于ChannelHandlerChannelPipeline中,但是差别很大。如果在ChannelHandler或者ChannelPipeline中调用该方法,它们将在整个管道中传播,而如果在ChannelHandlerContext中调用方法,那么会仅仅传递至下个能处理该事件的ChannelHandler

    问题:ChannelPipeline在哪里创建的呢?

    通过上图我们可以看到,一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由ChannelHandlerContext组成的双向链表。这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext中又关联着一个ChannelHandler

    前面已经知道了一个Channel的初始化的基本过程,下面再回顾一下
    下面的代码是AbstractChannel构造器

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = new DefaultChannelPipeline(this);
    }
    

    AbstractChannel有一个pipeline字段,在构造器中会初始化它为DefaultChannelPipeline的实例,这里的代码就印证了一点:每个Channel都有一个ChannelPipeline

    总结

    主要介绍ChannelPipeline和ChannelHandler基本概念以及在netty中的作用。
    有点类似于加工厂的流水线Channel
    ChannelPipeline相当于流水线的传送带
    ChannelHandler流水线上的每个步骤工人
    ChannelHandlerContext就是待加工的产品

    这条流水线有个特点是双向的

    参考文章

    Netty实战.pdf
    https://www.cnblogs.com/leesf456/p/6901189.html
    https://www.jianshu.com/p/33311b4cab30
    https://www.jianshu.com/u/fc9c660e9843

    最后

    如果对 Java、大数据感兴趣请长按二维码关注一波,我会努力带给你们价值。觉得对你哪怕有一丁点帮助的请帮忙点个赞或者转发哦。
    关注公众号【爱编码】,回复2019有相关资料哦。

    相关文章

      网友评论

          本文标题:【Netty】ChannelPipeline和ChannelHa

          本文链接:https://www.haomeiwen.com/subject/bdsefctx.html