美文网首页程序员
netty源码分析(22)- outBound事件的传播

netty源码分析(22)- outBound事件的传播

作者: Jorgezhong | 来源:发表于2019-03-01 12:36 被阅读0次

上一节学习了inbound事件的传播,充分理解了在pipeline中是如何向一个个handler传播事件的,以channelRead事件也就是读事件为例,研究了其处理逻辑。

本节学习outbound事件的传播,和inbound事件有相似之处。以write事件为例,进行学习研究。

  • ChannelHandler家族
    如下图,ChannelHandler家族分为ChannelInboundHandler以及ChannelOutboundHandler,分别定义了入站处理器以及出站处理器,同时也提供了对应的实现。上一节inbound事件传播也发现确定是inbound事件还是outbound事件是由instanceof关键子判断是否实现了对应的接口。
    ChannelHandler家族

对比一下ChannelInboundHandlerChannelOutboundHandler的方法,可以发现,ChannelInboundHandler的方法以被动触发为主,而ChannelOutboundHandler的方法则是主动行为。

ChannelInboundHandler ChannelInboundHandler
channelRegistered(事件触发:注册) deregister(注销)
channelUnregistered(事件触发:注销) disconnect(取消连接)
channelActive(事件触发:激活连接) connect(连接)
channelRead(事件触发:读) read(读)
channelReadComplete(事件触发:读完成) write(写)
userEventTriggered(事件触发:用户事件) flush(刷缓存)
channelWritabilityChanged(事件触发:通道可读状态被修改) close(关闭连接)
exceptionCaught(事件触发:异常) bind(绑定端口)
  • ChannelOutboundHandler的执行顺序正好和ChannelInboundHandler相反,是倒序的。
class DataServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ch.pipeline().addLast(
                new OutboundHandlerA(),
                new OutboundHandlerB(),
                new OutboundHandlerC()
        );
    }
}


class OutboundHandlerA extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutboundHandlerA : " + msg);
        ctx.write(msg, promise);
    }
}

class OutboundHandlerB extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutboundHandlerB : " + msg);
        ctx.write(msg, promise);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ctx.executor().schedule(() -> {
            ctx.channel().write("hello world");
        }, 3, TimeUnit.SECONDS);
    }
}

class OutboundHandlerC extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutboundHandlerC : " + msg);
        ctx.write(msg, promise);
    }
}
改变顺序之前

改变添加handler的顺序

class DataServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ch.pipeline().addLast(
                new OutboundHandlerB(),
                new OutboundHandlerA(),
                new OutboundHandlerC()
        );
    }
}

改变添加顺序之后
  • 跟踪ctx.channel().write("hello world");
    @Override
    public ChannelFuture write(Object msg) {
        //从pipeline开始调用
        return pipeline.write(msg);
    }

    @Override
    public final ChannelFuture write(Object msg) {
        //从尾节点开始传播
        return tail.write(msg);
    }
    @Override
    public ChannelFuture write(Object msg) {
        //添加一个回调Promise,包装channel和executor
        return write(msg, newPromise());
    }

最终调用到AbstractChannelHandlerContext#write()方法,主要是做了两件事

  1. findContextOutbound方法找到下一个ChannelOutboundHandlerContext
  2. 判断是否需要flush,选择执行write回调方法之后是否执行flush回调方法
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        //查找下一个ChannelOutboundHandlerContext
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            //判断是否刷新
            if (flush) {
                //执行写并刷新方法
                next.invokeWriteAndFlush(m, promise);
            } else {
                //执行写方法
                next.invokeWrite(m, promise);
            }
        } else {
            final AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            if (!safeExecute(executor, task, promise, m)) {
                // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                task.cancel();
            }
        }
    }

  • findContextOutbound方法找到下一个ChannelOutboundHandlerContext
    private AbstractChannelHandlerContext findContextOutbound() {
        //循环往前查找,通过outbound属性判断
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
  • 执行write回调方法
    private void invokeWrite(Object msg, ChannelPromise promise) {
        //判断handler的状态是可以执行回调函数的
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            //执行回调函数write
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

  • invokeWriteAndFlush执行完write回调方法之后执行flush回调方法
    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            //执行write
            invokeWrite0(msg, promise);
            //执行flush
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

    private void invokeFlush0() {
        try {
            //回调flush方法
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }
  • 通过跟踪源码,也不难发现无论是从tail节点开始还是从当前节点开始调用write方法,最终都会到head节点。而头节点正是使用unsafe来具体完成这些操作的。
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }

相关文章

网友评论

    本文标题:netty源码分析(22)- outBound事件的传播

    本文链接:https://www.haomeiwen.com/subject/cmuruqtx.html