美文网首页Netty 权威指南笔记专题收藏
Netty 权威指南笔记(七):ChannelPipeline

Netty 权威指南笔记(七):ChannelPipeline

作者: albon | 来源:发表于2017-11-14 09:50 被阅读104次

    Netty 权威指南笔记(七):ChannelPipeline 和 ChannelHandler 源码分析

    文中源码版本为 Netty4.1。

    概述

    Netty 的 ChannelPipeline 和 ChannelHandler 机制类似于 Servlet 和 Filter 过滤器,这类拦截器实际上是职责链模式的一种变形,主要是为了方便事件的拦截和用户业务逻辑的定制。

    Servlet Filter 过滤器提供了一种面向对象的模块化机制,用来将公共人物封装到可插入的组件中。这些组件通过 Web 部署配置文件(web.xml)进行声明,无须改动代码即可添加和删除过滤器。可以对应于程序 Servlet 提供的核心功能进行补充,而不破坏原有的功能。

    Netty 的 Channel 过滤器实现原理与 Servlet Filter 机制一致,它将 Channel 的数据管道抽象为 ChannelPipeline,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 I/O 事件拦截器 ChannelHandler 的链表,由 ChannelHandler 来对 I/O 事件进行具体的拦截和处理,可以方便地通过新增和删除 ChannelHandler 来实现不同业务逻辑的定制,能够实现对修改封闭和对扩展到支持。

    ChannelPipeline 源码分析

    ChannelHandler 双向链表

    TimeServer 程序 中,调用了 ChannelPipeline 的 addLast 方法来添加 ChannelHandler。那么 ChannelHandler 在其中是如何存储的呢?

                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 2014)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(16));
                                socketChannel.pipeline().addLast(new TimeServerHandler());
                            }
                        });
    

    我们看一下 ChannelPipeline 的成员变量,前两个就是 ChannelHandler 链表的首尾引用,其类型是 AbstractChannelHandlerContext,该类主要包含一个双向链表节点的前置和后置节点引用 prev、next,以及数据引用 handler,相当于链表数据结构中的 Node 节点。

        // ChannelHandler 首位指针
        final AbstractChannelHandlerContext head;
        final AbstractChannelHandlerContext tail;
        // pipeline 所属 channel
        private final Channel channel;
        private final ChannelFuture succeededFuture;
        private final VoidChannelPromise voidPromise;
        private final boolean touch = ResourceLeakDetector.isEnabled();
    
        protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }
    

    AbstractChannelHandlerContext 类主要成员变量:

        // in AbstractChannelHandlerContext 抽象类
        volatile AbstractChannelHandlerContext next;
        volatile AbstractChannelHandlerContext prev;
        // DefaultChannelHandlerContext 实现类
        private final ChannelHandler handler;
    

    TimeServer 程序中调用的 addLast 方法源码如下:

    1. 首先进行了能否被共享的检查。
    2. 然后构建了 AbstractChannelHandlerContext 节点,并加入到了链表尾部。
    3. 如果 channel 尚未注册到 EventLoop,就添加一个任务到 PendingHandlerCallback 上,后续注册完毕,再调用 ChannelHandler.handlerAdded。
    4. 如果已经注册,马上调用 callHandlerAdded0 方法来执行 ChannelHandler.handlerAdded。
        @Override
        public final ChannelPipeline addLast(String name, ChannelHandler handler) {
            return addLast(null, name, handler);
        }
    
        @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                // 检查,若不是 Sharable,而且已经被添加到其他 pipeline,则抛出异常
                checkMultiplicity(handler);
                // 构建 AbstractChannelHandlerContext 节点
                newCtx = newContext(group, filterName(name, handler), handler);
                // 添加到链表尾部
                addLast0(newCtx);
    
                // registered 为 false 表示 channel 尚未注册到 EventLoop 上。
                // 添加一个任务到 PendingHandlerCallback 上,后续注册完毕,再调用 ChannelHandler.handlerAdded
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                // registered 为 true,则立即调用 ChannelHandler.handlerAdded
                EventExecutor executor = newCtx.executor();
                // inEvent 用于判断当前线程是否是 EventLoop 线程。执行 ChannelHandler 时,必须在对应的 EventLoop 线程池中执行。
                if (!executor.inEventLoop()) {
                    newCtx.setAddPending();
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerAdded0(newCtx);
                        }
                    });
                    return this;
                }
            }
            callHandlerAdded0(newCtx);
            return this;
        }
        // 使用 AbstractChannelHandlerContext 包裹 ChannelHandler
        private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
            return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
        }    
        // 将新节点插入链表尾部
        private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
    

    事件处理流程

    Netty 中的事件分为 inbound 事件和 outbound 事件。inbound 事件通常由 I/O 线程触发,比如:

    1. 注册事件 fireChannelRegistered。
    2. 连接建立事件 fireChannelActive。
    3. 读事件和读完成事件 fireChannelRead、fireChannelReadComplete。
    4. 异常通知事件 fireExceptionCaught。
    5. 用户自定义事件 fireUserEventTriggered。
    6. Channel 可写状态变化事件 fireChannelWritabilityChanged。
    7. 连接关闭事件 fireChannelInactive。

    outbound 事件通常是由用户主动出发的 I/O 事件,比如:

    1. 端口绑定 bind。
    2. 连接服务端 connect。
    3. 写事件 write。
    4. 刷新时间 flush。
    5. 读事件 read。
    6. 主动断开连接 disconnect。
    7. 关闭 channel 事件 close。
        public final ChannelPipeline fireChannelActive() {
            AbstractChannelHandlerContext.invokeChannelActive(head);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
    
        @Override
        public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
            return tail.bind(localAddress, promise);
        }
    
        @Override
        public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
            return tail.connect(remoteAddress, promise);
        }    
    

    看代码我们发现,inbound 事件是从 HeadContext 开始处理的,而 outbound 事件都是由 TailContext 首先处理的。其中的原因是,HeadContext 负责与 NIO 底层的 SocketChannel、ServerSocketChannel 进行交互(通过 Unsafe 类)。

        final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
            private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, false, true);
                unsafe = pipeline.channel().unsafe();
                setAddComplete();
            }        
    
            @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                    throws Exception {
                unsafe.bind(localAddress, promise);
            }
    
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                unsafe.write(msg, promise);
            }                
        }    
    

    下面我们以读事件 fireChannelRead 为例看一下其处理流程,在 DefaultChannelPipeline 中调用了 AbstractChannelHandlerContext 类的 invokeChannelRead 方法,其源码如下:

        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            // 如果 msg 实现了 ReferenceCounted 接口,进行处理。
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            // 调用 invokeChannelRead 方法
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
    
        private void invokeChannelRead(Object msg) {
            // 先调用 channelRead 方法,再 fireChannelRead 触发下一个节点的 channelRead 方法
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    
        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }
    
        // inbound 事件中的下一个节点是本节点 next 引用所指节点,而 outbound 事件相反。
        private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }        
    

    ChannelHandler 源码分析

    ChannelHandler 类似于 Servlet 的 Filter 过滤器,负责对 I/O 事件进行拦截和处理,可以选择性地处理,也可以透传和终止事件的传递。基于 ChannelHandler 接口,用户可以方便地进行业务逻辑定制,比如打印日志,统一封装异常信息等。

    类图

    ChannelHandler 类图如下所示:

    Netty ChannelHandler 类图

    前面提到 Netty 事件分为 inbound 和 outbound 两类,分别对应 ChannelInboundHandler 和 ChannelOutboundHandler,它们的公共父类就是 ChannelHandler。

    ChannelHandler、ChannelInboundHandler 和 ChannelOutboundHandler 接口中,提供了许多方法。在实际使用中,用户往往只需要其中的一两个。为了方便用户使用,有几个抽象类(ChannelHandlerAdapter、ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter)提供了一些默认实现,如此用户只需要实现自己关心的方法便可。

    类图倒数第二层提供了一些编解码器的抽象类,用户可以在此基础上进行扩展。最后一层是几种常见的编解码器。

    编解码器 类型 功能
    MessageToMessageEncoder outbound 从一个对象到另一个对象的转换
    MessageToByteEncoder outbound 从对象到 ByteBuf 的转换
    LengthFieldPrepender outbound 在消息体前面追加消息长度的编码器
    ProtobufVarint32LengthFieldPrepender outbound 给 protobuf 字节流添加描述消息长度的消息头
    MessageToMessageDecoder inbound 从对象到对象的解码器
    ByteToMessageDecoder inbound 从 ByteBuf 到对象的解码器
    StringDecoder inbound 将 ByteBuf 转化成指定编码的 String
    FixedLengthFrameDecoder inbound 定长消息解码器
    LengthFieldBasedFrameDecoder inbound 消息长度在位于消息头的解码器

    下面我们选择几个典型类来解读其源码。

    ChannelHandler

    ChannelHandler 只有少数几个方法,用于处理 ChannelHandler 被添加时做一些初始化操作,被移除时做一些销毁操作,以及异常处理。

    除此之外,还有一个注解 Sharable,用于标识一个 ChannelHandler 实例可以被多个 ChannelPipeline 共享。

    public interface ChannelHandler {
        void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    
        void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    
        @Deprecated
        void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    
        @Inherited
        @Documented
        @Target(ElementType.TYPE)
        @Retention(RetentionPolicy.RUNTIME)
        @interface Sharable {
            // no value
        }
    }
    

    ChannelInboundHandler 和 ChannelOutboundHandler 接口中的方法和 “事件处理流程” 那一节中介绍的 inbound 和 outbound 事件类型基本上可以一一对应,这里就不贴代码分析了。

    ChannelHandlerAdapter

    ChannelHandlerAdapter 抽象类,提供了 ChannelHandler 接口方法的默认实现,以及根据注解判断该类是否可共享的 isSharable 方法。

    handlerAdded 和 handlerRemoved 的默认实现都是空。

    public abstract class ChannelHandlerAdapter implements ChannelHandler {
    
        /**
         * Return {@code true} if the implementation is {@link Sharable} and so can be added
         * to different {@link ChannelPipeline}s.
         */
        public boolean isSharable() {
            /**
             * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
             * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
             * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
             * {@link Thread}s are quite limited anyway.
             *
             * See <a href="https://github.com/netty/netty/issues/2289">#2289</a>.
             */
            Class<?> clazz = getClass();
            Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
            Boolean sharable = cache.get(clazz);
            if (sharable == null) {
                sharable = clazz.isAnnotationPresent(Sharable.class);
                cache.put(clazz, sharable);
            }
            return sharable;
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            // NOOP
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            // NOOP
        }
    }
    

    ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter 分别提供了 ChannelInboundHandler 和 ChannelOutboundHandler 的默认实现。以 ChannelInboundHandlerAdapter 为例,其大多数方法的默认实现都是调用 ChannelHandlerContext 的类似方法,作用为向后传递。

    public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelRegistered();
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }
    }    
    

    ByteToMessageDecoder

    该方法提供了将 ByteBuf 转化为对象的解码器处理流程,具体的解码规则交由子类去实现。

    我们以读操作 channelRead 为例来研究一下:

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof ByteBuf) {
                // out 是一个链表,存放解码成功的对象
                CodecOutputList out = CodecOutputList.newInstance();
                try {
                    ByteBuf data = (ByteBuf) msg;
                    // cumulation 中存放的是上次未处理完的半包消息
                    first = cumulation == null;
                    if (first) {
                        cumulation = data;
                    } else {
                        // 本次处理,需要把上次遗留的半包和本次数据拼接后,一起处理
                        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                    }
                    // 调用解码器解码消息
                    callDecode(ctx, cumulation, out);
                } catch (DecoderException e) {
                    throw e;
                } catch (Exception e) {
                    throw new DecoderException(e);
                } finally {
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++ numReads >= discardAfterReads) {
                        numReads = 0;
                        discardSomeReadBytes();
                    }
    
                    int size = out.size();
                    decodeWasNull = !out.insertSinceRecycled();
                    // 如果有解码成功的数据,需要向后传递,让其他 ChannelHandler 继续处理
                    fireChannelRead(ctx, out, size);
                    out.recycle();
                }
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                while (in.isReadable()) {
                    int outSize = out.size();
    
                    if (outSize > 0) {
                        // 如果有解码成功的数据,需要向后传递,让其他 ChannelHandler 继续处理
                        fireChannelRead(ctx, out, outSize);
                        out.clear();
                        // 如果当前 ChannelHandler 所属 ctx 被剔除 pipeline 上下文,就不需要继续处理了
                        if (ctx.isRemoved()) {
                            break;
                        }
                        outSize = 0;
                    }
    
                    int oldInputLength = in.readableBytes();
                    // 解码
                    decodeRemovalReentryProtection(ctx, in, out);
    
                    if (ctx.isRemoved()) {
                        break;
                    }
    
                    if (outSize == out.size()) {
                        if (oldInputLength == in.readableBytes()) {
                            break;
                        } else {
                            continue;
                        }
                    }
    
                    if (oldInputLength == in.readableBytes()) {
                        throw new DecoderException(
                                StringUtil.simpleClassName(getClass()) +
                                        ".decode() did not read anything but decoded a message.");
                    }
    
                    if (isSingleDecode()) {
                        break;
                    }
                }
            } catch (DecoderException e) {
                throw e;
            } catch (Exception cause) {
                throw new DecoderException(cause);
            }
        }
    
        final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                throws Exception {
            // 设置解码器状态为正在解码,避免解码过程中另一个线程调用了 handlerRemoved 把数据销毁,造成混乱
            decodeState = STATE_CALLING_CHILD_DECODE;
            try {
                decode(ctx, in, out);
            } finally {
                // STATE_HANDLER_REMOVED_PENDING 表示在解码过程中,ctx 被移除,需要由当前线程来调用 handlerRemoved 
                boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
                decodeState = STATE_INIT;
                if (removePending) {
                    handlerRemoved(ctx);
                }
            }
        }
    
        // 具体的消息解码算法,交给子类实现
        protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;               
    

    FixedLengthFrameDecoder

    上一节我们研究了 ByteToMessageDecoder,本节研究其最简单的一个实现类 FixedLengthFrameDecoder。

    该类核心是 decode 方法,当可读字节数据大于 frameLength 时,截取前 frameLength 个字节为一个 ByteBuf,存入列表 out 中。

    public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    
        private final int frameLength;
    
        public FixedLengthFrameDecoder(int frameLength) {
            if (frameLength <= 0) {
                throw new IllegalArgumentException(
                        "frameLength must be a positive integer: " + frameLength);
            }
            this.frameLength = frameLength;
        }
    
        @Override
        protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            Object decoded = decode(ctx, in);
            if (decoded != null) {
                out.add(decoded);
            }
        }
    
        protected Object decode(
                @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            if (in.readableBytes() < frameLength) {
                return null;
            } else {
                return in.readRetainedSlice(frameLength);
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:Netty 权威指南笔记(七):ChannelPipeline

        本文链接:https://www.haomeiwen.com/subject/pqjxvxtx.html