美文网首页
Netty NioSocketChannel写数据源码分析

Netty NioSocketChannel写数据源码分析

作者: whateverblake | 来源:发表于2020-07-12 16:07 被阅读0次

介绍

本编文章我们深入探究下使用netty通信的双方是如何写数据到网络

说明

我们基于NIO进行分析

写数据到网络

我先分析使用netty通信的一端是如何实现发送给另一端的,我假设一个场场景:自定义的一个channelHandler在channelActive方法被触发的时候通过channelContext向通信对端输出hello world,代码如下

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //Unpooled.copiedBuffer 的作用就是把输入的字符串包装成ByteBuf
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello world".getBytes()));
    }
}

我看下ctx.writeAndFlush的调用链
writeAndFlush(msg) --> writeAndFlush(msg,promise) --> write(msg,flush,promise)
上面调用链上的方法都在ChannelHandlerContext中,我看下write方法的源代码

//在我们的例子中flush=true
private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        //findContextOutbound方法的目的是在pipeline链上从本节点开始向head方向去找到executionMask符合MASK_WRITE | MASK_FLUSH的outBoundContext
        //然后触发这个outBoundContext.invokeWriteAndFlush方法
        //outBoundContext.invokeWriteAndFlush又会调用其绑定的outBoundHandler.write方法
       //最后一个满足条件的outBoundContext是HeadText

        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
         //如果提交写操作的线程和当前NioEventLoop绑定的线程不同,那么向NioEventLoop提交一个写数据的任务
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                task.cancel();
            }
        }
    }

如果不明白上面说的netty事件处理是怎么回事可以查看https://www.jianshu.com/p/36803adcbc02
如何不明白上面提到的netty线程的问题可以查看https://www.jianshu.com/p/732f9dea34d7
我们看下AbstractChannelHandlerContext.invokeWriteAndFlush的源代码

void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
           //invokeWrite0会调用channelHandlerContext绑定的handler的write方法
            invokeWrite0(msg, promise);
            //invokeFlush0会调用channelHandlerContext绑定的handler的flush方法
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

HeadContext(本身实现了handler接口)是真正是实现把netty数据写出去的handler,我们分析HeadContext的write方法和flush方法

 @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }

又看到了熟悉的unsafe了,我先分析unsafe.write的源代码

unsafe.write
 @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();
            //在数据被写出到网络上之前,所有待写出的数据都会存放在outboundBuffer中,这个类相当于发送缓存
           //将来flush就是从outboundBuffer取出待写出数据写出到网络上
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, newClosedChannelException(initialCloseCause));
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                //filterOutboundMessage的作用判断输入的msg是不是ByteBuf或者FileRegion类型,如果不是那么就抛出异常
              //如果msg是ByteBuf类型但是不是DirectByteBuf类型,那么还会把msg转化成DirectByteBuf类型
              //这里面会涉及到netty的内存管理相关的知识,请看我写的另一篇文章[https://www.jianshu.com/p/550704d5a628](https://www.jianshu.com/p/550704d5a628)

                msg = filterOutboundMessage(msg);
               //计算msg的字节大小
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
            //把msg加入到outboundBuffer中
            outboundBuffer.addMessage(msg, size, promise);
        }

ChannelOutboundBuffer是用户写出数据的缓存类,数据在写出之前都会被加入到ChannelOutboundBuffer中,我再分析ChannelOutboundBuffer.addMessage方法之前,先看下ChannelOutboundBuffer的一些关键属性

public final class ChannelOutboundBuffer {
    // Assuming a 64-bit JVM:
    //  - 16 bytes object header
    //  - 6 reference fields
    //  - 2 long fields
    //  - 2 int fields
    //  - 1 boolean field
    //  - padding
   //ChannelOutboundBuffer对象占用的字节数(如果不明白学习下JOL)
    static final int 
CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
            SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);

    private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
        @Override
        protected ByteBuffer[] initialValue() throws Exception {
            return new ByteBuffer[1024];
        }
    };
    //所属的SocketChannel
    private final Channel channel;

    // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    //
    // The Entry that is the first in the linked-list structure that was flushed
    //将要被被flush的msg链表
    private Entry flushedEntry;
    // The Entry which is the first unflushed in the linked-list structure
  //新添加的msg会被加入unflushedEntry链表
    private Entry unflushedEntry;
    // The Entry which represents the tail of the buffer
   //指向最新加入缓存链的msg
    private Entry tailEntry;
    // The number of flushed entries that are not written yet
    private int flushed;

    private int nioBufferCount;
    private long nioBufferSize;

    private boolean inFail;

    //目前缓存数据的总字节数
    @SuppressWarnings("UnusedDeclaration")
    private volatile long totalPendingSize;

    //当前缓存链表是不是不可写状态
    @SuppressWarnings("UnusedDeclaration")
    private volatile int unwritable;

