美文网首页深入浅出Netty源码剖析
Netty源码-ChannelPipeline和ChannelH

Netty源码-ChannelPipeline和ChannelH

作者: persisting_ | 来源:发表于2019-05-27 23:07 被阅读7次

    1 概述

    在Netty事件模型中,在发生网络事件(如Read,Write,Connect)等事件后,是通过注册在Pipeline中的一个个Handler对事件进行处理的,这种采用多Handler对事件进行处理可以对事件的处理进行逻辑分层,比如在经典的编码、解码处理中,可以注册一个专门的Handler对报文进行编码或者解码,编码或者解码之后的报文再传递给下一个Handler进行处理。另外Netty采用这种Pipeline这种串行的Handler处理各种事件,避免了线程的上下文切换,减少了多线程环境对锁的依赖,也能在一定程度上提高性能。

    ChannelPipelineChannelHandler的容器,负责管理一系列的ChannelHandler,对到来的事件进行处理。

    ChannelHandler则是对事件处理的一个个处理器,分为两种类型,即ChannelInboundHandlerChannelOutboundHandler,分别负责处理Netty中的Inbound和Outbound事件,从两个接口中定义的函数可以知道Inbound和Outbound事件分别有哪些:

    ChannelInboundHandler接口函数定义.png ChannelOutboundHandler接口函数定义.png

    当然在``接口源码注释中也列出了Inbound和Outbound方法:

    Inbound event propagation methods:

    • ChannelHandlerContext.fireChannelRegistered()
    • ChannelHandlerContext.fireChannelActive()
    • ChannelHandlerContext.fireChannelRead(Object)
    • ChannelHandlerContext.fireChannelReadComplete()
    • ChannelHandlerContext.fireExceptionCaught(Throwable)
    • ChannelHandlerContext.fireUserEventTriggered(Object)
    • ChannelHandlerContext.fireChannelWritabilityChanged()
    • ChannelHandlerContext.fireChannelInactive()
    • ChannelHandlerContext.fireChannelUnregistered()

    Outbound event propagation methods:

    • ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
    • ChannelHandlerContext.connect(SocketAddress, SocketAddress, hannelPromise)
    • ChannelHandlerContext.write(Object, ChannelPromise)
    • ChannelHandlerContext.flush()
    • ChannelHandlerContext.read()
    • ChannelHandlerContext.disconnect(ChannelPromise)
    • ChannelHandlerContext.close(ChannelPromise)
    • ChannelHandlerContext.deregister(ChannelPromise)

    Pipeline中可以注册多个InboundHandler和多个OutboundHandler,并使用双向链表连接起来,对于收到的Inbound或Outbound事件会调用相关类型的Handler进行处理,但是Inbound和Outbound事件执行handler的顺序是不一样的,Inbound事件则是从前往后调用handler,最后一个被调用的是尾节点;对于Outbound事件则是从后往前调用,最后一个执行的是头结点。

    下面我们分别介绍Netty中ChannelPipelineChannelHandler的相关实现。

    2 ChannelPipeline

    2.1 接口(类)结构

    2.1.1 重要域

    在这里,我们直接看ChannelPipeline在Netty中的默认实现DefaultChannelPipeline。首先我们要说明一下,虽然说ChannelPipelineChannelHandler的容器,但是ChannelPipeline并不是直接持有ChannelHandler的,ChannelHandler会被封装成ChannelHandlerContextChannelPipeline则使用双链表持有一个个的ChannelHandlerContext

    我们先看DefaultChannelPipeline重要域:

    //DefaultChannelPipeline
    //该Pipeline关联的Channel,由构造函数传进来
    private final Channel channel;
    //pipeline持有一些列的context,但是其头和尾context是不可配置的,
    //在构造函数中被初始化
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    //这个平时没怎么使用,但是在后文介绍如何向pipeline中添加handler时
    //会介绍,pipeline默认使用channel注册的executor执行任务,但是也可以在
    //向pipeline中加入handler时传入EventExecutorGroup,然后从该线程组
    //选出线程执行任务,传入的线程组和第一次选出的线程executor会记录在
    //childExecutors,这样后面可以保证在向该pipeline添加handler时,如果
    //配置了SINGLE_EVENTEXECUTOR_PER_GROUP参数为true,即单线程执行任务,
    //childExecutors记录每个group第一次选出的executor则可以在下次添加
    //handler取出直接使用,保证单线,这个后面在介绍childExecutor方法时会
    //再次介绍
    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    

    关于上面提到的配置参数SINGLE_EVENTEXECUTOR_PER_GROUP,可见参考文章介绍如下:

    Netty参数,单线程执行ChannelPipeline中的事件,默认值为True。该值控制执行ChannelPipeline中执行ChannelHandler的线程。如果为True,整个pipeline由一个线程执行,这样不需要进行线程切换以及线程同步,是Netty4的推荐做法;如果为False,ChannelHandler中的处理过程会由Group中的不同线程执行。

    但是个人认为上面引用中的整个pipeline由一个线程执行不太准确,只能说如果传入同一个group,且配置为true,则可以保证由该group中的同一个线程处理,而不是整个pipeline由一个线程执行,这个后面介绍childExecutor再看。

    2.1.2 内部类

    DefaultChannelPipeline重要内部类有两个,也就是上面介绍到默认切不可更改的headtail节点,分别为HeadContextTailContext

    这里暂时不介绍这两个类的具体实现,我们在介绍ChannelHandler时再介绍。

    2.2 重要方法

    下面再看DefaultChannelPipeline重要方法,先介绍DefaultChannelPipeline的构造函数:

    //DefaultChannelPipeline
    protected DefaultChannelPipeline(Channel channel) {
        //记录该pipeline关联的channel
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
    
        //初始化head和tail节点
        tail = new TailContext(this);
        head = new HeadContext(this);
    
        //将首尾节点连接起来
        head.next = tail;
        tail.prev = head;
    }
    

    除了构造函数,我们将DefaultChannelPipeline剩下的方法分为三类,第一类是负责向pipeline中添加、删除或者替换handler,另一类负责触发Inbound handler,最后一类则负责触发Outbound handler:

    2.2.1 Handler添加、删除方法

    这类方法主要包含如下方法:

    向Pipeline中添加handler

    • addAfter
    • addBefore
    • addFirst
    • addLast

    从Pipeline中移除handler

    • remove
    • removeFirst
    • removeIfExists
    • removeLast

    替换Pipeline中的某个handler

    • replace

    为了节省篇幅,我们介绍一个方法的实现:addLast(EventExecutorGroup executor, ChannelHandler... handlers)

    //DefaultChannelPipeline
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
    
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            //将传入的Handler一个个添加到pipeline中
            addLast(executor, null, h);
        }
    
        return this;
    }
    
    
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            //上面介绍过pipeline中连接的实际上是封装了handler的
            //context
            newCtx = newContext(group, filterName(name, handler), handler);
    
            //将该context连接到链表尾端,但是在tail之前
            addLast0(newCtx);
    
            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
    
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    
    //新建一个新的DefaultChannelHandlerContext
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    
    //处理传入的线程组group
    private EventExecutor childExecutor(EventExecutorGroup group) {
        //如果没有传入线程组,则返回空
        if (group == null) {
            return null;
        }
        //获取SINGLE_EVENTEXECUTOR_PER_GROUP配置,
        //上面介绍过
        Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
        //如果不是采用单一线程执行,则调用next方法选出一个线程返回
        if (pinEventExecutor != null && !pinEventExecutor) {
            return group.next();
        }
    
        //如果执行到这里,表示传入了线程组,并且
        //SINGLE_EVENTEXECUTOR_PER_GROUP配置为Ture
    
    
        //获取记录的group和第一次从该线程组选出的线程
        Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
        //如果为空,则新建一个IdentityHashMap,这里为什么使用
        //IdentityHashMap,可以看下IdentityHashMap的实现原理
        if (childExecutors == null) {
            // Use size of 4 as most people only use one extra EventExecutor.
            childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
        }
        // Pin one of the child executors once and remember it so that the same child executor
        // is used to fire events for the same channel.
        //找出group第一次调用时选出的线程
        EventExecutor childExecutor = childExecutors.get(group);
        //为空的话,则表示第一次使用该group,则从该group选出一个线程,
        //并放入该Map中
        if (childExecutor == null) {
            childExecutor = group.next();
            childExecutors.put(group, childExecutor);
        }
        return childExecutor;
    }
    
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        //可见虽然是addLast,但是还是放在tail节点之前
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    
    private void addFirst0(AbstractChannelHandlerContext newCtx) {
        //addFirst0也是同理,添加到列表最前面,但是在head节点之后
        AbstractChannelHandlerContext nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }
    

    2.2.2 Inbound事件相关方法

    触发处理Inbound事件Handler的相关方法如下:

    //DefaultChannelPipeline    
     @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
    
    @Override
    public final ChannelPipeline fireChannelInactive() {
        AbstractChannelHandlerContext.invokeChannelInactive(head);
        return this;
    }
    
    @Override
    public final ChannelPipeline fireExceptionCaught(Throwable cause) {
        AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
        return this;
    }
    
    @Override
    public final ChannelPipeline fireUserEventTriggered(Object event) {
        AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
        return this;
    }
    
    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    
    @Override
    public final ChannelPipeline fireChannelReadComplete() {
        AbstractChannelHandlerContext.invokeChannelReadComplete(head);
        return this;
    }
    
    @Override
    public final ChannelPipeline fireChannelWritabilityChanged() {
        AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
        return this;
    }
    

    从上面的方法定义可知,Inbound事件第一个被触发的Handler是head节点对应的handler,我们举例看下fireChannelRead方法的具体实现:

    //DefaultChannelPipeline  
    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    
    //AbstractChannelHandlerContext
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        //获取该context对应的线程
        EventExecutor executor = next.executor();
        //如果当前线程就是该context的线程,则直接在该线程执行,否则
        //将该任务放入线程的任务队列中
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    
    
     @Override
    public EventExecutor executor() {
        //如果线程为空的话,默认返回channel注册的线程
        if (executor == null) {
            return channel().eventLoop();
        } else {
            return executor;
        }
    }
    
    //获取context封装的handler,并执行相应的方法
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            //获取下一个Inbound handler并执行
            fireChannelRead(msg);
        }
    }
    
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
    
    //对于Inbound事件,从当前context出发,从前往后找
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
    

    除此之外,在一个handler处理完之后,想调用下一个handler继续处理,可以调用如下方法:

    //AbstractChannelHandlerContext
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        //调用findContextInbound从找到当前节点后面的第一个Inbound类型的
        //handler,并触发相应的函数
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
    

    2.2.3 Outbound事件相关方法

    Outbond事件发生时,会触发Outbound类型的handler,流程和上面Inbound事件触发Inbound handler的流程类似,这里不再赘述,但是要注意的是,Outbound触发Outbound类型的handler是从后向前调用的,最后一个调用head。

    @Override
    public final ChannelFuture bind(SocketAddress localAddress) {
        return tail.bind(localAddress);
    }
    
    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress) {
        return tail.connect(remoteAddress);
    }
    
    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return tail.connect(remoteAddress, localAddress);
    }
    
    @Override
    public final ChannelFuture disconnect() {
        return tail.disconnect();
    }
    
    @Override
    public final ChannelFuture close() {
        return tail.close();
    }
    
    ...
    

    这里大概列一下DefaultChannelPipeline.write方法的源码:

    //DefaultChannelPipeline
    @Override
    public final ChannelFuture write(Object msg, ChannelPromise promise) {
        return tail.write(msg, promise);
    }
    
    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }
    
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return promise;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);
    
        return promise;
    }
    
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        //从后往前找outbound handler
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }
    
    private AbstractChannelHandlerContext findContextOutbound() {
        //从当前节点,往前找outbound handler
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
    
    

    3 几个重要的ChannelHandler

    相信通过上面的介绍,已经知道了ChannelHandler是怎么被调用执行相关方法的。下面我们介绍几个重要的ChannelHandler实现。

    • HeadContext

    HeadContext扩展了AbstractChannelHandlerContext,也实现了ChannelOutboundHandlerChannelInboundHandler,既是Inbound handler,也是outbound handler。

    其作为第一个被调用的Inbound handler,其Inbound相关方法没有做什么实际工作,仅仅触发下一个handler,如

    //HeadContext
     @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
    

    而作为最后一个被调用的outbound handler,其Outbound相关方法则进行实际的操作,如:

    //HeadContext
    //调用unsafe.flush实际向channel写数据
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }
    
    • TailContext
      TailContext扩展了AbstractChannelHandlerContext,并实现了ChannelInboundHandler接口,是一个Inbound handler,因为Inbound事件从前往后调用Inbound handler,所以TailContext是最后一个被调用的Inbound handler,这里我们尽看一个有意思的方法:
    //TailContext
     @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //如果channelRead事件能够成功被传递到tail节点,会执行此方法
        //作为DefaultChannelPipeline内部类,调用DefaultChannelPipeline
        //的onUnhandledInboundMessage方法
        onUnhandledInboundMessage(msg);
    }
    
    //DefaultChannelPipeline
    protected void onUnhandledInboundMessage(Object msg) {
        //记录日志,告诉用户msg被传到了tail节点,需要检查是否没有
        //在tail节点之前配置正确的inbound进行处理
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            //如下面源码所示,如果是ReferenceCounted类型,尝试进行
            //释放操作
            ReferenceCountUtil.release(msg);
        }
    }
    
    //ReferenceCountUtil
    public static boolean release(Object msg) {
        if (msg instanceof ReferenceCounted) {
            return ((ReferenceCounted) msg).release();
        }
        return false;
    }
    
    
    • ChannelInitializer
      ChannelInitializer主要用于channel初始化,一般用于在channelRegistered方法中向channel的pipeline中注册相关的handler,ChannelInitializer的特别之处是注册完handler之后,会将自己从pipeline的handler链表中删除,仅仅会被执行一次:
    //ChannelInitializer
     @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
    // the handler.
    //调用初始化函数
    if (initChannel(ctx)) {
        // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
        // miss an event.
        ctx.pipeline().fireChannelRegistered();
    } else {
        // Called initChannel(...) before which is the expected behavior, so just forward the event.
        ctx.fireChannelRegistered();
    }
    }
    
     @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                //调用提供给子类重写的初始化函数
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                //从pipeline的handler链表中移除自己
                remove(ctx);
            }
            return true;
        }
        return false;
    }
    
    //从pipeline的handler链表中移除自己
    private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
    }
    

    相关文章

      网友评论

        本文标题:Netty源码-ChannelPipeline和ChannelH

        本文链接:https://www.haomeiwen.com/subject/upnxtctx.html