美文网首页NettyJava学习笔记程序员
自顶向下深入分析Netty(九)--UnpooledByteBu

自顶向下深入分析Netty(九)--UnpooledByteBu

作者: Hypercube | 来源:发表于2017-05-25 23:27 被阅读2388次

    前文分析了ByteBuf的抽象类实现框架,现在开始分析最底层的实现类。分为两种情形:Unpooled和Pooled,首先看Unpooled。

    1.UnpooledHeapByteBuf

    该Bytebuf的底层为不使用对象池技术的JAVA堆字节数组,首先看其中的成员变量:

        private final ByteBufAllocator alloc;   // 分配器
        byte[] array;   // 底层字节数组
        private ByteBuffer tmpNioBuf; // NIO的ByteBuffer形式
    

    只需要着重关注array变量,它是位于JAVA堆的字节数组。
    再看一个构造方法(忽略其中的参数检查):

        protected UnpooledHeapByteBuf(ByteBufAllocator alloc, 
                                        int initialCapacity, int maxCapacity) {
            super(maxCapacity);
    
            this.alloc = alloc;
            setArray(allocateArray(initialCapacity));
            setIndex(0, 0);
        }
        
        private void setArray(byte[] initialArray) {
            array = initialArray;
            tmpNioBuf = null;
        }
        
        byte[] allocateArray(int initialCapacity) {
            return new byte[initialCapacity];
        }
    

    实现也很简单,只需关注allocateArray()方法,分配一个数组;对应地,有一个freeArray()方法,释放一个数组,代码如下:、

        void freeArray(byte[] array) {
            // NOOP 
        }
    

    由于堆内的字节数组会被GC自动回收,所以不需要具体实现代码。此外,在引用计数的分析中,当引用计数释放的时候需要调用deallocate()方法释放该ByteBuf,实现如下:

        protected void deallocate() {
            freeArray(array);
            array = null;
        }
    

    同理,使用GC自动回收,而设置array=null可以帮助GC回收。
    ByteBuf中有关于判断底层实现的方法,具体实现也很简单:

        // 默认的字节序:大端模式
        public ByteOrder order() { return ByteOrder.BIG_ENDIAN; }
        
        // 底层是否有JAVA堆字节数组
        public boolean hasArray() { return true; }
        
        // 底层数组的偏移量
        public int arrayOffset() { return 0; }
    
        // 是否直接数组
        public boolean isDirect() { return false; }
        
        // 是否含有os底层的数组起始地址
        public boolean hasMemoryAddress() { return false; }
    

    接下来,看重要的设置容量方法capacity(int newCapacity)

        public ByteBuf capacity(int newCapacity) {
            int oldCapacity = array.length;
            byte[] oldArray = array;
            if (newCapacity > oldCapacity) {    // 容量扩增
                byte[] newArray = allocateArray(newCapacity); // 申请数组
                // 将老数组的字节复制到新数组
                System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
                setArray(newArray);
                freeArray(oldArray);
            } else if (newCapacity < oldCapacity) { // 容量缩减
                byte[] newArray = allocateArray(newCapacity);
                int readerIndex = readerIndex();
                // 容量缩减导致读写索引改变
                if (readerIndex < newCapacity) {
                    int writerIndex = writerIndex();
                    if (writerIndex > newCapacity) {
                        writerIndex(writerIndex = newCapacity);
                    }
                    // 只拷贝读索引之后的数据,读索引之前0填充
                    System.arraycopy(oldArray, readerIndex, 
                                     newArray, readerIndex, writerIndex - readerIndex);
                } else {
                    setIndex(newCapacity, newCapacity);
                }
                setArray(newArray);
                freeArray(oldArray);
            }
            // 容量相等时不做处理
            return this;
        }
    

    设置容量分为两种情况:容量扩增和容量缩减。实现都是将老数据复制到新的字节数组中,有必要的话,调整读写索引位置。
    之前分析过getXXX()readXXX()的核心实现是_getXXX(index)方法,以_getInt(index)为例进行分析,代码如下:

        protected int _getInt(int index) {
            return HeapByteBufUtil.getInt(array, index);
        }
        
        static int getInt(byte[] memory, int index) {
            return  (memory[index]     & 0xff) << 24 |
                    (memory[index + 1] & 0xff) << 16 |
                    (memory[index + 2] & 0xff) <<  8 |
                    memory[index + 3] & 0xff;
        }
    

    将字节数组中指定索引位置处的4个字节按照大端模式通过移位组装为一个整数。同理,可推断_setInt(index)方法将一个整数的4个字节通过移位填充到字节数组的指定位置,确实如此,核心实现如下:

        static void setInt(byte[] memory, int index, int value) {
            memory[index]     = (byte) (value >>> 24);
            memory[index + 1] = (byte) (value >>> 16);
            memory[index + 2] = (byte) (value >>> 8);
            memory[index + 3] = (byte) value;
        }
    

    可以派生新的ByteBuf的方法中,slice()duplicate()共享底层实现,在本类中,就是共享array变量,但各自维护独立索引,而copy()方法有自己独立的底层字节数组,通过将数据复制到一个新的字节数组实现,代码如下:

        public ByteBuf copy(int index, int length) {
            checkIndex(index, length);
            byte[] copiedArray = new byte[length];
            System.arraycopy(array, index, copiedArray, 0, length);
            return new UnpooledHeapByteBuf(alloc(), copiedArray, maxCapacity());
        }
    

    虽然JDK自带的ByteBuffer有各种缺憾,但在进行IO时,不得不使用原生的ByteBuffer,所以Netty的ByteBuf也提供方法转化,实现如下:

        public ByteBuffer internalNioBuffer(int index, int length) {
            checkIndex(index, length);
            return (ByteBuffer) internalNioBuffer().clear()
                                    .position(index).limit(index + length);
        }
        
        private ByteBuffer internalNioBuffer() {
            ByteBuffer tmpNioBuf = this.tmpNioBuf;
            if (tmpNioBuf == null) {
                this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array);
            }
            return tmpNioBuf;
        }
    

    方法将该类转化为JDK的HeapByteBuffer,可见也是一个堆缓冲区。clear().position(index).limit(index + length)的使用是防止原生ByteBuffer的读写模式切换造成的错误。

    至此,UnpooledHeapByteBuf的实现分析完毕,可见并没有想象中的困难,再接再厉,分析UnpooledDirectByteBuf

    2. UnpooledDirectByteBuf

    Netty的UnpooledDirectByteBuf在NIO的DirectByteBuf上采用组合的方式进行了封装,屏蔽了对程序员不友好的地方,并使其符合Netty的ByteBuf体系。使用与UnpooledHeapByteBuf相同的顺序进行分析,首先看成员变量:

        private final ByteBufAllocator alloc;   // 分配器
    
        private ByteBuffer buffer;  // 底层NIO直接ByteBuffer
        private ByteBuffer tmpNioBuf; // 用于IO操作的ByteBuffer
        private int capacity; // ByteBuf的容量
        private boolean doNotFree; // 释放标记
    

    做一个简介,buffer表示底层的直接ByteBuffer;tmpNioBuf常用来进行IO操作,实现实质是buffer.duplicate()即与buffer共享底层数据结构;capacity表示缓冲区容量,即字节数;doNotFree是一个标记,表示是否需要释放buffer的底层内存。

    接着分析构造方法:

        protected UnpooledDirectByteBuf(ByteBufAllocator alloc, 
                                    int initialCapacity, int maxCapacity) {
            super(maxCapacity);
    
            this.alloc = alloc;
            setByteBuffer(allocateDirect(initialCapacity));
        }
        
        protected ByteBuffer allocateDirect(int initialCapacity) {
            return ByteBuffer.allocateDirect(initialCapacity);
        }
        
        private void setByteBuffer(ByteBuffer buffer) {
            ByteBuffer oldBuffer = this.buffer;
            if (oldBuffer != null) {
                if (doNotFree) {
                    doNotFree = false;
                } else {
                    freeDirect(oldBuffer);
                }
            }
    
            this.buffer = buffer;
            tmpNioBuf = null;
            capacity = buffer.remaining();
        }
    

    由于setByteBuffer(buffer)中含有doNotFree变量使得理解稍微困难.仔细分析,当doNotFree为true时,调用后置为false,而为false时都需要freeDirect(oldBuffer)。由此可知,doNotFree表示不需要释放旧的Buffer,根据代码大全,使用反义Not并不是好的做法,使用free表示是否需要释放旧的Buffer会更容易让人理解。另外从代码可以看出:不需要释放旧的Buffer只有一种情况,这种情况便是Buffer作为构造方法的参数时,代码如下:

        protected UnpooledDirectByteBuf(ByteBufAllocator alloc, 
                                          ByteBuffer initialBuffer, int maxCapacity) {
            super(maxCapacity);
            int initialCapacity = initialBuffer.remaining();
    
            this.alloc = alloc;
            doNotFree = true;   // 置为true 表示不需要释放原有buffer
            setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN));
            // 此时 doNotFree已经为false
            writerIndex(initialCapacity);
        }
    

    分析完,发现doNotFree是一个不必要的变量,除非在执行构造方法的时候,oldBuffer不为null。(目前没想到有什么情况如此)
    使用allocateDirect(initialCapacity)分配内存时实际委托给NIO的方法,释放内存freeDirect(buffer)也如此,委托给了NIO中DirectByteBuffer的cleaner,代码如下:

        protected void freeDirect(ByteBuffer buffer) {
            PlatformDependent.freeDirectBuffer(buffer);
        }
        
        public void freeDirectBuffer(ByteBuffer buffer) {
            if (!buffer.isDirect()) {
                return;
            }
            try {
                Object cleaner = PlatformDependent0.getObject(buffer, CLEANER_FIELD_OFFSET);
                if (cleaner != null) {
                    CLEAN_METHOD.invoke(cleaner);
                }
            } catch (Throwable cause) {
                PlatformDependent0.throwException(cause);
            }
        }
    

    实际代码根据JDK版本不同调用不同方法,上述只是其中之一,但原理相同,不再列出。
    与引用计数相关的deallocate()方法,代码实现如下:

        protected void deallocate() {
            ByteBuffer buffer = this.buffer;
            if (buffer == null) {
                return;
            }
    
            this.buffer = null;
    
            if (!doNotFree) { 
                freeDirect(buffer); // 前述分析可知,doNotFree构造方法之后一直为false
            }
        }
    

    判断底层实现的方法则如下:

        // 默认的字节序:大端模式
        public ByteOrder order() { return ByteOrder.BIG_ENDIAN; }
        
        // 是否直接数组
        public boolean isDirect() { return true; }
        
        // 底层是否有JAVA堆字节数组
        public boolean hasArray() { throw new UnsupportedOperationException("..."); }
        
        // 底层数组的偏移量
        public int arrayOffset() { throw new UnsupportedOperationException("..."); }
    
        // 是否含有os底层的数组起始地址
        public boolean hasMemoryAddress() { return false; }
    

    设置容量的方法:

        public ByteBuf capacity(int newCapacity) {
            checkNewCapacity(newCapacity);
    
            int readerIndex = readerIndex();
            int writerIndex = writerIndex();
    
            int oldCapacity = capacity;
            if (newCapacity > oldCapacity) {    // 容量扩增
                ByteBuffer oldBuffer = buffer;
                ByteBuffer newBuffer = allocateDirect(newCapacity);
                oldBuffer.position(0).limit(oldBuffer.capacity());
                newBuffer.position(0).limit(oldBuffer.capacity());
                newBuffer.put(oldBuffer);
                newBuffer.clear();
                setByteBuffer(newBuffer);
            } else if (newCapacity < oldCapacity) {  // 容量缩减
                ByteBuffer oldBuffer = buffer;
                ByteBuffer newBuffer = allocateDirect(newCapacity);
                if (readerIndex < newCapacity) {
                    if (writerIndex > newCapacity) {
                        writerIndex(writerIndex = newCapacity);
                    }
                    oldBuffer.position(readerIndex).limit(writerIndex);
                    newBuffer.position(readerIndex).limit(writerIndex);
                    newBuffer.put(oldBuffer);
                    newBuffer.clear();
                } else {
                    setIndex(newCapacity, newCapacity);
                }
                setByteBuffer(newBuffer);
            }
            return this;
        }
    

    与HeapByteBuf类似,容量改变时,都将oldBuffer中的数据复制到新的newBuffer中,只是在容量缩减时,需要调整读写索引。
    接着看关键的_getInt(index)_setInt(index,value)方法:

        protected int _getInt(int index) {
            return buffer.getInt(index);
        }
        
        protected void _setInt(int index, int value) {
            buffer.putInt(index, value);
        }
    

    可见具体实现委托给了NIO原生的ByteBuffer,追踪其中的具体实现,一种情况下的实现如下:

        
        static int getIntB(long a) {
            return makeInt(_get(a    ),
                           _get(a + 1),
                           _get(a + 2),
                           _get(a + 3));
        }
        
        static private int makeInt(byte b3, byte b2, byte b1, byte b0) {
            return (((b3       ) << 24) |
                    ((b2 & 0xff) << 16) |
                    ((b1 & 0xff) <<  8) |
                    ((b0 & 0xff)      ));
        }
    

    可见与Netty的HeapByteBuf实现一致。另一种情况是native实现,没有找到具体实现代码,如果你有兴趣可以寻找相关实现,有相关发现请告诉我。
    继续看copy()方法:

        public ByteBuf copy(int index, int length) {
            ensureAccessible();
            ByteBuffer src;
            try {
                src = (ByteBuffer) buffer.duplicate()
                            .clear().position(index).limit(index + length);
            } catch (IllegalArgumentException ignored) {
                throw new IndexOutOfBoundsException(
                        "Too many bytes to read - Need " + (index + length));
            }
    
            return alloc().directBuffer(length, maxCapacity()).writeBytes(src);
        }
    

    对原buffer使用duplicate()方法,从而不干扰原来buffer的索引。然后从分配器中申请一个buffer并写入原buffer的数据。
    最后看internalNioBuffer()

        public ByteBuffer internalNioBuffer(int index, int length) {
            checkIndex(index, length);
            return (ByteBuffer) internalNioBuffer()
                        .clear().position(index).limit(index + length);
        }
    
        private ByteBuffer internalNioBuffer() {
            ByteBuffer tmpNioBuf = this.tmpNioBuf;
            if (tmpNioBuf == null) {
                this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
            }
            return tmpNioBuf;
        }
    

    可见,与copy()相同,使用duplicate()防止干扰原buffer的索引。
    至此,UnpooledDirectByteBuf的源码分析完毕。

    3. UnsafeByteBuf

    Netty还使用JAVA的后门类sun.misc.Unsafe实现了两个缓冲区UnpooledUnsafeHeapByteBufUnpooledUnsafeDirectByteBuf。这个强大的后门类Unsafe可以暴露出对象的底层地址,一般不建议使用,而性能优化狂魔Netty则顾不得这些。简单介绍一下这两个类的原理,不再对代码进行分析。UnpooledUnsafeHeapByteBuf在使用Unsafe后,暴露出字节数组在JAVA堆中的地址,所以不再使用字节数组的索引即array[index]访问,转而使用baseAddress + Index的得到字节的地址,然后从该地址取得字节。UnpooledUnsafeDirectByteBuf也一样,暴露底层DirectByteBuffer的地址后,使用相同的Address + Index方式取得对应字节。

    相关文章

      网友评论

        本文标题:自顶向下深入分析Netty(九)--UnpooledByteBu

        本文链接:https://www.haomeiwen.com/subject/orctfxtx.html