看过了ChannelOutboundBuffer的属性,我们分析下ChannelOutboundBuffer.addMessage方法

addMessage
public void addMessage(Object msg, int size, ChannelPromise promise) {
       //把msg对象封装成缓存节点对象,注意这里的Entry是可回收对象
       //关于netty的对象复用机制我在另一篇文章中有分析[https://www.jianshu.com/p/8f629e93dd8c](https://www.jianshu.com/p/8f629e93dd8c)

        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
           //如果tailEntry不为空,把msg的entry加入到缓存链表的末尾
            Entry tail = tailEntry;
            tail.next = entry;
        }
        //tailEntry指向最新加入的entry
        tailEntry = entry;
       
        if (unflushedEntry == null) {
           //如果unflushedEntry为空,让unflushedEntry指向新加入的entry
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
      //更新缓存中的数据总字节大小
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

上面就是unsafe.write的过程分析,我们看到unsafe.write就是把用户写出的数据放到了ChannelOutboundBuffer代表的缓存链表中,netty对写出缓存配置HighWaterMark和LowWaterMark两个表示容量界限的参数,当ChannelOutboundBuffer缓存的数据量大于HighWaterMark的时候,ChannelOutboundBuffer的unwritable会被设置成1,同事会触发fireChannelWritabilityChanged事件。incrementPendingOutboundBytes方法是实现这个判断逻辑

接下来我们分析缓存数据是如何被刷到网络上的

unsafe.flush
 @Override
        public final void flush() {
            assertEventLoop();
          
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }
           //addFlush的作用就是把unflushedEntry链表上的数据转移到flushedEntry链表上
            outboundBuffer.addFlush();
            //写数据到网络
            flush0();
        }

我们先分析outboundBuffer.addFlush()

outboundBuffer.addFlush()
 public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                //记录本次需要写出到网络上的msg总数
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
           //清空unflushedEntry
            unflushedEntry = null;
        }
    }
flush0

这个是真正写数据到网络的入口方法

protected void flush0() {
         //inFlush0表示本channel是不是正在写数据到网络,如果是,那么就返回
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }
           //设置inFlush0=true表示本SocketChannel正在写数据到网络
            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
          //判断底层的SocketChannel是不是处于激活状态,如果没有激活那么需要设置本次刷新数据到网络的操作为失败状态
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                    } else {
                        // Do not trigger channelWritabilityChanged because the channel is closed already.
                        outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
              //把数据刷到网络的核心方法
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                if (t instanceof IOException && config().isAutoClose()) {
                    /**
                     * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                     * failing all flushed messages and also ensure the actual close of the underlying transport
                     * will happen before the promises are notified.
                     *
                     * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                     * may still return {@code true} even if the channel should be closed as result of the exception.
                     */
                    initialCloseCause = t;
                    close(voidPromise(), t, newClosedChannelException(t), false);
                } else {
                    try {
                        shutdownOutput(voidPromise(), t);
                    } catch (Throwable t2) {
                        initialCloseCause = t;
                        close(voidPromise(), t2, newClosedChannelException(t), false);
                    }
                }
            } finally {
                inFlush0 = false;
            }
        }

doWrite 源代码

