美文网首页Netty学习
netty源码分析 - ChannelHandler

netty源码分析 - ChannelHandler

作者: 晴天哥_王志 | 来源:发表于2020-03-26 09:18 被阅读0次

开篇

  • 本文基于netty-4.1.8.Final版本进行分析,主要是分析Netty Server初始化过程。
  • 建议先参考Netty源码分析 - Bootstrap客户端文章。
  • 核心点在于能够理解pipeline的串行调用的执行过程。

基本概念

pipeline
  • channel、pipeline、context、handler的关系图如上所示,handler由context进行封装,由双向链表的数据结构进行连接。

  • 当Channel对象在构造的时候会同时创建一个ChannelPipeline对象,两个对象相互关联,是一对一的关系,ChannelPipeline不会被多个Channel共享。

  • ChannelPipeline对象创建之后会调用它的各种添加handler的方法向链中加入ChannelHandler对象,而在加入ChannelHandler对象的同时,会自动给每个ChannelHandler包装一个ChannelHandlerContext对象。

  • ChannelHandlerContext是ChannelHandler的上下文信息,它使得ChannelHandler可以和ChannelPipeline以及其它的ChannelHandler对象进行交互操作。通过ChannelHandlerContext对象,ChannelHandler可以通知同一个pipeline中的其他ChannelHandler,也可以在运行时动态改变ChannelPipeline中的内容。

输入消息处理

  • 当输入消息触发的时候,例如registred,active,read或readComplete等输入的消息触发的时候,会通过Channel调用对应的ChannelPipeline的对应方法来处理,输入消息会首先通过head找到下一个ChanneInbountHandler来处理输入消息,然后逐一传递到下一个ChanneInbountHandler消息,直至到最后一个内置的tail处理器。

输出消息处理

  • 当输入消息触发的时候,例如bind,connect,write等输出消息触发的时候,会通过Channel调用对应的ChannelPipeline的对应方法来处理,输入消息会首先通过tail找到下一个ChanneOutbountHandler来处理输入消息,然后逐一传递到下一个ChanneOutbountHandler消息,直至到最后一个内置的head处理器。

pipeline的handler添加过程

  • pipeline的hanndler添加过程分为两个阶段,第一个阶段为添加ChannelInitializer对应的handler,在Netty Client/Server在初始化channel的时候会执行;第二阶段为执行ChannelInitializer对象内部方法initChannel()的时候添加对应handler,在注册channel的时候会执行。

DiscardClient例子

public final class DiscardClient {

    public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
              // 绑定handler对象
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     p.addLast(new DiscardClientHandler());
                 }
             });

            // Make the connection attempt.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

handler添加过程

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    volatile EventLoopGroup group;
    @SuppressWarnings("deprecation")
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private volatile ChannelHandler handler;

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1. newChannel创建对应的channel对象
            channel = channelFactory.newChannel();
            // 2. init真正执行的是Bootstrap的init()方法
            init(channel);
        } catch (Throwable t) {
          // 省略代码
        }
        // 3. 执行register()动作
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }
}
  • AbstractBootstrap#initAndRegister执行操作三部曲:创建channel,初始化channel,注册channel。
  • channelFactory.newChannel()创建channel。
  • init(channel)初始化channel,init(channel)在子类Bootstrap被重写。
  • config().group().register(channel)注册channel。

init过程

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {

    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        // 绑定ChannelInitializer到ChannelPipeline对象当中
        p.addLast(config.handler());

        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }
}
  • Bootstrap#init的核心操作在于将Bootstrap的handler添加到channel对应的pipeline当中。
  • config.handler()返回的handler是DiscardClient中handler(new ChannelInitializer<SocketChannel>())绑定的handler对象。
  • 划重点,在这里将ChannelHandler对象ChannelInitializer添加对应的pipeline当中
