美文网首页java 成神之路
Netty 之 ChannelOutboundBuffer 源码

Netty 之 ChannelOutboundBuffer 源码

作者: jijs | 来源:发表于2019-01-31 08:55 被阅读89次

    每个 ChannelSocket 的 Unsafe 都有一个绑定的 ChannelOutboundBuffer , Netty 向站外输出数据的过程统一通过 ChannelOutboundBuffer 类进行封装,目的是为了提高网络的吞吐量,在外面调用 write 的时候,数据并没有写到 Socket,而是写到了 ChannelOutboundBuffer 这里,当调用 flush 的时候,才真正的向 Socket 写出。

    ChannelOutboundBuffer 类属性

    public final class ChannelOutboundBuffer {
    
        // Assuming a 64-bit JVM:
        //  - 16 bytes object header
        //  - 8 reference fields
        //  - 2 long fields
        //  - 2 int fields
        //  - 1 boolean field
        //  - padding
        static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
                SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
    
    
        private final Channel channel;
    
        // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
        //
        // 缓存链表中被刷新的第一个元素
        private Entry flushedEntry;
        // 缓存链表中中第一个未刷新的元素
        private Entry unflushedEntry;
        // 缓存链表中的尾元素
        private Entry tailEntry;
        // 刷新但还没有写入到 socket 中的数量
        private int flushed;
    

    从类的属性中可以看出 ChannelOutboundBuffer 定义了一个链表进行存储写入的数据。

    1、ChannelOutboundBuffer 添加了N个 Entry


    2、当 flush 后的效果


    3、flush 后把所有的数据都写入 Socket 中


    4、当再次写入 Entry 后


    下面针对操作这些操作进行分析,添加、刷新、删除等操作。

    addMessage() 添加操作

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    
        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
    

    1、创建一个 新的Entry。
    2、判断 tailEntry 是否为 null,如果为 null 说明链表为空。则把 flushedEntry 置为null。
    3、如果 tailEntry 不为空,则把新添加的 Entry 添加到 tailEntry 后面 。
    4、 将新添加的 Entry 设置为 链表的 tailEntry。
    5、如果 unflushedEntry 为null,说明没有未被刷新的元素。新添加的Entry 肯定是未被刷新的,则把当前 Entry 设置为 unflushedEntry 。
    6、统计未被刷新的元素的总大小。

    addFlush() 刷新操作

    当 addMessage 成功添加进 ChannelOutboundBuffer 后,就需要 flush 刷新到 Socket 中去。但是这个方法并不是做刷新到 Socket 的操作。而是将 unflushedEntry 的引用转移到 flushedEntry 引用中,表示即将刷新这个 flushedEntry,至于为什么这么做?
    答:因为 Netty 提供了 promise,这个对象可以做取消操作,例如,不发送这个 ByteBuf 了,所以,在 write 之后,flush 之前需要告诉 promise 不能做取消操作了。

    public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);
    
            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }
    

    1、通过 unflushedEntry 获取未被刷新元素 entry。
    2、如果 entry 为null 说明没有待刷新的元素,不执行任何操作
    3、如果 entry 不为 null,说明有需要被刷新的元素
    4、如果 flushedEntry == null 说明当前没有正在刷新的任务,则把 entry 设置为 flushedEntry 刷新的起点。
    5、循环设置 entry, 设置这些 entry 状态设置为非取消状态,如果设置失败,则把这些entry 节点取消并使 totalPendingSize 减去这个节点的字节大小。

    在调用完 outboundBuffer.addFlush() 方法后,Channel 会调用 flush0 方法做真正的刷新。代码参见 AbstractUnsafe.flush() 方法。

    remove() 删除操作

    public boolean remove() {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;
    
        ChannelPromise promise = e.promise;
        int size = e.pendingSize;
    
        removeEntry(e);
    
        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }
    
        // recycle the entry
        e.recycle();
    
        return true;
    }
    

    1、获取 flushedEntry 节点,链表的头结点。如果获取不到清空 ByteBuf 缓存。
    2、在链表上移除该 Entry。如果之前没有取消,只释放消息、通知和递减。
    3、回收 Entry 对象。

    removeEntry() 链表移除节点

    private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            // processed everything
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
    }
    

    1、如果 flushed ==0 说明,链表中所有 flush 的数据都已经发送到 Socket 中。把 flushedEntry 置位 null。此时链表可能还有 unflushedEntry 数据。
    如果此时 e == tailEntry 说明链表为空,则把 tailEntry 和 unflushedEntry 都置为空。

    flush==0,可能是这种状态,如下图:


    2、把 flushedEntry 置为下一个节点(flushedEntry 此时是头结点)。

    Entry 对象池使用

    由于 Entry 使用比较频繁,会频繁的创建和销毁,这里使用了 Entry 的对象池,创建的时候从缓存中获取,销毁时回收。

    下面看下 Entry 的创建过程

    Entry 创建

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        ..... 
    

    使用 Entry.newInstance 方法进行创建 Entry 对象。

    static final class Entry {
        private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
            @Override
            protected Entry newObject(Handle<Entry> handle) {
                return new Entry(handle);
            }
        };
    
        static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
            Entry entry = RECYCLER.get();
            entry.msg = msg;
            entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
            entry.total = total;
            entry.promise = promise;
            return entry;
        }
    

    Entry.newInstance() 方法收下从 Recycler 中获取 Entry 对象,如果获取不到则使用 Recycler.newObject() 方法创建一个 Entry 对象,调用 newObject() 方法在 Recycler.get() 方法中,代码如下:

    public abstract class Recycler<T> {
        public final T get() {
            if (maxCapacityPerThread == 0) {
                return newObject((Handle<T>) NOOP_HANDLE);
            }
            Stack<T> stack = threadLocal.get();
            DefaultHandle<T> handle = stack.pop();
            if (handle == null) {
                handle = stack.newHandle();
                handle.value = newObject(handle);
            }
            return (T) handle.value;
        }
    

    从 Recycler 中可以看出,Entry 对象是存储在 Stack 中。如果 Stack 中没有可用的 Stack,则调用 newObject() 方法创建。

    Entry 对象回收

    public boolean remove() {
        Entry e = flushedEntry;
        ...
        e.recycle();
    
        return true;
    }
    

    当 Entry 从链表中移除的时候回调用 e.recycle() 方法。

    static final class Entry {
        void recycle() {
            next = null;
            bufs = null;
            buf = null;
            msg = null;
            promise = null;
            progress = 0;
            total = 0;
            pendingSize = 0;
            count = -1;
            cancelled = false;
            handle.recycle(this);
        }
    

    Entry.recycle() 方法会把 Entry 中成员变量全部初始化。然后在调用 handle.recycle() 方法。

    public abstract class Recycler<T> {
        static final class DefaultHandle<T> implements Handle<T> {
            @Override
            public void recycle(Object object) {
                if (object != value) {
                    throw new IllegalArgumentException("object does not belong to handle");
                }
    
                Stack<?> stack = this.stack;
                if (lastRecycledId != recycleId || stack == null) {
                    throw new IllegalStateException("recycled already");
                }
    
                stack.push(this);
            }
        }
    

    Recycler.recycle() 方法又把 Entry 对象压进 stack 中。

    相关文章

      网友评论

        本文标题:Netty 之 ChannelOutboundBuffer 源码

        本文链接:https://www.haomeiwen.com/subject/swjzjqtx.html