doWrite
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        //获取java底层的SocketChannel
        SocketChannel ch = javaChannel();
        //一次最多可以执行多少次刷出数据到网络操作
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
               //如果SocketChannel在selector上注册了OP_WRITE事件,那么写完数据之后,取消SocketChannel注册的OP_WRITE事件
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            //每一次写操作最多可以写多少数据
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            //初始化写操作ByteBuffer数组,in.nioBuffers方法我们在下面详解
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            //通过in.nioBuffer得到的ByteBuffer数组的长度
            int nioBufferCnt = in.nioBufferCount();

            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
               //ByteBuffer数组长度是0
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes. 
                    //比如写出的数据是FileRegion类型而不是ByteBuffer类型
                    writeSpinCount -= doWrite0(in);
                    break;
                //只有一个ByteBuffer的数据待刷到网络上
                case 1: {
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                   //取得待刷ByteBuffer
                    ByteBuffer buffer = nioBuffers[0];
                   //buffer需要刷到网络上的字节数
                    int attemptedBytes = buffer.remaining();
                    //java底层的SocketChannel写出数据到网络上,返回写出的数据量
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                       //如果SocketChannel没把数据写出去,说socket tcp写缓存已经满了
                       //那么通过incompleteWrite方法使得SocketChannel向selector注册OP_WRITE,等待socket tcp 写缓存可用
                        incompleteWrite(true);
                        return;
                    }
                  //根据试图写入的数据量attemptedBytes,实际写入的数据量localWrittenBytes动态修改单次最大容许写入数据量maxBytesPerGatheringWrite
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    //根据实际的写入数据量更新ChannelOutboundBuffer的缓存状态
                   //如果localWrittenBytes等于attemptedBytes那么把对应的entry从缓存节点中删除
                   //如果localWrittenBytes不等于attemptedBytes说明本次写入没有把一个msg中包含的数据完整的写到网络上,
                 //那么需要更新msg对应的ByteBuf读写指针的位置,等待下次继续执行
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
         //如果一次需要写出多个ByteBuffer
                default: {
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so cast is safe
                    //算出多个待写出ByteBuffer的大小
                    long attemptedBytes = in.nioBufferSize();
                   //SocketChannel写出ByteBuffers到网络,返回写出的数据量
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                         //同上
                        incompleteWrite(true);
                        return;
                    }
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
            //同上
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                  //同上
                    in.removeBytes(localWrittenBytes);
                  //writeSpinCount减去1
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);
        //如果writeSpinCount < 0,说明ChannelOutboundBuffer上还有一些缓存节点没有被刷到网络
        incompleteWrite(writeSpinCount < 0);
    }

我们分析下ChannelOutboundBuffer.nioBuffers,这个方法是用来把缓存链上的缓存节点转换成ByteBuffer,将来SocketChannel直接把ByteBuffer刷到网络上

nioBuffers
  //maxCount用来设置本次缓存节点转化出的ByteBuffer数组的最大长度
  //maxBytes用来设置转化出的ByteBuffer的最大容量,如果转化出的ByteBuffer的容量已经超过maxBytes,那么即使ChannelOutboundBuffer上还有缓存节点,在本次转化中都不再进行转化
 public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
        assert maxCount > 0;
        assert maxBytes > 0;
        long nioBufferSize = 0;
        int nioBufferCount = 0;
        final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        //NIO_BUFFERS.get默认返回一个长度为1024的ByteBuffer数组
        ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
        Entry entry = flushedEntry;
        //判断entry是不是可以flush的,while循环会一直取缓存链上的缓存节点
        while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
            if (!entry.cancelled) {
                ByteBuf buf = (ByteBuf) entry.msg;
                final int readerIndex = buf.readerIndex();
                //计算当前缓存节点的大小
                final int readableBytes = buf.writerIndex() - readerIndex;

                if (readableBytes > 0) {
                    //在nioBufferCount!=0 的情况下,如果最大容许写出的数据量小于本次待转化为ByteBuffer的数据量加上已经转化成ByteBuffer的数据量,那么就不再继续做ByteBuffer转化了
                    if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                        // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
                        // we stop populate the ByteBuffer array. This is done for 2 reasons:
                        // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
                        // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
                        // on the architecture and kernel but to be safe we also enforce the limit here.
                        // 2. There is no sense in putting more data in the array than is likely to be accepted by the
                        // OS.
                        //
                        // See also:
                        // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
                        // - http://linux.die.net/man/2/writev
                        break;
                    }
                  //更新已经转化成ByteBuffer的缓存节点的总字节大小
                    nioBufferSize += readableBytes;
                    int count = entry.count;
                    if (count == -1) {
                        //noinspection ConstantValueVariableUse
                       // buf.nioBufferCount表示ByteBuf绑定的java ByteBuffer的个数,默认为1
                        entry.count = count = buf.nioBufferCount();
                    }
                   //nioBufferCount表示已经转化出的ByteBuffer的个数,因为在目前netty这个版本中nioBuffers.length的初始值是等于maxCount的,
                   //所以expandNioBufferArray没有机会得到执行
                    int neededSpace = min(maxCount, nioBufferCount + count);
                    if (neededSpace > nioBuffers.length) {
                        nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                        NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                    }
                    if (count == 1) {
                        ByteBuffer nioBuf = entry.buf;
                        if (nioBuf == null) {
                            // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
                            // derived buffer
                           //ByteBuf.internalNioBuffer就是返回ByteBuf绑定的ByteBuffer(position=readerIndex, limit = readerIndex+readableBytes)的副本
                            entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                        }
                        //设置nioBuffers[nioBufferCount]为当前转化出的ByteBuffer
                       //同时更新已经转化出的ByteBuffer的数量
                        nioBuffers[nioBufferCount++] = nioBuf;
                    } else {
                        // The code exists in an extra method to ensure the method is not too big to inline as this
                        // branch is not very likely to get hit very frequently.
                      //如果一个Entry包含多个ByteBuffer那么通过nioBuffers转化
                      //但是这个地方在什么情况下会被调用我还没看到,而且没有看到在这种情况下如何处理entry的代码
                        nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
                    }
                    //如果转化的ByeBuffer的个数已经大于等于了maxCount那么结束转化
                    if (nioBufferCount >= maxCount) {
                        break;
                    }
                }
            }
            entry = entry.next;
        }
    //记录本次转化的ByteBuffer的个数
        this.nioBufferCount = nioBufferCount;
    //记录本次转化出的ByteBuffer的总大小
        this.nioBufferSize = nioBufferSize;

        return nioBuffers;
    }

