美文网首页
[Netty源码分析]ByteBuf(二)

[Netty源码分析]ByteBuf(二)

作者: 没意思先生1995 | 来源:发表于2018-08-29 23:27 被阅读0次
    1. ByteBufAllocator

    ByteBufAllocator是字节缓冲区分配器。根据Netty字节缓冲区的实现不同,分为两种分配器:PooledByteBufAllocator和UnpooledByteBufAllocator,它们提供了不同ByteBuf的分配方法。

    ByteBufAllocator 继承体系.png

    API分类:

    1. buffer*前缀为分配普通 buffer;

    2. ioBuffer*前缀为分配适用于 I/O 操作的 buffer;

    3. directBuffer*前缀为分配直接内存 buffer;

    4. composite*前缀为分配合成 buffer;

    Q:heapBuffer和directBuffer已经分别用于分配堆内内存和堆外内存了,为什么还需要上面的buffer方法?A:后面的AbstractByteBufAllocator给出了解释:分配内存时先调用buffer方法,再在该方法内部根据directByDefault调用directBuffer或heapBuffer。

    ByteBufAllocator的buffer:

    /**
       * Allocate a {@link ByteBuf}. If it is a direct or heap buffer
       * depends on the actual implementation.
       *
       * @return the allocated {@link ByteBuf}
       */
      ByteBuf buffer();
    
      ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
      // AbstractByteBufAllocator: chooses between direct and heap allocation.
      // When the directByDefault flag is set the call is forwarded to directBuffer(),
      // otherwise to heapBuffer().
      @Override
      public ByteBuf buffer() {
          if (directByDefault) {
              return directBuffer();
          }
          return heapBuffer();
      }
    
      →→→→→ directBuffer具体实现
      // No-arg overload: forwards to directBuffer(int, int) with DEFAULT_INITIAL_CAPACITY
      // and Integer.MAX_VALUE as the capacity arguments.
      // NOTE(review): heapBuffer() passes DEFAULT_MAX_CAPACITY instead of Integer.MAX_VALUE;
      // confirm whether the two limits are meant to be identical.
      @Override
      public ByteBuf directBuffer() {
          return directBuffer(DEFAULT_INITIAL_CAPACITY, Integer.MAX_VALUE);
      }
    
      ↓↓↓↓↓↓↓↓↓↓
      /**
       * Allocates a direct buffer with the given capacities.
       * Returns the emptyBuf field when both capacities are 0; otherwise runs
       * validate(...) on the arguments and delegates to newDirectBuffer(...).
       *
       * @param initialCapacity requested initial capacity
       * @param maxCapacity     requested maximum capacity
       * @return emptyBuf, or the buffer produced by newDirectBuffer
       */
      @Override
      public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
          if (initialCapacity == 0 && maxCapacity == 0) {
              return emptyBuf;
          }
          validate(initialCapacity, maxCapacity);
          return newDirectBuffer(initialCapacity, maxCapacity);
      }
    
      ↓↓↓↓↓↓↓↓↓↓
      /**
       * Creates the direct buffer; the behavior depends on the concrete subclass implementation.
       *
       * @param initialCapacity requested initial capacity
       * @param maxCapacity     requested maximum capacity
       * @return the newly created buffer
       */
      protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
    
      →→→→→ heapBuffer具体实现
      // No-arg overload: forwards to heapBuffer(int, int) with DEFAULT_INITIAL_CAPACITY
      // and DEFAULT_MAX_CAPACITY as the capacity arguments.
      @Override
      public ByteBuf heapBuffer() {
          return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
      }
    
      ↓↓↓↓↓↓↓↓↓↓
      /**
       * Allocates a heap buffer with the given capacities.
       * Returns the emptyBuf field when both capacities are 0; otherwise runs
       * validate(...) on the arguments and delegates to newHeapBuffer(...).
       *
       * @param initialCapacity requested initial capacity
       * @param maxCapacity     requested maximum capacity
       * @return emptyBuf, or the buffer produced by newHeapBuffer
       */
      @Override
      public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
          if (initialCapacity == 0 && maxCapacity == 0) {
              return emptyBuf;
          }
          validate(initialCapacity, maxCapacity);
          return newHeapBuffer(initialCapacity, maxCapacity);
      }
    
      ↓↓↓↓↓↓↓↓↓↓↓
      /**
       * Creates the heap buffer; the behavior depends on the concrete subclass implementation.
       *
       * @param initialCapacity requested initial capacity
       * @param maxCapacity     requested maximum capacity
       * @return the newly created buffer
       */
      protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
    
    1. PooledByteBufAllocator和UnpooledByteBufAllocator
    1. Heap内存分配
        // UnpooledByteBufAllocator: newDirectBuffer and newHeapBuffer.
        // newDirectBuffer picks the Unsafe-based or the plain direct buffer class and
        // passes the result through toLeakAwareBuffer before returning it.
        @Override
        protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
            ByteBuf buf;
            // Whether Unsafe is available is decided by the underlying JDK;
            // if an Unsafe instance can be obtained, the Unsafe-based buffer is used.
            if (PlatformDependent.hasUnsafe()) {
                buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
            } else {
                buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
            return toLeakAwareBuffer(buf);
        }
    
        @Override
        protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
            //Netty5不区分安全与非安全
            return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
            //Netty4的实现如下:
            //return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
                    : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }
    
        ↓↓↓↓↓↓↓↓↓↓↓↓↓Netty5对UnpooledHeapByteBuf的实现
        /**
         * Netty 5 constructor of UnpooledHeapByteBuf.
         * Passes maxCapacity to the superclass, requires a non-null allocator, rejects
         * initialCapacity greater than maxCapacity, then stores the allocator, installs a
         * backing array obtained from allocateArray(initialCapacity) and sets both indices to 0.
         *
         * @param alloc           allocator that owns this buffer; checked with checkNotNull
         * @param initialCapacity size passed to allocateArray
         * @param maxCapacity     upper bound handed to the superclass constructor
         * @throws IllegalArgumentException if initialCapacity is greater than maxCapacity
         */
        public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(maxCapacity);
            checkNotNull(alloc, "alloc");
            if (initialCapacity > maxCapacity) {
                throw new IllegalArgumentException(String.format(
                        "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
            }
            this.alloc = alloc;
            setArray(allocateArray(initialCapacity));
            setIndex(0, 0);
        }
    
        ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
        // AbstractReferenceCountedByteBuf: forwards maxCapacity to the superclass and
        // then sets the value managed by refCntUpdater to 1 for this instance
        // (presumably the initial reference count — confirm against the field declaration).
        protected AbstractReferenceCountedByteBuf(int maxCapacity) {
            super(maxCapacity);
            refCntUpdater.set(this, 1);
        }
    
        ↓↓↓↓↓↓↓↓↓↓↓↓↓Netty4对UnpooledUnsafeHeapByteBuf的实现
        // UnpooledUnsafeHeapByteBuf: the constructor adds no logic of its own and
        // simply forwards all three arguments to the superclass constructor.
        UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(alloc, initialCapacity, maxCapacity);
        }
    
        ↓↓↓↓↓↓↓↓↓↓↓↓↓
        // UnpooledHeapByteBuf: allocates a fresh byte[initialCapacity] and chains to the
        // five-argument constructor with readerIndex and writerIndex both 0.
        protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
        }
    
        // Back in the original UnpooledHeapByteBuf: the private constructor that receives
        // the allocator, the initial backing array, both indices and the maximum capacity.
        private UnpooledHeapByteBuf(
                ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
              // body omitted
        }
    
        ↓↓↓↓↓↓↓↓↓↓↓↓↓保存字节数据和指针
        // Stores the given array as the backing storage and clears tmpNioBuf
        // (presumably a cached NIO view of the previous array — confirm against its usages).
        private void setArray(byte[] initialArray) {
            array = initialArray;
            tmpNioBuf = null;
        }
    
        /**
         * Sets the reader and writer indices in one call.
         * The indices must satisfy 0 <= readerIndex <= writerIndex <= capacity();
         * once checked they are applied through setIndex0.
         *
         * @param readerIndex new reader index
         * @param writerIndex new writer index
         * @return this buffer
         * @throws IndexOutOfBoundsException if the indices violate the ordering above
         */
        @Override
        public ByteBuf setIndex(int readerIndex, int writerIndex) {
            if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
                throw new IndexOutOfBoundsException(String.format(
                        "readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
                        readerIndex, writerIndex, capacity()));
            }
            setIndex0(readerIndex, writerIndex);
            return this;
        }
    
        // This is where the safe and unsafe variants differ. Netty 5 and Netty 4 implement
        // newHeapBuffer differently; the examples below are taken from Netty 4.
        // UnpooledHeapByteBuf: reads the byte at index through HeapByteBufUtil.
        @Override
        protected byte _getByte(int index) {
            return HeapByteBufUtil.getByte(array, index);
        }
    
        ↓↓↓↓↓↓↓↓↓
        // HeapByteBufUtil: plain array access, returns memory[index].
        static byte getByte(byte[] memory, int index) {
            return memory[index];
        }
    
        // UnpooledUnsafeHeapByteBuf: reads the byte at index through UnsafeByteBufUtil
        // instead of HeapByteBufUtil, passing the backing array and the index.
        @Override
        protected byte _getByte(int index) {
            return UnsafeByteBufUtil.getByte(array, index);
        }
    
        ↓↓↓↓↓↓↓↓↓↓↓↓
        // UnsafeByteBufUtil: reads one byte through PlatformDependent.getByte(address).
        // NOTE(review): UnpooledUnsafeHeapByteBuf._getByte calls getByte(array, index), which does
        // not match this (long address) signature — presumably a (byte[], int) overload is the one
        // invoked there, while this overload serves the getByte(addr(index)) call of the unsafe
        // direct buffer; confirm against the Netty source.
        static byte getByte(long address) {
            return PlatformDependent.getByte(address);
        }
    
    1. direct内存分配
    /**
     * UnpooledDirectByteBuf constructor.
     * Validates the arguments, stores the allocator and installs a direct NIO buffer
     * obtained from ByteBuffer.allocateDirect(initialCapacity) via setByteBuffer.
     *
     * @param alloc           allocator that owns this buffer
     * @param initialCapacity size of the direct buffer to allocate
     * @param maxCapacity     upper bound handed to the superclass constructor
     * @throws NullPointerException     if alloc is null
     * @throws IllegalArgumentException if either capacity is negative, or if
     *                                  initialCapacity is greater than maxCapacity
     */
    public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialCapacity < 0) {
            throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
        }
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
        }
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }
    
        this.alloc = alloc;
        setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
    }
    
    ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
    // Replaces the backing NIO buffer. If a previous buffer exists it is released with
    // freeDirect, unless doNotFree is set — in that case the flag is only reset to false
    // and the old buffer is left untouched. Afterwards the new buffer is stored,
    // tmpNioBuf is cleared and capacity is taken from buffer.remaining().
    private void setByteBuffer(ByteBuffer buffer) {
          ByteBuffer oldBuffer = this.buffer;
          if (oldBuffer != null) {
              if (doNotFree) {
                  doNotFree = false;
              } else {
                  freeDirect(oldBuffer);
              }
          }
          this.buffer = buffer;
          tmpNioBuf = null;
          capacity = buffer.remaining();
      }
    
      →→→→→→JDK底层方法ByteBuffer.allocateDirect(initialCapacity)
      // ByteBuffer (JDK): creates a new DirectByteBuffer with the requested capacity.
      public static ByteBuffer allocateDirect(int capacity) {
          return new DirectByteBuffer(capacity);
      }
    
      /**
       * UnpooledUnsafeDirectByteBuf constructor.
       * Performs the same argument checks as the UnpooledDirectByteBuf constructor, stores
       * the allocator and installs the buffer returned by allocateDirect(initialCapacity)
       * through setByteBuffer(buffer, false).
       *
       * @param alloc           allocator that owns this buffer
       * @param initialCapacity size of the direct buffer to allocate
       * @param maxCapacity     upper bound handed to the superclass constructor
       * @throws NullPointerException     if alloc is null
       * @throws IllegalArgumentException if either capacity is negative, or if
       *                                  initialCapacity is greater than maxCapacity
       */
      public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
          super(maxCapacity);
          if (alloc == null) {
              throw new NullPointerException("alloc");
          }
          if (initialCapacity < 0) {
              throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
          }
          if (maxCapacity < 0) {
              throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
          }
          if (initialCapacity > maxCapacity) {
              throw new IllegalArgumentException(String.format(
                      "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
          }
    
          this.alloc = alloc;
          // allocateDirect(initialCapacity) likewise ends up calling the JDK API.
          // tryFree is false here, so setByteBuffer skips the old-buffer release branch.
          setByteBuffer(allocateDirect(initialCapacity), false);
      }
    
      ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
      // What differs in the unsafe variant: the buffer's memory address is computed via Unsafe
      // (obtained through reflection) and stored in memoryAddress.
      // When tryFree is true, a previously installed buffer is released with freeDirect unless
      // doNotFree is set (in which case the flag is only reset). The new buffer is then stored,
      // its address is looked up via PlatformDependent.directBufferAddress, tmpNioBuf is cleared
      // and capacity is taken from buffer.remaining().
      final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
          if (tryFree) {
              ByteBuffer oldBuffer = this.buffer;
              if (oldBuffer != null) {
                  if (doNotFree) {
                      doNotFree = false;
                  } else {
                      freeDirect(oldBuffer);
                  }
              }
          }
          this.buffer = buffer;
          memoryAddress = PlatformDependent.directBufferAddress(buffer);
          tmpNioBuf = null;
          capacity = buffer.remaining();
      }
    
      →→→→→→→→→→→→
      // getByte locates the byte by memory address: the index is converted with addr(index)
      // and the result is read through UnsafeByteBufUtil.getByte.
      @Override
      protected byte _getByte(int index) {
          return UnsafeByteBufUtil.getByte(addr(index));
      }
    

    相关文章

      网友评论

          本文标题:[Netty源码分析]ByteBuf(二)

          本文链接:https://www.haomeiwen.com/subject/mculwftx.html