public class DefaultChannelPipeline implements ChannelPipeline {

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            // 1.channelHandler包装成DefaultChannelHandlerContext对象
            newCtx = newContext(group, filterName(name, handler), handler);
            // 2.添加DefaultChannelHandlerContext对象到pipeline当中。
            addLast0(newCtx);
            // 3.针对未注册的逻辑添加回调函数callHandlerCallbackLater
            if (!registered) {
                newCtx.setAddPending();
                // 添加回调函数callHandlerCallbackLater
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

        // 省略代码
        return this;
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
}
  • ChannelHandler添加到pipeline流程包含3个步骤:创建HandlerContext对象;添加HandlerContext对象到pipeline当中;注册callHandlerCallbackLater的回调task。
  • 创建HandlerContext对象,newContext(group, filterName(name, handler), handler)。
  • 添加HandlerContext对象到pipeline中,addLast0(newCtx)。
  • 注册callHandlerCallbackLater的回调task,callHandlerCallbackLater(newCtx, true),入参参数为newCtx(封装channelHandler对象)。
  • Channel对象初始化后pipeline的状态如上图所示,增加了ChannelInitializer的这个handler对象。

register过程

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    protected abstract class AbstractUnsafe implements Unsafe {

        private void register0(ChannelPromise promise) {
            try {
                // 根据neverRegistered的标识判断是否执行invokeHandlerAddedIfNeeded。
                boolean firstRegistration = neverRegistered;
                // 1.执行channel到eventLoop的绑定动作
                doRegister();
                neverRegistered = false;
                registered = true;
                // 2.执行ChannelInitializer的初始化动作
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);

                // 3.执行pipeline的fireChannelRegistered()方法
                pipeline.fireChannelRegistered();

                if (isActive()) {
                    if (firstRegistration) {
                        // 4.第一次注册会执行pipeline.fireChannelActive()的操作。
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
            }
        }
    }
}
  • channel的注册过程包含4部曲,分别是绑定channel到eventLoop;执行invokeHandlerAddedIfNeeded动作;执行fireChannelRegistered动作;执行fireChannelActive动作。
  • invokeHandlerAddedIfNeeded负责回调init过程中生成的PendingHandlerCallback对象,会执行ChannelInitializer对应的handler的initChannel方法。
  • fireChannelRegistered动作触发channel的状态变为ChannelRegistered。
  • fireChannelActive动作触发channel的状态变为ChannelActive。

invokeHandlerAddedIfNeeded

public class DefaultChannelPipeline implements ChannelPipeline {

    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
            // that were added before the registration was done.
            callHandlerAddedForAllHandlers();
        }
    }

    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;

            registered = true;
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            this.pendingHandlerCallbackHead = null;
        }

        // task是PendingHandlerAddedTask
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }

    private final class PendingHandlerAddedTask extends PendingHandlerCallback {
        // PendingHandlerAddedTask的入参是ChannelInitializer对应的ctx对象
        PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }
    }

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            // ctx.handler()返回的是ChannelInitializer对象
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
        } catch (Throwable t) {
        }
    }
}
  • invokeHandlerAddedIfNeeded的执行按照下面的顺序进行执行 invokeHandlerAddedIfNeeded => callHandlerAddedForAllHandlers => PendingHandlerAddedTask#execute => PendingHandlerAddedTask#callHandlerAdded0 => ChannelInitializer#handlerAdded。
  • ctx.handler()返回的是ChannelInitializer的handler对象。

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                // initChannel会添加新的handler
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
            } finally {
                // 移除ChannelInitializer对应的handler。
                remove(ctx);
            }
            return true;
        }
        return false;
    }

    private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
    }
}
  • ChannelInitializer#handlerAdded执行对应的initChannel()方法,完成ChannelInitializer的initChannel()方法并添加新的handler。
  • remove(ctx)会移除ChannelInitializer对应的handler。
  • 执行ChannelInitializer#initChannel添加新handler同时移除ChannelInitializer自身handler的pipeline如上图。

输入消息处理流程

ChannelInboundHandler
  • channelRegistered 注册事件,channel注册到EventLoop上后调用,例如服务岗启动时,pipeline.fireChannelRegistered();
  • channelUnregistered 注销事件,channel从EventLoop上注销后调用,例如关闭连接成功后,pipeline.fireChannelUnregistered();
  • channelActive 激活事件,绑定端口成功后调用,pipeline.fireChannelActive();
  • channelInactive非激活事件,连接关闭后调用,pipeline.fireChannelInactive();
  • channelRead 读事件,channel有数据时调用,pipeline.fireChannelRead();
  • channelReadComplete 读完事件,channel读完之后调用,pipeline.fireChannelReadComplete();
  • channelWritabilityChanged 可写状态变更事件,当一个Channel的可写的状态发生改变的时候执行,可以保证写的操作不要太快,防止OOM,pipeline.fireChannelWritabilityChanged();
  • userEventTriggered 用户事件触发,例如心跳检测,ctx.fireUserEventTriggered(evt);
  • exceptionCaught 异常事件说明:我们可以看出,Inbound事件都是由I/O线程触发,用户实现部分关注的事件被动调用。
ChannelInboundInvoker
  • ChannelInboundInvoker负责fire上述的各类事件。

pipeline.fireChannelRead()读事件流程

public final class NioEventLoop extends SingleThreadEventLoop {
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

        // 省略相关代码
        try {
            // 省略相关代码
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
}
  • NioEventLoop关注OP_READ|OP_ACCEPT两类事件,都属于输入事件类型。
  • 进入unsafe.read()进入读取操作,unsafe为AbstractNioByteChannel#NioByteUnsafe。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {

    protected class NioByteUnsafe extends AbstractNioUnsafe {
        public final void read() {
            // 省略相关代码
            try {
                do {
                    pipeline.fireChannelRead(byteBuf);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
            } finally {
            }
        }
    }
}


public class DefaultChannelPipeline implements ChannelPipeline {