上面转化出的ByteBuffer[],会被SocketChannel写出到网络上。

我们再看下当一个缓存节点数据被写出到网络后,这个缓存节点会被如何处理

removeBytes

当缓存节点entry缓存的数据被全部或部分刷到网络上之后,entry节点在缓存链上的状态是怎么样的呢

 public void removeBytes(long writtenBytes) {
        for (;;) {
             //获取当前缓存节点对应的msg
            Object msg = current();
            if (!(msg instanceof ByteBuf)) {
                assert writtenBytes == 0;
                break;
            }

            final ByteBuf buf = (ByteBuf) msg;
            //获取缓存数据的对应的ByteBuf的readerIndex
            final int readerIndex = buf.readerIndex();
            //获取ByteBuf保存的缓存数据的大小
            final int readableBytes = buf.writerIndex() - readerIndex;
           //如果写出到网络上的数据量大于本节点的缓存数据大小
           //那么意味着本缓存节点entry已经被全部刷到网络上,所以
           //调用remove(),把entry从缓存链上删除
            if (readableBytes <= writtenBytes) {
                if (writtenBytes != 0) {
                   //如果用户设置了写入进度通知的功能,progress通知用户写入进度
                    progress(readableBytes);
                   //更新写入的数据量
                    writtenBytes -= readableBytes;
                }
                remove();
            } else { // readableBytes > writtenBytes
                 //当前缓存节点的数据量大于writtenBytes
               //那么需要更新entry对应ByteBuf的readIndex,这时缓存节点entry还是继续保留在缓存链上,等待下一次继续被写出
                if (writtenBytes != 0) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);
                    progress(writtenBytes);
                }
                break;
            }
        }
       //最后清空转化的ByteBuffer数组
        clearNioBuffers();
    }

entry节点从缓存链上删除之后,这个时候netty更新totalPendingSize = totalPendingSize - entry缓存大小,更新之后如果totalPendingSize < LowWaterMake会更新ChannelOutboundBuffer.unwritable =0表示当前ChannelOutboundBuffer为可写状态,同时触发fireChannelWritabilityChanged事件

最后我们看一下incompleteWrite方法

incompleteWrite
  protected final void incompleteWrite(boolean setOpWrite) {

        // Did not write completely.
        //setOpWrite用来表示是不是需要在selector上注册OP_WRITE事件
       //当一次写的过程中没有把ChannelOutboundBuffer中缓存的数据全部刷到网络上,说明底层的socket写缓存已经满了,
       //这时候需要向selector注册OP_WRITE事件,等socket写缓存有空间了,将来在继续刷数据到网络
        if (setOpWrite) {
            //setOpWrite()就是SocketChannel向selector注册OP_WRITE事件
            setOpWrite();
        } else {
            // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
            // use our write quantum. In this case we no longer want to set the write OP because the socket is still
            // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
            // and set the write OP if necessary.
          //如果数据全部刷完了,那么SocketChannel需要清空OP_WRITE
            clearOpWrite();

            // Schedule flush again later so other tasks can be picked up in the meantime
            eventLoop().execute(flushTask);
        }
    }


分析完成

相关文章

网友评论

      本文标题:Netty NioSocketChannel写数据源码分析

      本文链接:https://www.haomeiwen.com/subject/ielccktx.html