美文网首页
Netty源码分析----writeAndFlush

Netty源码分析----writeAndFlush

作者: _六道木 | 来源:发表于2018-07-04 00:33 被阅读30次

    (*文章基于Netty4.1.22版本)
    ctx.writeAndFlush相当于先调用ctx.write然后再调用ctx.flush,所以下面分析write和flush

    write

    write和flush会经过pipeline的每个outbound的Handler,之前文章分析过流程,这里不再分析。

    write方法最终到达HeadContext的write方法,然后什么都没做,将请求转发到底层实现unsafe,实现在AbstractUnsafe中

            public final void write(Object msg, ChannelPromise promise) {
                //Netty的缓冲区
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                //....
                int size;
                try {
                    msg = filterOutboundMessage(msg);// 对msg类型进行过滤和包装
                    size = pipeline.estimatorHandle().size(msg);
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable t) {
                    //....
                }
                将msg加入缓冲区
                outboundBuffer.addMessage(msg, size, promise);
            }
    

    ChannelOutboundBuffer

    Netty调用write的时候,不是真的将数据写出去,而是使用了一个缓冲区去存放这份数据,这个缓冲区的实现就是ChannelOutboundBuffer,看下其主要变量

        //
        // 链表中第一个已经被标记为已flush的Entry元素
        private Entry flushedEntry;
        // 链表中第一个已经被标记为未flush的Entry元素
        private Entry unflushedEntry;
        // 链表最后一个Entry元素
        private Entry tailEntry;
        // 已经标记为flush但是还未写出去的Entry数量
        private int flushed;
        // ByteBuffer 的数量和大小
        private int nioBufferCount;
        private long nioBufferSize;
    

    ChannelOutboundBuffer是一个链表,每一个元素类型是Entry,结构如下

    Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)

    接下来看下addMessage方法

        public void addMessage(Object msg, int size, ChannelPromise promise) {
            Entry entry = Entry.newInstance(msg, size, total(msg), promise);
            if (tailEntry == null) {
                flushedEntry = null;
                tailEntry = entry;
            } else {
                Entry tail = tailEntry;
                tail.next = entry;
                tailEntry = entry;
            }
            if (unflushedEntry == null) {
                unflushedEntry = entry;
            }
            incrementPendingOutboundBytes(entry.pendingSize, false);
        }
    

    链表的操作,先构造一个Entry,然后加入到链表尾部,最后调用了一下incrementPendingOutboundBytes方法

        private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
            if (size == 0) {
                return;
            }
            // 更新写入的大小        
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
            if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {// 如果当前缓冲区大小大于指定的值,则调用setWritable方法
                setUnwritable(invokeLater);
            }
        }
        private void setWritable(boolean invokeLater) {
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue & ~1;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue != 0 && newValue == 0) {
                        fireChannelWritabilityChanged(invokeLater);// 触发channelWritabilityChanged事件
                    }
                    break;
                }
            }
        }
    

    write涉及的ChannelOutboundBuffer功能就这些,其他的在flush的时候分析

    flush

    同理,flush最终调用的是unsafe的flush方法

            public final void flush() {
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                outboundBuffer.addFlush();
                flush0();
            }
    

    先调用了ChannelOutboundBuffer的addFlush方法,然后具体逻辑在flush0中,先看下addFlush方法实现

        public void addFlush() {
            Entry entry = unflushedEntry;
            if (entry != null) {
                if (flushedEntry == null) {// 将链表中第一个元素标记为flushed
                    flushedEntry = entry;
                }
                do {
                    flushed ++;//进行flush的数量
                    if (!entry.promise.setUncancellable()) {// 如果已经取消了,更新TOTAL_PENDING_SIZE_UPDATER的保存的数量
                        int pending = entry.cancel();
                        decrementPendingOutboundBytes(pending, false, true);// 
                    }
                    entry = entry.next;// 
                } while (entry != null);
                // 链表遍历完毕,所有的entry都flush,unflushedEntry代表为未flush的Entry,此时不存在了,所以设为null
                unflushedEntry = null;
            }
        }
    

    addFlush方法只是将缓冲区中的Entry标记为flush(更新unflushedEntry和flushedEntry的值)

    flush0实际上会调用到AbstractNioUnsafe的flush0方法

            protected final void flush0() {
                if (!isFlushPending()) {
                    super.flush0();
                }
            }
    

    AbstractNioUnsafe重写了父类的flush0方法,只是在其之上加了一层判断,通过isFlushPending返回值来判断是否需要进行flush操作,具体的flush操作还是在父类AbstractUnsafe中,看下isFlushPending

            private boolean isFlushPending() {
                SelectionKey selectionKey = selectionKey();
                return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
            }
    

    就两行代码,首先要SelectionKey有效啦,然后最重要的是后面那句

    selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0

    从代码上看,就是SelectionKey设置了OP_WRITE作为感兴趣的事件,那么即如果监听了OP_WRITE事件,那么就不flush。

    • 为什么要这样做?
    1. OP_WRITE

    要弄明白这个判断,首先要知道OP_WRITE这个事件。那么什么时候Selector会轮询到OP_WRITE事件?是通道可写的时候。
    回想一下一个简单的NIO demo,貌似没有处理这个OP_WRITE,程序也可以非常爽的在自己的电脑中的Client和Server中将Hello world发送来发送去,其实这种时候,通道都是可写的,所以监听一个可写事件,等到通道可写的时候再去进行某些操作貌似就没有意义了,因为在demo中,一般不会出现不可写的情况,而在实际运用中写入的数据太多,或者网络有问题,导致缓冲区满了,Channel.write可能返回0,即写出去的字节数为0,那么这时候代表Channel已经不可写了,再继续写也没有用,那么这个时候如果注册一个OP_WRITE事件,在Channel可写的时候,Selector就会轮询到OP_WRITE事件,这个事件,再去写,就OK了,所以这里判断如果监听了OP_WRITE事件,代表通道不可写,所以才监听的OP_WRITE事件,那么就不处理,等到NioEventLoop轮询到OP_WRITE事件再进行flush操作(这段是个人理解总结,不一定正确),具体代码如下

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            //....
            try {
                int readyOps = k.readyOps();
                //....
                // 如果监听
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    ch.unsafe().forceFlush();
                }
    
            } catch (CancelledKeyException ignored) {
            }
        }
    

    当通道可写的时候,进行flush操作,看下forceFlush实现

            public final void forceFlush() {
                super.flush0();
            }
    

    可以看到,还是调用了父类的flush0方法,没有isFlushPending的判断,直接刷新

    接下来看下flush0方法

            protected void flush0() {
                //....
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                //....
    
                try {
                    doWrite(outboundBuffer);
                } catch (Throwable t) {
                    //....
                } finally {
                    inFlush0 = false;
                }
            }
    

    看下doWrite方法

        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            SocketChannel ch = javaChannel();
            // 类似并发编程的自旋次数
            int writeSpinCount = config().getWriteSpinCount();
            do {
                if (in.isEmpty()) {// 如果缓冲区为空
                    clearOpWrite();// 清除OP_WRITE监听
                    return;
                }
    
                // 获取每次写入最大的数量.
                int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
                // 转换成原生的ByteBuffer数组
                ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);、
                // buffer的数量
                int nioBufferCnt = in.nioBufferCount();
    
                switch (nioBufferCnt) {
                    case 0:
                        writeSpinCount -= doWrite0(in);
                        break;
                    case 1: {
                        ByteBuffer buffer = nioBuffers[0];
                        int attemptedBytes = buffer.remaining();
                        final int localWrittenBytes = ch.write(buffer);// 直接写入
                        if (localWrittenBytes <= 0) {// 通道可能不可写
                            incompleteWrite(true);
                            return;
                        }
                        adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                        in.removeBytes(localWrittenBytes);
                        --writeSpinCount;
                        break;
                    }
                    default: {
                        long attemptedBytes = in.nioBufferSize();// 这一批buffer的大小
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes <= 0) {// 通道可能不可写
                            incompleteWrite(true);
                            return;
                        }
                        adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                                maxBytesPerGatheringWrite);
                        // 这一次write写出localWrittenBytes大小的数据,需要将缓冲区中的数据清除
                        in.removeBytes(localWrittenBytes);
                        --writeSpinCount;
                        break;
                    }
                }
            } while (writeSpinCount > 0);
            
            incompleteWrite(writeSpinCount < 0);
        }
    

    这里要划几个重点:

    1. writeSpinCount:这个东西干嘛用的?源码注释里说了"类似自旋锁",那么我的理解是,自旋锁是不断重试去获取锁,因为前面获取锁失败了,而这里不是去获取锁,而是尝试将数据写完,但是每次都只能写一部分出去,所以重复writeSpinCount次write的操作,所以和自旋有点类似

    2. incompleteWrite(writeSpinCount < 0):1中说了会重复writeSpinCount操作,如果writeSpinCount都还未写完呢?如果writeSpinCount < 0证明在writeSpinCount次write中都还未写完,该表达式为true,那么看下incompleteWrite方法

        protected final void incompleteWrite(boolean setOpWrite) {
            // Did not write completely.
            if (setOpWrite) {
                setOpWrite();// 设置OP_WRITE事件
            } else {
                clearOpWrite();
                eventLoop().execute(flushTask);
            }
        }
    

    可以看到,如果setOpWrite为true,则设置了OP_WRITE事件,让Selector异步去处理OP_WRITE事件,强制flush,就是上面讲的内容。而如果入参为false,那么需要清除OP_WRITE事件,因为大部分情况下通道可写,如果一直监听这个事件,会不听轮询出该事件,导致类似空轮询的结果

    1. in.isEmpty():在缓冲区为空的情况下,也需要清除OP_WRITE事件,原因和2一样

    2. if (localWrittenBytes <= 0):这个和上面分析OP_WRITE事件的时候说过,通道不可写的情况

    • 那么还有个问题,由于一次write并不能将数据全部写出去,那Netty是如何处理缓冲区还未处理的数据的呢?

    这时需要看一下in.removeBytes方法

        public void removeBytes(long writtenBytes) {
            for (;;) {
                Object msg = current();// 等于flushedEntry.msg
                if (!(msg instanceof ByteBuf)) {
                    assert writtenBytes == 0;
                    break;
                }
    
                final ByteBuf buf = (ByteBuf) msg;
                final int readerIndex = buf.readerIndex();
                final int readableBytes = buf.writerIndex() - readerIndex;
    
                if (readableBytes <= writtenBytes) {
                    if (writtenBytes != 0) {
                        progress(readableBytes);
                        writtenBytes -= readableBytes;
                    }
                    remove();
                } else { // readableBytes > writtenBytes
                    if (writtenBytes != 0) {
                        buf.readerIndex(readerIndex + (int) writtenBytes);
                        progress(writtenBytes);
                    }
                    break;
                }
            }
            clearNioBuffers();
        }
    

    这里首先获取缓冲区的flushedEntry节点,flushedEntry在一开始的时候已经讲过,代表链表中第一个被标记为flush的节点,而这个节点在addFlush的时候被初始化,这里首先会获取flushedEntry,可以看出,write的时候是从flushedEntry开始处理的,即flushedEntry使标记为待写入但是还未写成功的Entry。
    接下来会获取ByteBuf,获取当前ByteBuf可读的大小readableBytes(readerIndex和writerIndex含义如有不明白的另行百度),而根据大小分为两个分支:

    1. 成功写入的字节数大于等于当前ByteBuf可读大小
    2. 成功写入的字节数大于当前ByteBuf可读大小

    通过图来理解一下这两种情况


    image.png

    假设有4个Entry,每个Entry里的ByteBuf大小如图所示,当前写入的大小为120,过程如下:

    第一个循环-> 获取第一个Entry,其readableBytes=34<120,所以代表当前ByteBuf已经全部写入(总共写入120,当前为34,所以当前ByteBuf完全写入,另外86大小是其他ByteBuf),这时候调用progress方法,且writtenBytes=writtenBytes-34=86,然后调用remove方法,看下remove方法实现

        public boolean remove() {
            Entry e = flushedEntry;
            if (e == null) {// 如果为空了,代表全部写入,清空nioBufferCount和NIO_BUFFERS
                clearNioBuffers();
                return false;
            }
            Object msg = e.msg;
    
            ChannelPromise promise = e.promise;
            int size = e.pendingSize;
    
            removeEntry(e);
            //....
            return true;
        }
        private void removeEntry(Entry e) {
            // flushed在addFlush的时候会增加,在remove的时候会减少
            if (-- flushed == 0) {
                // 所有Entry的ByteBuf都写入完毕,置空flushedEntry tailEntry和unflushedEntry
                flushedEntry = null;
                if (e == tailEntry) {
                    tailEntry = null;
                    unflushedEntry = null;
                }
            } else {// 还有Entry没有处理完毕,当前Entry已经写入完毕,flushedEntry往后移动
                flushedEntry = e.next;
            }
        }
    

    经过第一个循环后缓冲区结构如下:


    image.png

    第二个循环->和第一个循环类似,处理完第二个后结构如下:


    image.png

    第三个循环-> 可以看到此时满足第二种情况了,这种情况ByteBuf只写了3个字节的大小,而当前ByteBuf大小而10,所以和第一个情况不同的是,这个将readerIndex设置为3

    整个write和flush的流程就分析完毕了,总结一下:

    1. write和flush经过pipeline最终都会到达HeadContext,HeadContext依赖unsafe去进行底层操作
    2. write只是将数据放到Netty的缓冲区ChannelOutboundBuffer中,内部实现是一个链表
      3.flush的时候,先判断当前是否监听了OP_WRITE事件,如果监听了则不进行flush操作,由selector异步轮询到OP_WRITE事件的时候调用foreceFlush进行flush
    3. 当SocketChannel.write返回的字节数小于等于0的时候代表当前通过不可写,则监听OP_WRITE事件,由selector触发flush操作
    4. 由于没法一次性将所有数据写入,Netty使用了类似自旋锁的操作,重复一定次数进行SocketChannel.write操作,如果最后还是没能全部将数据写入,则监听OP_WRITE事件
    5. 当某一个SocketChannel.write后,Netty遍历通过缓冲区中的Entry,将已经完全处理完毕的Entry移除,flushedEntry不停向后移动

    相关文章

      网友评论

          本文标题:Netty源码分析----writeAndFlush

          本文链接:https://www.haomeiwen.com/subject/lcdhuftx.html