美文网首页深入浅出Netty源码剖析
Netty源码-Write、Flush实现

Netty源码-Write、Flush实现

作者: persisting_ | 来源:发表于2019-05-28 23:29 被阅读2次

    1 概述

    在Netty中,发送报文和读取报文都是通过Unsafe处理的,但是说到底发送和读取报文都是从java.nio.channels.SelectableChannel读取或者向其写报文,而Netty自定义Channel是对java.nio.channels.SelectableChannel的封装增强,所以Unsafe在发送和读取报文最终调用的还是Channel相关的方法。

    我们在Handler中处理报文向Channel或者AbstractChannelHandlerContext写数据时(关于HandlerAbstractChannelHandlerContext可以参考笔者文章Netty源码-ChannelPipeline和ChannelHandler),其实并没有实际向SelectableChannel中写,也就没有实际写入socket发送缓冲区,而是写入Unsafe中的缓存ChannelOutboundBuffer中,只有在调用flush方法或者注册了OP_WRITE事件后才会实际向socket缓冲中进行写入操作,这里要注意在NIO编程时一般不会向Channel注册OP_WRITE事件,Netty也一样,只会在因网络缓存满写半包之后剩下数据要及时写出去,才会注册OP_WRITE事件,让Channel在准备好可以写之后告诉自己继续写剩下的数据(关于OP_WRITE事件的理解可以参考笔者文章NIO SelectionKey事件理解)。

    writeflush属于Outbound事件,所在最终调用pipeline中head节点(也就是HeadContext)的writeflush方法,HeadContextwriteflush方法如下(有关Outbound事件HeadContext可以参考笔者文章Netty源码-ChannelPipeline和ChannelHandler):

    //HeadContext
     @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }
    
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }
    

    可见最终调用的还是Unsafewriteflush方法,所以前面的调用我们不再赘述,后面直接介绍Unsafewriteflush方法。

    2 相关类介绍

    这里我们主要介绍Unsafe中的缓存ChannelOutboundBuffer类定义,ChannelOutboundBuffer使用单链表组织待写出的数据,链表节点为其内部类Entry,这里有一个知识点Entry使用了Netty的对象池技术(可参考笔者文章Netty源码-对象池Recycler),下面首先看ChannelOutboundBuffer的重要域:

    //ChannelOutboundBuffer
    //ChannelOutboundBuffer采用单链表组织要写出的ByteBuf,
    //tailEntry用于指向最后一个加入到链表的节点
    // The Entry which represents the tail of the buffer
    private Entry tailEntry;
    //每次调用addMessage方法向OutboundBuffer追加节点时
    //如果unflushedEntry为空则会被置为最后追加的节点
    //addMessage是需要判断unflushedEntry为空才置为最后追加的节点,
    //而unflushedEntry会在flush方法调用后会置为空
    //所以unflushedEntry不是始终指向尾节点,准确的说应该是指向最靠前
    //的一个未flushed节点
    // The Entry which is the first unflushed in the linked-list structure
    private Entry unflushedEntry;
    //每次调用flush方法时,都会尝试将当前所有节点写入socket缓存中,
    //缓冲满时,则会注册OP_WRITE方法,在channel就绪时继续写,
    //但是不管能够一次性写出去,调用flush方法都会将从头结点开始
    //到unflushedEntry的所有节点表示为flush状态,flushedEntry置为
    //最后未写出的unflushedEntry节点,unflushedEntry则置为空
    // The Entry that is the first in the linked-list structure that was flushed
    private Entry flushedEntry;
    //每次调用flush方法时都可能会将多个节点尝试写入socket
    //flushed则记录最后一个调用flush累计打算写的节点个数
    // The number of flushed entries that are not written yet
    private int flushed;
    

    看完重要域之后,下面再看重要的方法:

    //ChannelOutboundBuffer
    /**
    * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
    * the message was written.
    */
    //向链表中添加节点
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        //新建一个Entry节点实例
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        //如果tailEntry为空,则表示是第一个节点
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            //否则将新节点追加的链表最后,并更新tailEntry为新的尾节点
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        //如果unflushedEntry为空,则更新其为最新的尾节点
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    
        //累积所有缓冲未写出的字节数
        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
    
    //增加总的待写字节数
    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }
    
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        //如果待写字节数大于配置的WRITE_BUFFER_WATER_MARK
        //则设置为不可写状态
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }
    
    //采用CAS设置不可写状态,在每次实际写入socket之后,在
    //方法decrementPendingOutboundBytes中如果总字节数
    //小于设置的水位,则会取消不可写状态为可写状态
    private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0 && newValue != 0) {
                    //设置成功,则触发handler的channelWritabilityChanged
                    //方法,应用则有机会调节写入速度
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }
    
    /**
    * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
    * and so you will be able to handle them.
    */
    public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        Entry entry = unflushedEntry;
        //判断unflushedEntry是否为空,放置节点个数没有发生变化,
        //但是多次调用addFlush方法
        if (entry != null) {
            //flushedEntry表示当前没有刷新任务,将flushedEntry节点
            //赋值为entry
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                //递增待刷新节点个数
                flushed ++;
                //设置该节点对应的promise为不可取消的
                //这里使用AtomicReferenceFieldUpdater更新
                //如果更新失败,表示已经取消了
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    //如果设置失败,则表示在刷新之前已经取消对该节点的
                    //写操作,所以这里进行取消,然后从所有待写入字节数
                    //中减去该节点字节数
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                //获取链表下一个元素
                entry = entry.next;
            } while (entry != null);
    
            // All flushed so reset unflushedEntry
            //因为所有节点都会被flush,所以设置unflushedEntry为空
            unflushedEntry = null;
        }
    }
    

    通过上面的ChannelOutboundBuffer.addFlush方法实现可知,在写入数据放入ChannelOutboundBuffer缓存之后如果想取消发送,则必须在addFlush方法调用之前取消,否则该消息会被设置为不可取消状态。

    3 Unsafe.write

    如上面的介绍Unsafe.write没有直接向socket缓冲写入数据,而是将数据写入Unsafe中的缓存ChannelOutboundBuffer,直接看AbstractUnsafe.write源码:

    //AbstractUnsafe
    //ChannelOutboundBuffer是AbstractUnsafe的一个域,在声明时初始化如下
    private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
    
    //写方法
    @Override
    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();
    
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        //outboundBuffer为空,表示初始化失败,这里抛异常
        if (outboundBuffer == null) {
            // If the outboundBuffer is null we know the channel was closed and so
            // need to fail the future right away. If it is not null the handling of the rest
            // will be done in flush0()
            // See https://github.com/netty/netty/issues/2362
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            // release message now to prevent resource-leak
            ReferenceCountUtil.release(msg);
            return;
        }
    
        int size;
        try {
            //过滤写出的报文,默认实现实际没有做什么操作,不展开介绍
            msg = filterOutboundMessage(msg);
            //得出报文的大小,这个后面介绍
            size = pipeline.estimatorHandle().size(msg);
            //如果小于0的话,赋值为0
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }
    
        //将消息添加到缓冲中
        outboundBuffer.addMessage(msg, size, promise);
    }
    
    

    上面pipeline.estimatorHandle().size(msg)默认实现如下源码所示:

    //DefaultMessageSizeEstimator.HandleImpl
    private static final class HandleImpl implements Handle {
        private final int unknownSize;
    
        private HandleImpl(int unknownSize) {
            this.unknownSize = unknownSize;
        }
    
        //size方法返回的就是ByteBuf的可读字节数
        @Override
        public int size(Object msg) {
            if (msg instanceof ByteBuf) {
                return ((ByteBuf) msg).readableBytes();
            }
            if (msg instanceof ByteBufHolder) {
                return ((ByteBufHolder) msg).content().readableBytes();
            }
            if (msg instanceof FileRegion) {
                return 0;
            }
            return unknownSize;
        }
    }
    

    4 Unsafe.flush

    Unsafe.flushAbstractUnsafe中实现如下:

    //AbstractUnsafe
    @Override
    public final void flush() {
        assertEventLoop();
    
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
        //这个会将outboundBuffer所有待写入节点修改为flushing状态
        //具体实现在上面第二节已经介绍过
        outboundBuffer.addFlush();
    
        flush0();
    }
    
    @SuppressWarnings("deprecation")
    protected void flush0() {
        //inFlush0在进入时被置为true,结束一次flush
        //后变为false,这里判断为true直接返回,避免
        //正在刷新中又触发调用
        if (inFlush0) {
            // Avoid re-entrance
            return;
        }
    
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }
    
        inFlush0 = true;
        //AbstractUnsafe是AbstractChannel的内部类,这个方法是
        //AbstractChannel的方法,保证刷新时通道还是有效状态
        // Mark all pending write requests as failure if the channel is inactive.
        if (!isActive()) {
            try {
                //同样判断通道状态是否为打开状态
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }
    
        try {
            //进行向通道的实际写操作,这里的doWrite也是Channel的方法
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            //一些异常处理
            if (t instanceof IOException && config().isAutoClose()) {
                /**
                    * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                    * failing all flushed messages and also ensure the actual close of the underlying transport
                    * will happen before the promises are notified.
                    *
                    * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                    * may still return {@code true} even if the channel should be closed as result of the exception.
                    */
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                try {
                    shutdownOutput(voidPromise(), t);
                } catch (Throwable t2) {
                    close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            }
        } finally {
            inFlush0 = false;
        }
    }
    

    AbstractUnsafe.flush0方法中调用的doWrite其实是外部类AbstractChannel定义的方法,我们直接看其子类AbstractNioByteChannel中的实现:

    //AbstractNioByteChannel
     @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        //可以看下getWriteSpinCount方法的注释
        //因为每次写入可能会因为socket缓冲满而写不了数据
        //导致写入字节数为0,writeSpinCount就是控制如果
        //写入字节数为0,尝试进行多少次写入
        //也就是注释所说的:向channel写入返回直到返回非0值
        //尝试次数,默认为16,可用使用WRITE_SPIN_COUNT进行配置
    
        //上面的getWriteSpinCount方法注释的含义,但是看代码,每次写成功
        //一些数据(写入字节数大于0)节点会递减writeSpinCount的值
        //所以writeSpinCount也表示一次flush操作尝试写的次数
        int writeSpinCount = config().getWriteSpinCount();
        do {
            //获取outboundBuffer当前的头结点,进行写入
            Object msg = in.current();
            //如果返回为空,表示outboundBuffer所有节点
            //的数据已经全部发送
            if (msg == null) {
                //所有数据都已发送,所以取消OP_WRITE事件的注册,
                //避免select因write事件返回,但是程序实际上无
                //数据要写
                // Wrote all messages.
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }
            //这里进行实际写操作,如果当前节点in.current写成功
            //会从outboundBuffer中删除该节点,并将in.current
            //置为后面一个节点
            writeSpinCount -= doWriteInternal(in, msg);
            //while如果writeSpinCount小于0,则不再进行写入
        } while (writeSpinCount > 0);
    
        //如果writeSpinCount<0,则表示没有写入完成,还有数据未写,
        //所以会注册OP_WRITE,让channel在可写时主动告诉程序,
        //应用会再次进行写尝试
        incompleteWrite(writeSpinCount < 0);
    }
    
    private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
        //如果消息类型为ByteBuf,进入如下分支
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            //该ByteBuf已经没有数据可读,直接返回
            //没有进行实际的写操作,所以writeSpinCount不递减
            if (!buf.isReadable()) {
                in.remove();
                return 0;
            }
            //进行实际的写操作,这里我们后面会列一下NioSocketChannel
            //的实现
            final int localFlushedAmount = doWriteBytes(buf);
            //如果返回值大于0,表示已经写出一部分字节
            if (localFlushedAmount > 0) {
                //更新写出进度
                in.progress(localFlushedAmount);
                //如果该ByteBuf没有可读数据,表示该ByteBuf已经全部写完
                //把它从outboundBuffer中移除,
            //这会将outboundBuffer.current节点指向当前节点后面一个节点
                if (!buf.isReadable()) {
                    in.remove();
                }
                //进行了实际写操作,所以writeSpinCount递减一
                return 1;
            }
        } else if (msg instanceof FileRegion) {
            //和写ByteBuf一样的逻辑,不再介绍
            FileRegion region = (FileRegion) msg;
            if (region.transferred() >= region.count()) {
                in.remove();
                return 0;
            }
    
            long localFlushedAmount = doWriteFileRegion(region);
            if (localFlushedAmount > 0) {
                in.progress(localFlushedAmount);
                if (region.transferred() >= region.count()) {
                    in.remove();
                }
                return 1;
            }
        } else {
            // Should not reach here.
            throw new Error();
        }
        //到这里表示实际写操作返回值小于等于0,可能socket缓冲区已满
        //所以这里返回一个特别大的数(2147483647),令doWrite直接
        //跳出while循环
        return WRITE_STATUS_SNDBUF_FULL;
    }
    
    //doWrite中如果writeSpinCount<0,则表示没有写入完成,还有数据未写,
    //这里传入true
    protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        //如果传入ture,表示上次写入没有完成,还有数据待写,所以注册
        //OP_WRITE事件,让通道在准备好写入时告诉自己进行写入
        if (setOpWrite) {
            setOpWrite();
        } else {
            // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
            // use our write quantum. In this case we no longer want to set the write OP because the socket is still
            // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
            // and set the write OP if necessary.
            //表示数据写入完毕,所以取消对OP_WRITE的注册
            clearOpWrite();
    
            // Schedule flush again later so other tasks can be picked up in the meantime
            //向channel加入一个flushTask,该任务会调用unsafe.flush0方法
            eventLoop().execute(flushTask);
        }
    }
    //注册OP_WRITE事件
    protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }
    //取消注册的OP_WRITE事件
    protected final void clearOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
            key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
        }
    }
    
    private final Runnable flushTask = new Runnable() {
        @Override
        public void run() {
            // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
            // meantime.
            ((AbstractNioUnsafe) unsafe()).flush0();
        }
    };
    

    下面AbstractNioByteChannel.doWriteBytes(ByteBuf buf)我们看下NioSocketChannel.doWriteBytes(ByteBuf buf)的实现:

    //NioSocketChannel
    //可见就是向java.nio.channels.SocketChannel写数据
    @Override
    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }
    

    如果向通道注册了OP_WRITE事件可以在NioEventLoop.run方法select返回后,在processSelectedKey处理返回的SelectionKey的代码中会调用Unsafe.forceFlush

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        ...
    
        try {
            ...
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                //forceFlush最终会调用Unsafe.flush0方法
                ch.unsafe().forceFlush();
            }
    
            ...
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

    最后总结一下,我们在向通道进行写入时,调用write方法只是将待写入的数据放入ChannelOutboundBuffer缓冲中,实际向socket进行写出有两个地方触发,第一个就是调用Unsafe.flush方法;另一个就是发生了部分写,注册OP_WRITE之后,Selector.select返回写就绪,也会触发实际socket写操作。

    相关文章

      网友评论

        本文标题:Netty源码-Write、Flush实现

        本文链接:https://www.haomeiwen.com/subject/zdrltctx.html