    public final ChannelPipeline fireChannelRead(Object msg) {
        // 从pipeline对象当中的head开始执行遍历
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
}


abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
        }
    }
}
  • AbstractNioByteChannel#NioByteUnsafe的read触发pipeline.fireChannelRead()操作,进入pipeline的事件触发流程,参数head表示从pipeline的head开始遍历。
  • AbstractChannelHandlerContext#invokeChannelRead属于静态方法,个人认为是AbstractChannelHandlerContext提供的为ChannelContext执行的入口
  • AbstractChannelHandlerContext#invokeChannelRead方法内部执行next.invokeChannelRead进行了Context的执行域,第一次Context为HeadContext对象。
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                // handler()返回Context对象,第一次返回HeadContext对象。
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
}

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
}
  • ((ChannelInboundHandler) handler()).channelRead(this, msg)的handler()方法返回的是Context对象本身,第一次表示为HeadContext对象。
  • HeadContext#channelRead表示该handler的处理逻辑(只是这里没有任何处理逻辑),然后通过ctx.fireChannelRead(msg)唤醒当前Context对象下的下一个Context对象。
  • 唤醒当前Context对象下的下一个Context对象的处理逻辑统一在AbstractChannelHandlerContext当中实现的。
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        // findContextInbound寻找当前Context的下一个Context对象
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    //  findContextInbound寻找当前Context的下一个Context对象
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
    // 进入到转为下一个Context的执行逻辑的核心函数
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 下一个待执行的Context对象
            next.invokeChannelRead(m);
        } else {
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                // handler()返回Context对象
                // channelRead被重载执行逻辑,需要在实现中再次执行ctx.fireChannelRead(msg);
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
}
  • AbstractChannelHandlerContext作为Context的父类,其核心逻辑在于负责寻找下一个Context对象并开始下一个Context对象的执行逻辑。
  • AbstractChannelHandlerContext#fireChannelRead负责查找下一个Context对象并通过AbstractChannelHandlerContext#invokeChannelRead实现当前Context到下一个Context执行的交接
  • AbstractChannelHandlerContext#invokeChannelRead内部执行已经是下一个Context对象的channelRead方法。
    -AbstractChannelHandlerContext作为Context的父类,提供了通用的查询Context、执行Context的方法,本质这些逻辑都是在前一个Context对象的方法中执行,然后通过查询下一个Context对象进行切换。
public class LoggingHandler extends ChannelDuplexHandler {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "RECEIVED", msg));
        }
        ctx.fireChannelRead(msg);
    }
}
  • 以LoggingHandler为例,channelRead()内部负责执行本职的log功能,同时通过ctx.fireChannelRead(msg)继续触发下一个Handler的操作。
  • 所有的Handler操作本职都是方法的串行。

输出消息处理流程

ChannelOutboundHandler
  • bind 事件,绑定端口。
  • close事件,关闭channel。
  • connect事件,用于客户端,连接一个远程机器。
  • disconnect事件,用于客户端,关闭远程连接。
  • deregister事件,用于客户端,在执行断开连接disconnect操作后调用,将channel从EventLoop中注销。
  • read事件,用于新接入连接时,注册成功多路复用器上后,修改监听为OP_READ操作位。
  • write事件,向通道写数据。
  • flush事件,将通道排队的数据刷新到远程机器上。
ChannelOutboundInvoker
  • ChannelOutboundInvoker负责处理上述各类事件。

DefaultChannelPipeline#write

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    public ChannelFuture write(Object msg) {
        // 进入pipeline的处理流程
        return pipeline.write(msg);
    }
}

public class DefaultChannelPipeline implements ChannelPipeline {
    public final ChannelFuture write(Object msg) {
        // 从tail开始执行
        return tail.write(msg);
    }
}
  • DefaultChannelPipeline#write从pipeline的tail开始进行访问。
  • tail.write()会调用父类AbstractChannelHandlerContext#write开始pipeline的handler的调用。
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    // 负责从tail开始往head进行遍历,查找输出事件的handler
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }

    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        // 省略代码
        write(msg, false, promise);

        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        // 从tail开始遍历查找下一个Context进行操作
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                // 执行Context对象的invokeWrite方法
                next.invokeWrite(m, promise);
            }
        } else {
        }
    }

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            // 执行handler自己重载的write方法
            invokeWrite0(msg, promise);
        } else {
            // 执行handler自己重载的write方法或者直接使用父类的write方法
            write(msg, promise);
        }
    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            // 参考LoggingHandler为例进行说明
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
}
  • AbstractChannelHandlerContext#write通过findContextOutbound()查找tail的前一个输出事件的handler。
  • AbstractChannelHandlerContext按照write => invokeWrite => invokeWrite0的顺序进行调用,在invokeWrite0的内部执行实际业务handler的write()方法。
  • ((ChannelOutboundHandler) handler())返回实际的handler对象,如LoggingHandler。
  • 实际handler对象如LoggingHandler内部会通过ctx.write(msg, promise)重新开始pipeline的handler对象的寻找,完成一次传递调用。
public class LoggingHandler extends ChannelDuplexHandler {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "WRITE", msg));
        }
        ctx.write(msg, promise);
    }
}

image.png

参考

Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)
netty源码分析系列——ChannelHandler系列

相关文章

网友评论

    本文标题:netty源码分析 - ChannelHandler

    本文链接:https://www.haomeiwen.com/subject/ytasehtx.html