美文网首页Java
Netty源码_DefaultChannelPipeline详解

Netty源码_DefaultChannelPipeline详解

作者: wo883721 | 来源:发表于2021-10-31 16:03 被阅读0次

    通过ChannelPipeline 这篇文章,ChannelPipeline 的主要功能就是

    它是一个事件处理器的管道,用户可以添加ChannelHandler 到这个管道上,然后它将拦截 IO 事件,交给它管理的ChannelHandler处理。

    因此要弄懂 ChannelPipeline 实现,只要从两方面入手:

    • 如何管理事件处理器ChannelHandler
    • 如何拦截 IO 事件?

    一. 管理 ChannelHandler

    通过AbstractChannelHandlerContext的分析,我们知道其实管道ChannelPipeline 是通过双向链表来存储任务处理器的上下文列表的。

    1.1 双向链表

        final AbstractChannelHandlerContext head;
        final AbstractChannelHandlerContext tail;
    
        protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }
    

    DefaultChannelPipeline 使用成员属性 head 表示双向链表头,tail 表示双向链表尾,它们在管道创建的时候,也被创建并赋值了。通过它们,就可以从头或者从尾遍历整个链表中的节点了。

    1.2 添加

    1.2.1 添加单个

        @Override
        public final ChannelPipeline addFirst(String n ame, ChannelHandler handler) {
            return addFirst(null, name, handler);
        }
    
        @Override
        public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            // 使用synchronized锁,防止并发问题
            synchronized (this) {
                // 检查这个 handler 有没有重复
                checkMultiplicity(handler);
                // 过滤name,如果当前管道中已经存在这个名字的 ChannelHandler,
                // 直接抛出异常
                name = filterName(name, handler);
    
                // 创建这个事件处理器ChannelHandler对应的上下文对象
                newCtx = newContext(group, name, handler);
    
                // 将这个上下文对象添加到管道中
                // 因为是用双向链表存储的,所以就是改变链表节点的 next 和 prev指向
                addFirst0(newCtx);
    
                // 如果 registered == false,则表示该通道还没有在eventLoop上注册。
                // 因此我们将上下文的状态设置成正在添加状态,
                // 并添加一个任务,该任务将在通道注册后就会调用 ChannelHandler.handlerAdded(...)。
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {
                    // 如果当前线程不是新创建上下文的执行器线程,
                   // 那么使用executor.execute 方法,保证在执行器线程调用 callHandlerAdded0 方法
                    callHandlerAddedInEventLoop(newCtx, executor);
                    return this;
                }
            }
            // 将新创建上下文状态变成已添加,且调用 ChannelHandler.handlerAdded(...)方法
            callHandlerAdded0(newCtx);
            return this;
        }
    

    方法流程分析:

    1. 因为添加 ChannelHandler 会在任何线程调用,所以使用 synchronized 锁来防止并发修改问题。
    2. 通过 checkMultiplicity(handler) 方法检查这个 handler 有没有重复。
    3. 通过 filterName(name, handler) 方法判断这个名字在当前管道中是否重复。
    4. 通过 newContext(group, name, handler) 创建这个事件处理器ChannelHandler对应的上下文对象。
    5. 通过 addFirst0(newCtx) 方法将新建上下文对象添加到管道中。
    6. 下面就是关于新建上下文状态和ChannelHandler.handlerAdded(...) 方法调用问题的处理,分为几种情况:
      • 当管道对应的通道还没有注册到 eventLoop, 那么handlerAdded(...) 现在还不能被调用;那就先将上下文状态变成 ADD_PENDING,并添加一个任务,保证当通道添加之后,再将状态变成 ADD_COMPLETE 且调用handlerAdded(...) 方法。
      • 当前线程不是新创建上下文执行器线程,那么也是先将上下文状态变成 ADD_PENDING, 并在上下文执行器线程中调用 callHandlerAdded0 方法。
      • 不是上面情况,直接调用callHandlerAdded0 方法,将状态变成 ADD_COMPLETE 且调用handlerAdded(...) 方法。

    1.2.2 添加多个

        @Override
        public final ChannelPipeline addFirst(ChannelHandler... handlers) {
            return addFirst(null, handlers);
        }
    
        @Override
        public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
            ObjectUtil.checkNotNull(handlers, "handlers");
            // 如果 handlers 为空,那么不用添加,直接返回
            if (handlers.length == 0 || handlers[0] == null) {
                return this;
            }
    
            int size;
            for (size = 1; size < handlers.length; size ++) {
                // 如果有 ChannelHandler 是 null,那么后面的就不用添加了
                if (handlers[size] == null) {
                    break;
                }
            }
    
            // 倒序遍历,保证添加的顺序
            for (int i = size - 1; i >= 0; i --) {
                ChannelHandler h = handlers[i];
                // 直接调用 addFirst 方法添加
                addFirst(executor, null, h);
            }
    
            return this;
        }
    

    通过倒序循环遍历,调用单个添加方法,来实现一次添加多个。

    1.2.3 辅助方法

    1. checkMultiplicity(...)

       private static void checkMultiplicity(ChannelHandler handler) {
           if (handler instanceof ChannelHandlerAdapter) {
               // 如果是 ChannelHandlerAdapter 的子类
               ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
               // 只有 ChannelHandler 是可共享的,才能多次添加,
               // 否则 handler 已被添加(h.added == true) 的情况下,直接抛出异常
               if (!h.isSharable() && h.added) {
                   throw new ChannelPipelineException(
                           h.getClass().getName() +
                           " is not a @Sharable handler, so can't be added or removed multiple times.");
               }
               h.added = true;
           }
       }
      

      检查 ChannelHandler 是否重复
      只有被 @Sharable 注解的ChannelHandler 才可以多次添加到一个或多个管道 ChannelPipeline

    2. filterName(...)

       private String filterName(String name, ChannelHandler handler) {
           if (name == null) {
               // 通过 ChannelHandler 生成一个名字
               return generateName(handler);
           }
           // 检查是否重复
           checkDuplicateName(name);
           return name;
       }
      
       private void checkDuplicateName(String name) {
           // 通过 context0(name) 方法,看这个名字能否在管道中找到对应的上下文
           if (context0(name) != null) {
               throw new IllegalArgumentException("Duplicate handler name: " + name);
           }
       }
      
    3. newContext(...)

        private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
           return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
       }
      
        private EventExecutor childExecutor(EventExecutorGroup group) {
           if (group == null) {
               return null;
           }
           Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
           if (pinEventExecutor != null && !pinEventExecutor) {
               return group.next();
           }
           Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
           if (childExecutors == null) {
               // 使用4的大小,因为大多数情况下只使用一个额外的 EventExecutor。
               childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
           }
           // 虽然一个事件处理器组 EventExecutorGroup 中管理有多个子事件处理器 EventExecutor,
           // 但是对于同一个通道 Channel 应该使用同一个子事件处理器 EventExecutor,
           // 以便使用相同的子执行器触发相同通道的事件。
           EventExecutor childExecutor = childExecutors.get(group);
           // 如果 childExecutor 不为null,直接返回 childExecutor,使用同个子事件处理器
           if (childExecutor == null) {
               // 没有,则从事件处理器组中获取一个子事件处理器。
               childExecutor = group.next();
               // 记录它,保证同一个管道是同一个子事件处理器。
               childExecutors.put(group, childExecutor);
           }
           return childExecutor;
       }
      
      • 直接创建事件处理器ChannelHandler 对应的上下文。
      • childExecutor(group) 保证同一个管道添加的子事件执行器 EventExecutor 是同一个。
    4. addFirst0(...)

       private void addFirst0(AbstractChannelHandlerContext newCtx) {
           AbstractChannelHandlerContext nextCtx = head.next;
           newCtx.prev = head;
           newCtx.next = nextCtx;
           head.next = newCtx;
           nextCtx.prev = newCtx;
       }
      

      因为是双向链表,所以插入一个节点,是要改变当前插入位置前节点的 next 和后节点的 prev,以及这个插入节点的nextprev

    1.3 删除

        @Override
        public final ChannelPipeline remove(ChannelHandler handler) {
            // 通过 getContextOrDie 得到对应的上下文
            remove(getContextOrDie(handler));
            return this;
        }
    
        @Override
        public final ChannelHandler remove(String name) {
            // 通过 getContextOrDie 得到对应的上下文
            return remove(getContextOrDie(name)).handler();
        }
    
        @SuppressWarnings("unchecked")
        @Override
        public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
            // 通过 getContextOrDie 得到对应的上下文
            return (T) remove(getContextOrDie(handlerType)).handler();
        }
    
        private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
            // head 和 tail 比较特殊,不是用户定义的,
            // 所以它也不能被删除
            assert ctx != head && ctx != tail;
    
            // 使用synchronized锁,防止并发问题
            synchronized (this) {
                // 从管道中删除这个上下文 ctx
                atomicRemoveFromHandlerList(ctx);
    
                // 如果 registered == false,则表示该通道还没有在eventLoop上注册。
                // 在本例中我们从管道中移除上下文,
                // 并添加一个任务,该任务将在通道注册后调用 ChannelHandler.handlerRemoved(...)。
                if (!registered) {
                    callHandlerCallbackLater(ctx, false);
                    return ctx;
                }
    
                EventExecutor executor = ctx.executor();
                if (!executor.inEventLoop()) {
                    // 如果当前线程不是新创建上下文的执行器线程,
                    // 那么使用executor.execute 方法,保证在执行器线程调用 callHandlerRemoved0 方法
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerRemoved0(ctx);
                        }
                    });
                    return ctx;
                }
            }
            // 将新创建上下文状态变成 REMOVE_COMPLETE,
            // 且调用 ChannelHandler.handlerRemoved(...)方法
            callHandlerRemoved0(ctx);
            return ctx;
        }
    
    • 删除的方法流程比添加简单,因为删除不需要进行重复性判断,以及创建上下文对象。
    • 只需要从管道中得到要删除的上下文对象,通过atomicRemoveFromHandlerList 方法从管道中删除。
    • 然后在修改上下文状态,并调用 ChannelHandler.handlerRemoved(...)
    private AbstractChannelHandlerContext getContextOrDie(String name) {
            // 如果 context() 方法对应的上下文
            AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
            if (ctx == null) {
                // 如果为空,抛出异常
                throw new NoSuchElementException(name);
            } else {
                return ctx;
            }
        }
    
        private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
            // 如果 context() 方法对应的上下文
            AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
            if (ctx == null) {
                // 如果为空,抛出异常
                throw new NoSuchElementException(handler.getClass().getName());
            } else {
                return ctx;
            }
        }
    
        private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
            // 如果 context() 方法对应的上下文
            AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
            if (ctx == null) {
                // 如果为空,抛出异常
                throw new NoSuchElementException(handlerType.getName());
            } else {
                return ctx;
            }
        }
    

    1.4 替换

        @Override
        public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
            replace(getContextOrDie(oldHandler), newName, newHandler);
            return this;
        }
    
        @Override
        public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
            return replace(getContextOrDie(oldName), newName, newHandler);
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public final <T extends ChannelHandler> T replace(
                Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
            return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
        }
    
        private ChannelHandler replace(
                final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
            // head 和 tail 比较特殊,不是用户定义的,
            // 所以它也不能被替换,不应该出现在这里
            assert ctx != head && ctx != tail;
    
            final AbstractChannelHandlerContext newCtx;
            // 使用synchronized锁,防止并发问题
            synchronized (this) {
                // 检查新的事件处理器 newHandler 重复性
                checkMultiplicity(newHandler);
                if (newName == null) {
                    newName = generateName(newHandler);
                } else {
                    boolean sameName = ctx.name().equals(newName);
                    if (!sameName) {
                        // 如果新名字 newName 和老名字不同,
                        // 要检查新名字的重复性
                        checkDuplicateName(newName);
                    }
                }
    
                // 创建新事件处理器 newHandler 对应的上下文
                newCtx = newContext(ctx.executor, newName, newHandler);
    
                // 在管道中,用新的上下文替换老的上下文
                replace0(ctx, newCtx);
    
                /**
                 * 下面就是改变新老上下文的状态:
                 * 要将新的上下文状态变成 ADD_COMPLETE
                 * 要将老的上下文状态变成 REMOVE_COMPLETE
                 *
                 * 而且必须先将新的上下文变成 ADD_COMPLETE,
                 * 因为改变老的上下文状态时,有可能会触发新处理器的 channelRead() 或 flush() 方法
                 */
    
                // 如果 registered == false,则表示该通道还没有在eventLoop上注册。
                if (!registered) {
                    callHandlerCallbackLater(newCtx, true);
                    callHandlerCallbackLater(ctx, false);
                    return ctx.handler();
                }
                EventExecutor executor = ctx.executor();
                if (!executor.inEventLoop()) {
                    // 如果当前线程不是新创建上下文的执行器线程,
                    // 要切换到执行器线程执行
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
    
                            callHandlerAdded0(newCtx);
                            callHandlerRemoved0(ctx);
                        }
                    });
                    return ctx.handler();
                }
            }
    
            callHandlerAdded0(newCtx);
            callHandlerRemoved0(ctx);
            return ctx.handler();
        }
    

    替换操作的方法流程比添加复杂一点:

    • 检查新事件处理器的重复性和新名称的重复性
    • 创建新事件处理器 newHandler 对应的上下文
    • 通过 replace0(ctx, newCtx) 方法,用新的上下文替换老的上下文。
    • 最后改变改变新老上下文的状态,而且必须先将新的上下文状态变成 ADD_COMPLETE, 然后再将老的上下文状态变成 REMOVE_COMPLETE

    1.5 查找

        @Override
        public final ChannelHandlerContext context(ChannelHandler handler) {
            ObjectUtil.checkNotNull(handler, "handler");
    
            AbstractChannelHandlerContext ctx = head.next;
            // 从链表头开始遍历
            for (;;) {
                // 遍历完了,那就返回 null
                if (ctx == null) {
                    return null;
                }
                // 找到了,就返回这个上下文
                if (ctx.handler() == handler) {
                    return ctx;
                }
                // 下一个上下文对象
                ctx = ctx.next;
            }
        }
    
        @Override
        public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
            ObjectUtil.checkNotNull(handlerType, "handlerType");
    
            AbstractChannelHandlerContext ctx = head.next;
            // 从链表头开始遍历
            for (;;) {
                // 遍历完了,那就返回 null
                if (ctx == null) {
                    return null;
                }
                // 找到了,就返回这个上下文
                if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
                    return ctx;
                }
                // 下一个上下文对象
                ctx = ctx.next;
            }
        }
    
        @Override
        public final ChannelHandlerContext context(String name) {
            return context0(ObjectUtil.checkNotNull(name, "name"));
        }
    
        private AbstractChannelHandlerContext context0(String name) {
            AbstractChannelHandlerContext context = head.next;
            // 是不是遍历到链表尾部了
            while (context != tail) {
                // 找到了就返回
                if (context.name().equals(name)) {
                    return context;
                }
                // 链表中的下一个
                context = context.next;
            }
            return null;
        }
    

    都是从双向链表头开始遍历,找到了就返回这个上下文,如果遍历到最后都没有找到,就返回 null

    二. 拦截 IO 事件

    2.1 拦截入站 IO 事件

        @Override
        public final ChannelPipeline fireChannelRegistered() {
            // 从链表头节点 head 开始处理 入站IO事件
            AbstractChannelHandlerContext.invokeChannelRegistered(head);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelUnregistered() {
            // 从链表头节点 head 开始处理 入站IO事件
            AbstractChannelHandlerContext.invokeChannelUnregistered(head);
            return this;
        }
     @Override
        public final ChannelPipeline fireChannelActive() {
            // 从链表头节点 head 开始处理 入站IO事件
            AbstractChannelHandlerContext.invokeChannelActive(head);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelInactive() {
            // 从链表头节点 head 开始处理 入站IO事件
            AbstractChannelHandlerContext.invokeChannelInactive(head);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireExceptionCaught(Throwable cause) {
            // 从链表头节点 head 开始处理 入站IO事件
            AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
            return this;
        }
    
     @Override
        public final ChannelPipeline fireUserEventTriggered(Object event) {
            // 从链表头节点 head 开始处理 入站IO事件
            AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            // 从链表头节点 head 开始处理 入站IO事件
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelReadComplete() {
            // 从链表头节点 head 开始处理 入站IO事件
            AbstractChannelHandlerContext.invokeChannelReadComplete(head);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelWritabilityChanged() {
            // 从链表头节点 head 开始处理 入站IO事件
            AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
            return this;
        }
    

    从链表头节点 head 开始处理 入站IO事件

    2.2 拦截出站 IO 操作

       @Override
      public final ChannelFuture bind(SocketAddress localAddress) {
            // 从链表尾节点 tail 开始处理 出站`IO`操作
            return tail.bind(localAddress);
        }
    
        @Override
        public final ChannelFuture connect(SocketAddress remoteAddress) {
            // 从链表尾节点 tail 开始处理 出站`IO`操作
            return tail.connect(remoteAddress);
        }
    
        @Override
        public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
            // 从链表尾节点 tail 开始处理 出站`IO`操作
            return tail.connect(remoteAddress, localAddress);
        }
    
        @Override
        public final ChannelFuture disconnect() {
            // 从链表尾节点 tail 开始处理 出站`IO`操作
            return tail.disconnect();
        }
       ........
    

    从链表尾节点tail开始处理 出站 IO 操作

    三. HeadContext 类

       final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    
            // 通道 Channel 的Unsafe
            private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, HeadContext.class);
                unsafe = pipeline.channel().unsafe();
                // 创建时就将自己状态变成 ADD_COMPLETE
                setAddComplete();
            }
    
            @Override
            public ChannelHandler handler() {
                // 事件处理器就是它自己
                return this;
            }
    
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) {
                // NOOP
            }
    
            @Override
            public void handlerRemoved(ChannelHandlerContext ctx) {
                // NOOP
            }
    
            @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
                // 出站 IO 操作都是调用 unsafe 对应方法
                unsafe.bind(localAddress, promise);
            }
    
            @Override
            public void connect(
                    ChannelHandlerContext ctx,
                    SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) {
                // 出站 IO 操作都是调用 unsafe 对应方法
                unsafe.connect(remoteAddress, localAddress, promise);
            }
    
            @Override
            public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
                // 出站 IO 操作都是调用 unsafe 对应方法
                unsafe.disconnect(promise);
            }
    
            @Override
            public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
                // 出站 IO 操作都是调用 unsafe 对应方法
                unsafe.close(promise);
            }
    
            @Override
            public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
                // 出站 IO 操作都是调用 unsafe 对应方法
                unsafe.deregister(promise);
            }
    
            @Override
            public void read(ChannelHandlerContext ctx) {
                // 出站 IO 操作都是调用 unsafe 对应方法
                unsafe.beginRead();
            }
    
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                // 出站 IO 操作都是调用 unsafe 对应方法
                unsafe.write(msg, promise);
            }
    
            @Override
            public void flush(ChannelHandlerContext ctx) {
                // 出站 IO 操作都是调用 unsafe 对应方法
                unsafe.flush();
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                // 入站 IO 事件就交给 下一个入站事件处理器处理
                ctx.fireExceptionCaught(cause);
            }
    
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) {
                // 调用那些在未注册之前就添加的 ChannelHandler 的回调函数
                invokeHandlerAddedIfNeeded();
                // 入站 IO 事件就交给 下一个入站事件处理器处理
                ctx.fireChannelRegistered();
            }
    
            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) {
                // 入站 IO 事件就交给 下一个入站事件处理器处理
                ctx.fireChannelUnregistered();
    
                // 如果通道关闭且未注册,则依次删除所有处理程序。
                if (!channel.isOpen()) {
                    destroy();
                }
            }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) {
                // 入站 IO 事件就交给 下一个入站事件处理器处理
                ctx.fireChannelActive();
    
                // 是否需要主动触发读操作
                readIfIsAutoRead();
            }
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                // 入站 IO 事件就交给 下一个入站事件处理器处理
                ctx.fireChannelInactive();
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                // 入站 IO 事件就交给 下一个入站事件处理器处理
                ctx.fireChannelRead(msg);
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) {
                // 入站 IO 事件就交给 下一个入站事件处理器处理
                ctx.fireChannelReadComplete();
    
                // 是否需要主动触发读操作
                readIfIsAutoRead();
            }
    
            private void readIfIsAutoRead() {
                if (channel.config().isAutoRead()) {
                    // 如果 isAutoRead 是true,就主动触发读操作
                    channel.read();
                }
            }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                // 入站 IO 事件就交给 下一个入站事件处理器处理
                ctx.fireUserEventTriggered(evt);
            }
    
            @Override
            public void channelWritabilityChanged(ChannelHandlerContext ctx) {
                // 入站 IO 事件就交给 下一个入站事件处理器处理
                ctx.fireChannelWritabilityChanged();
            }
        }
    

    这个类非常重要,它代表链表头的节点。

    1. 它继承 AbstractChannelHandlerContext 类,又实现了 ChannelOutboundHandlerChannelInboundHandler 接口

      • 说明它即是一个上下文对象,又是一个事件处理器,该上下文对应的事件处理器就是它自己。
      • 它既可以处理入站事件,又可以处理出站事件。
    2. 处理 IO 事件

      • 因为管道中 IO 事件的流向是,入站事件是从头到尾,出站事件是从尾到头。
      • 对于入站事件,就直接调用 ChannelHandlerContext 对应方法,将入站事件交给 下一个入站事件处理器处理。
      • 对于出站事件,因为已经是链表头,是最后处理出站事件的地方,所以调用 Unsafe 的对应方法处理。也就是说所有出站事件的最后归宿应该都是调用 Unsafe 方法处理。

    四. HeadContext 类

        final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    
            TailContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, TAIL_NAME, TailContext.class);
                // 创建时就将自己状态变成 ADD_COMPLETE
                setAddComplete();
            }
    
            @Override
            public ChannelHandler handler() {
                // 事件处理器就是它自己
                return this;
            }
    
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) { }
    
            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) { }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) {
                onUnhandledInboundChannelActive();
            }
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                onUnhandledInboundChannelInactive();
            }
    
            @Override
            public void channelWritabilityChanged(ChannelHandlerContext ctx) {
                onUnhandledChannelWritabilityChanged();
            }
    
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) { }
    
            @Override
            public void handlerRemoved(ChannelHandlerContext ctx) { }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                onUnhandledInboundUserEventTriggered(evt);
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                onUnhandledInboundException(cause);
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                onUnhandledInboundMessage(ctx, msg);
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) {
                onUnhandledInboundChannelReadComplete();
            }
        }
    
    • TailContext继承 AbstractChannelHandlerContext 类,又实现了ChannelInboundHandler 接口, 表明它即是一个上下文对象,又是一个入站事件处理器,该上下文对应的事件处理器就是它自己。

    TailContext的作用很简单,它是最后处理入站事件的地方

    也就是说入站事件在链表中,没有任何入站处理器处理,那么 TailContext 就要做一些收尾的处理。比如说捕获异常,一些资源的回收等等。

    相关文章

      网友评论

        本文标题:Netty源码_DefaultChannelPipeline详解

        本文链接:https://www.haomeiwen.com/subject/ymloaltx.html