美文网首页
Netty源码笔记(五)消息发送之writeAndFlush

Netty源码笔记(五)消息发送之writeAndFlush

作者: 李亚林1990 | 来源:发表于2019-01-22 20:54 被阅读28次

    在Netty学习笔记(二)中我们介绍了服务端消息接收的处理流程,通过pipeline.fireChannelRead来回调pipeline处理器链中每一个inboundHandler的channelRead方法。
    本篇我们将简单分析下消息发送的处理流程。

    通常我们会在最后一个inboundHandler的channelRead方法中完成业务逻辑的处理,并将响应消息体调用ChannelHandlerContext.writeAndFlush(response)发送回远端。
    相关类的继承结构:


    image.png

    结合Netty学习笔记(一),pipeline.addLast(ChannelHandler... handlers)最终会通过new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);来对每一个处理器进行包装,并添加到pipeline的处理器链中。
    详细代码逻辑:

        //AbstractChannelHandlerContext.writeAndFlush
        @Override
        public ChannelFuture writeAndFlush(Object msg) {
            return writeAndFlush(msg, newPromise());
        }
    
        @Override
        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
            write(msg, true, promise);
    
            return promise;
        }
    
        private void write(Object msg, boolean flush, ChannelPromise promise) {
            //此处为核心代码
            AbstractChannelHandlerContext next = findContextOutbound();
            final Object m = pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                AbstractWriteTask task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, m, promise);
                }  else {
                    task = WriteTask.newInstance(next, m, promise);
                }
                safeExecute(executor, task, promise, m);
            }
        }
        //outbound调用顺序
        private AbstractChannelHandlerContext findContextOutbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    

    关于findContextOutbound在Netty学习笔记(二)中已涉及,与inbound的执行顺序相反,依次调用每一个执行器的write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise),最终执行到pipeline处理器器链的HeadContext处理器。

            //HeadContext.write
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                //在此处调用unsafe实现将消息发送到远端
                unsafe.write(msg, promise);
            }
    

    消息发送逻辑比较简单,本篇到此结束。

    相关文章

      网友评论

          本文标题:Netty源码笔记(五)消息发送之writeAndFlush

          本文链接:https://www.haomeiwen.com/subject/icmrjqtx.html