美文网首页
Netty writeAndFlush解析

Netty writeAndFlush解析

作者: 隔壁王哥 | 来源:发表于2022-05-29 14:18 被阅读0次

    概述

    Netty底层数据传输基于JDK NIO,调用writeAndFlush方法写出数据时,首先会通过编码器将Java对象编码为ByteBuf,然后会将ByteBuf转化为JDK NIO ByteBuffer,最后通过NioSocketChannel将数据写入到socket缓冲区。当socket缓冲区空间不足时会注册OP_WRITE事件,等到缓冲区的数据发送成功后有空闲空间时,会触发OP_WRITE事件,继续写出数据。

    事件传播机制

    Netty事件分为入站事件与出站事件,可以通过ChannelPipline或者ChannelHandlerContext进行事件传播。通过ChannelPipline传播入站事件,它将被从ChannelPipeline的头部开始一直被传播到ChannelPipeline的尾端,出站事件则从尾端开始传递到头部。通过ChannelHandlerContext传播入站事件,它将被从下一个ChannelHandler开始直至传递到尾端,出站事件则从下一个ChannelHandler直至传递到头部。

                                                       I/O Request
                                                  via Channel or
                                              ChannelHandlerContext
                                                            |
        +---------------------------------------------------+---------------+
        |                           ChannelPipeline         |               |
        |                                                  \|/              |
        |    +---------------------+            +-----------+----------+    |
        |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
        |    +----------+----------+            +-----------+----------+    |
        |              /|\                                  |               |
        |               |                                  \|/              |
        |    +----------+----------+            +-----------+----------+    |
        |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
        |    +----------+----------+            +-----------+----------+    |
        |              /|\                                  .               |
        |               .                                   .               |
        | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
        |        [ method call]                       [method call]         |
        |               .                                   .               |
        |               .                                  \|/              |
        |    +----------+----------+            +-----------+----------+    |
        |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
        |    +----------+----------+            +-----------+----------+    |
        |              /|\                                  |               |
        |               |                                  \|/              |
        |    +----------+----------+            +-----------+----------+    |
        |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
        |    +----------+----------+            +-----------+----------+    |
        |              /|\                                  |               |
        +---------------+-----------------------------------+---------------+
                        |                                  \|/
        +---------------+-----------------------------------+---------------+
        |               |                                   |               |
        |       [ Socket.read() ]                    [ Socket.write() ]     |
        |                                                                   |
        |  Netty Internal I/O Threads (Transport Implementation)            |
        +-------------------------------------------------------------------+
    

    Netty缓冲区

    Netty为了提高传输数据的效率,在写出数据时,会先将数据(ByteBuf)缓存到ChannelOutboundBuffer中,等到调用flush方法时才会将ChannelOutboundBuffer中的数据写入到socket缓冲区。

    ChannelOutboundBuffer中有三个重要属性:

    ## 链表中已刷新的开始节点
    private Entry flushedEntry;
    ## 链表中第一个未刷新的节点
    private Entry unflushedEntry;
    ## 链表中最后一个节点
    private Entry tailEntry;
    

    从其属性可以看出,ChannelOutboundBuffer内部是一个链表结构,里面有三个指针:

    Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    

    ChannelOutboundBuffer中有两个比较重要的方法,addMessage:将数据以链表形式缓存下来,addFlush:移动链表指针,将缓存的数据标记为已刷新,注意此时并没有将数据写入到socket缓冲区。接下来我们看下两个方法的实现:

    addMessage

    我们进入其addMessage方法分析下它是怎么缓存数据的:

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    
        incrementPendingOutboundBytes(size, false);
    }
    

    当第一次添加数据数,会将数据封装为Entry,此时tailEntry、unflushedEntry指针指向这个Entry,flushedEntry指针此时为null。每次添加数据都会生成新的Entry,并将tailEntry指针指向该Entry,而unflushedEntry指针则一直指向最初添加的Entry,我们通过画图展示下:

    第一次添加:

    第N次添加:


    为了防止缓存数据过大,Netty对缓存数据的大小做了限制:

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }
    
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }
    

    addMessage方法最后会调用incrementPendingOutboundBytes方法记录已缓存的数据大小(totalPendingSize),如果该大小超过了写缓冲区高水位阈值(默认64K),则更新不可写标志(unwritable),并传播Channel可写状态发生变化事件:fireChannelWritabilityChanged:

    private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0 && newValue != 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }
    

    addFlush

    移动链表指针,将缓存的数据标记为已刷新,并设置每个数据节点状态为不可取消:

    public void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                // 设置为不能取消,如果设置失败则说明该ByteBuf已经被取消,需要释放内存并更新totalPendingSize大小
                // 如果totalPendingSize小于缓冲区低水位阈值(默认32K)则更新不可写标志(unwritable),
                // 并传播Channel可写状态发生变化事件
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);
    
            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }
    

    执行完addFlush方法后,链表图示如下:

    数据传输

    通过ChannelHandlerContext#writeAndFlush方法来分析下Netty是如何将数据通过网络进行传输的:

    ctx.writeAndFlush("just test it");
    

    ChannelHandlerContext#writeAndFlush方法最终会调用其子类AbstractChannelHandlerContext的writeAndFlush方法:

    #AbstractChannelHandlerContext
    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }
      
        if (!validatePromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
            
        write(msg, true, promise);
    
        return promise;
    }
    

    主要逻辑在write方法中:

    #AbstractChannelHandlerContext
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        // 找到下一个ChannelHandlerContext
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        // 判断是否在IO线程中执行
        if (executor.inEventLoop()) {
            if (flush) {
                // 调用下一个ChannelHandlerContext的writeAndFlush方法
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }
    

    write方法主要做了两件事,一是:找到下一个ChannelHandlerContext。二是:调用下一个ChannelHandlerContext的w riteAndFlush方法传播事件。writeAndFlush方法是一个出站事件,前面我们也讲过对于出站事件,通过ChannelHandlerContext进行事件传播,事件是从pipline链中找到当前ChannelHandlerContext的下一个ChannelHandlerContext进行传播直至头部(HeadContext),在这期间我们需要自定义编码器对传输的Java对象进行编码,转换为ByteBuf对象,最终事件会传递到HeadContext进行处理。

    #AbstractChannelHandlerContext
    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }
    

    invokeWriteAndFlush方法主要做了两件事,一是:调用invokeWrite0方法将数据放入Netty缓冲区中(ChannelOutboundBuffer),二是:调用invokeFlush0方法将缓冲区数据通过NioSocketChannel写入到socket缓冲区。

    写入Netty缓冲区

    invokeWrite0方法内部会调用ChannelOutboundHandler#write方法:

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
    

    前面说过,出站事件最终会传播到HeadContext,在传播到HeadContext之前我们需要自定义编码器对Java对象进行编码,将Java对象编码为ByteBuf,关于编码器本章节暂不进行解析。我们进入HeadContext的write方法:

    #DefaultChannelPipeline#HeadContext
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }
    

    HeadContext#write方法中会调用AbstractChannelUnsafe#write方法:

    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();
    
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            // release message now to prevent resource-leak
            ReferenceCountUtil.release(msg);
            return;
        }
    
        int size;
        try {
            // 对数据进行过滤转换
            msg = filterOutboundMessage(msg);
            // 估测数据大小
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }
            // 缓存数据
        outboundBuffer.addMessage(msg, size, promise);
    }
    

    该方法主要做了三件事情,一:对数据进行过滤转换,二:估测数据大小,三:缓存数据。我们先看下filterOutboundMessage方法:

    #AbstractNioByteChannel
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }
    
            return newDirectBuffer(buf);
        }
    
        if (msg instanceof FileRegion) {
            return msg;
        }
    
        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    

    一、对数据进行过滤转换:

    filterOutboundMessage方法首先会对数据进行过滤,如果数据不是ByteBuf或者FileRegion类型,则直接抛出异常。如果数据是ByteBuf类型,则判断数据是否为直接直接内存,如果不是则转换为直接内存以提升性能。

    二、估测数据大小:

    public int size(Object msg) {
            if (msg instanceof ByteBuf) {
                return ((ByteBuf) msg).readableBytes();
            }
            if (msg instanceof ByteBufHolder) {
                return ((ByteBufHolder) msg).content().readableBytes();
            }
            if (msg instanceof FileRegion) {
                return 0;
            }
            return unknownSize;
        }
    }
    

    三、缓存数据:

    最后会调用ChannelOutboundBuffer#addMessage方法将数据缓存到链表中,关于addMessage方法可以回顾下文章中的Netty缓冲区部分。

    写入Socket缓冲区

    回到AbstractChannelHandlerContext#invokeWriteAndFlush方法,方法内部在调用完invokeWrite0方法将数据放入到缓存后,会调用invokeFlush0方法,将缓存中的数据写入到socket缓冲区。invokeFlush0方法内部会调用ChannelOutboundHandler#flush方法:

    private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }
    

    flush方法最终会将事件传播到HeadContext的flush方法:

    #DefaultChannelPipeline#HeadContext
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }
    

    HeadContext#flush方法中会调用AbstractChannelUnsafe#flush方法:

    public final void flush() {
        assertEventLoop();
    
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
    
        outboundBuffer.addFlush();
        flush0();
    }
    

    方法主要做了两件事,一:调用ChannelOutboundBuffer#addFlush方法,移动链表指针,将缓存的数据标记为已刷新。二:调用flush0方法将缓存中数据写入到socket缓冲区。关于addFlush方法可以看文中Netty缓冲区部分,我们直接进入flush0方法:

    #AbstractNioChannel#AbstractNioUnsafe
    protected final void flush0() {
        // Flush immediately only when there's no pending flush.
        // If there's a pending flush operation, event loop will call forceFlush() later,
        // and thus there's no need to call it now.
        if (isFlushPending()) {
            return;
        }
        super.flush0();
    }
    

    flush0方法主要做了两件事,一:判断是否有挂起的刷新。二:调用父类flush0方法。

    一、判断是否有挂起的刷新

    文中提到写入数据时,当socket缓冲区没有可用空间时会设置不可写状态,并注册OP_WRITE事件,等待socket缓冲区有空闲空间时会触发forceFlush,我们进入到isFlushPending方法看下方法是如何判断的:

    private boolean isFlushPending() {
        SelectionKey selectionKey = selectionKey();
        return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
    }
    

    二、调用父类flush0方法

    写入socket缓冲区的具体逻辑在AbstractNioChannel#AbstractNioUnsafe父类AbstractChannel#AbstractUnsafe中:

    protected void flush0() {
        // ...
    
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }
    
        // ...
    
        try {
            doWrite(outboundBuffer);
        } catch (Throwable t) {// ...} finally {
            // ...
        }
    }
    

    核心逻辑在doWrite方法中,我们进入到AbstractChannel子类NioSocketChannel的doWrite方法看下具体实现:

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            if (size == 0) {
                // 所有数据已写完,清除OP_WRITE事件
                clearOpWrite();
                break;
            }
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;
    
            // 将缓存中flushed指针对应的的ByteBuf链表转换为ByteBuffer数组
            ByteBuffer[] nioBuffers = in.nioBuffers();
            // ByteBuffer数量
            int nioBufferCnt = in.nioBufferCount();
            // ByteBuffer数组中写入的字节数
            long expectedWrittenBytes = in.nioBufferSize();
            // 获取Java NIO底层的NioSocketChannel
            SocketChannel ch = javaChannel();
    
            // 根据ByteBuffer数量执行不同的写逻辑
            switch (nioBufferCnt) {
                // 0表示需要写入的数据为FileRegion类型
                case 0:
                    super.doWrite(in);
                    return;
                // 如果只有一个ByteBuf则调用非聚集写
                case 1:
                    ByteBuffer nioBuffer = nioBuffers[0];
                        // 写自旋次数,默认为16
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        // 如果已写字节为0,则表示socket缓冲区已满,需要注册OP_WRITE事件
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default:
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }
    
            // 释放完全写入的缓冲区,并更新部分写入的缓冲区的索引
            in.removeBytes(writtenBytes);
    
            if (!done) {
                // Did not write all buffers completely.
                incompleteWrite(setOpWrite);
                break;
            }
        }
    }
    

    NioSocketChannel#doWrite方法根据nioBufferCnt大小执行不同的写逻辑,如果为0则调用AbstractNioByteChannel#doWrite方法。如果nioBufferCnt为1或者大于1,则调用NioSocketChannel不同的重载方法进行处理。注意,写数据时自旋次数默认为16,也就是说如果执行16次write后仍有数据未写完,则调用incompleteWrite方法将flush操作封装为一个任务,放入到队列中,目的是不阻塞其他任务。另外,如果调用NioSocketChannel#write方法后,返回的localWrittenBytes为0,则表示socket缓冲区空间不足,则注册OP_WRITE事件,等待有可用空间时会触发该事件,然后调用forceFlush方法继续写入数据。

    相关文章

      网友评论

          本文标题:Netty writeAndFlush解析

          本文链接:https://www.haomeiwen.com/subject/gmmgprtx.html