美文网首页
Java NIO实现原理之Buffer

Java NIO实现原理之Buffer

作者: Monica2333 | 来源:发表于2018-10-26 17:39 被阅读0次

    nio是基于事件驱动模型的非阻塞io,这篇文章简要介绍了nio,本篇主要介绍Buffer的实现原理。

    Buffer

    是一块缓冲区,通常使用buffer读写数据为:

    RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
    FileChannel inChannel = aFile.getChannel();
    
    //1.create buffer with capacity of 48 bytes
    ByteBuffer buf = ByteBuffer.allocate(48);
    //2.write into buffer
    int bytesRead = inChannel.read(buf);
    while (bytesRead != -1) {
    //3.make buffer from write mode to read mode
      buf.flip();  
    
      while(buf.hasRemaining()){
        //4. read 1 byte from buffer
          System.out.print((char) buf.get()); 
      }
    //5.调用clear()方法或者compact()方法,make buffer ready for writing
      buf.clear(); 
      bytesRead = inChannel.read(buf);
    }
    aFile.close();
    

    Buffer的数据结构设计如下:

    Buffer数据结构.png
    其中:
    capacity:buffer的固定大小值
    position:在写模式下,表示当前写入数据的位置。在读模式下,表示当前已读到数据的位置
    limit:在写模式下,表示最大可写的位置,为capacity ,在读模式下,表示最大可读位置。
    此外,Buffer类中还有以下参数:
    mark:初始值为-1,用于备份当前的position。
    address:buffer对象持有的堆外内存(DirectByteBuffer)的内存地址,方便JNI 快速找到堆外内存地址。
    Buffer相关的类结构如下:
    Buffer类结构.png

    Buffer或ByteBuffer的方法简介:
    1.Buffer的分配:

    //分配一份堆内内存
    public static ByteBuffer allocate(int capacity) {
            if (capacity < 0)
                throw new IllegalArgumentException();
            return new HeapByteBuffer(capacity, capacity);
        }
    //分配一份堆外内存
    public static ByteBuffer allocateDirect(int capacity) {
            return new DirectByteBuffer(capacity);
        }
    

    2.向buffer写入数据:

    //各种put方法,也可从channel中读取,inChannel.read(buf)
    put(....)
    

    3.从buffer中读取数据

    //也可以这样写入channel。 int bytesWritten = inChannel.write(buf);
    get()
    

    4.flip():将Buffer从写模式切换到读模式

    public final Buffer flip() {
            limit = position;
            position = 0;
            mark = -1;
            return this;
        }
    

    5.rewind():将position设回0,所以你可以重读Buffer中的所有数据

    public final Buffer rewind() {
            position = 0;
            mark = -1;
            return this;
     }
    

    6.clear()与compact():一旦读完Buffer中的数据,需要让Buffer准备好再次被写入。可以通过clear()或compact()方法来完成。
    compact()方法将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。

     public final Buffer clear() {
            position = 0;
            limit = capacity;
            mark = -1;
            return this;
        }
    

    7.mark()与reset()
    通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position。方便数据的重新读取,而流只能是单向读取。

    ByteBuffer的两种实现:
    HeapByteBuffer:Java中分配的非空对象都是由Java虚拟机的垃圾收集器管理的,也称为堆内内存(on-heap memory)。虚拟机会定期对垃圾内存进行回收,在某些特定的时间点,它会进行一次彻底的回收(full gc)。彻底回收时,垃圾收集器会对所有分配的堆内内存进行完整的扫描,这意味着一个重要的事实——这样一次垃圾收集对Java应用造成的影响,跟堆的大小是成正比的。过大的堆会影响Java应用的性能。JVM参数中 -Xmx的值是新生代和老生代的和的最大值,我们在jvm参数里通常还会加一个参数-XX:MaxPermSize来指定持久代的最大值,那么我们认识的Java堆的最大值其实是-Xmx和-XX:MaxPermSize的总和,在分代算法下,新生代,老生代和持久代是连续的虚拟地址,因为它们是一起分配的。
    DirectByteBuffer:由该对象创建的在jvm之外的内存,对于生命期中等或较长的对象,正是堆外内存要解决的。堆外内存有以下特点:
    对于大内存有良好的伸缩性
    对垃圾回收停顿的改善可以明显感觉到
    在进程间可以共享,减少虚拟机间的复制
    接下来看一下DirectByteBuffer的实现:

     DirectByteBuffer(int cap) {                   // package-private
    
            super(-1, 0, cap, cap);
            boolean pa = VM.isDirectMemoryPageAligned();
            int ps = Bits.pageSize();
            long size = Math.max(1L, (long)cap + (pa ? ps : 0));
           //确认堆外内存是否够用
            Bits.reserveMemory(size, cap);
    
            long base = 0;
            try {
               //分配堆外内存,并返回堆外内存的基地址
                base = unsafe.allocateMemory(size);
            } catch (OutOfMemoryError x) {
                Bits.unreserveMemory(size, cap);
                throw x;
            }
            unsafe.setMemory(base, size, (byte) 0);
            if (pa && (base % ps != 0)) {
                // Round up to page boundary
                address = base + ps - (base & (ps - 1));
            } else {
                address = base;
            }
            cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
            att = null;
    
    
    
        }
    

    Bits.reserveMemory(size, cap) 方法,该方法用于在系统中保存总分配内存(按页分配)的大小和实际内存的大小。

     // These methods should be called whenever direct memory is allocated or
        // freed.  They allow the user to control the amount of direct memory
        // which a process may access.  All sizes are specified in bytes.
        static void reserveMemory(long size, int cap) {
    
            if (!memoryLimitSet && VM.isBooted()) {
                maxMemory = VM.maxDirectMemory();
                memoryLimitSet = true;
            }
    
            // optimist!内存够用!
            if (tryReserveMemory(size, cap)) {
                return;
            }
      
            final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();
    
            // retry while helping enqueue pending Reference objects
            // which includes executing pending Cleaner(s) which includes
            // Cleaner(s) that free direct buffer memory
            while (jlra.tryHandlePendingReference()) {
                if (tryReserveMemory(size, cap)) {
                    return;
                }
            }
    
            // trigger VM's Reference processing
            System.gc();
    
            // a retry loop with exponential back-off delays
            // (this gives VM some time to do it's job)
            boolean interrupted = false;
            try {
                long sleepTime = 1;
                int sleeps = 0;
                while (true) {
                    if (tryReserveMemory(size, cap)) {
                        return;
                    }
                    if (sleeps >= MAX_SLEEPS) {
                        break;
                    }
                    if (!jlra.tryHandlePendingReference()) {
                        try {
                            Thread.sleep(sleepTime);
                            sleepTime <<= 1;
                            sleeps++;
                        } catch (InterruptedException e) {
                            interrupted = true;
                        }
                    }
                }
    
                // no luck
                throw new OutOfMemoryError("Direct buffer memory");
    
            } finally {
                if (interrupted) {
                    // don't swallow interrupts
                    Thread.currentThread().interrupt();
                }
            }
        }
    

    如果堆内存不够分配的话,jlra.tryHandlePendingReference()将触发一次非阻塞的Reference#tryHandlePending(false),该方法会将已经被JVM垃圾回收的DirectBuffer对象的堆外内存释放。
    如果还是无法释放足够的内存,将会触发System.gc(),该方法会触发一个full gc,如果JVM参数没有设置-XX:+DisableExplicitGC。但是调用System.gc()并不能够保证full gc马上就能被执行。所以在后面打代码中,会进行最多9次尝试,看是否有足够的可用堆外内存来分配堆外内存。并且每次尝试之前,都对延迟等待时间,已给JVM足够的时间去完成full gc操作。如果9次尝试后依旧没有足够的可用堆外内存来分配本次堆外内存,则抛出OutOfMemoryError("Direct buffer memory”)异常。
    之所以用使用full gc的很重要的一个原因是:System.gc()会对新生代和老生代都进行内存回收,这样会比较彻底地回收DirectByteBuffer对象以及他们关联的堆外内存。
    有两个问题:堆外内存是多大?
    代码中maxMemory = VM.maxDirectMemory();

    private static long directMemory = 64 * 1024 * 1024; //64MB
    public static long maxDirectMemory() {
            return directMemory;
        }
    

    实际上在 JVM启动时,会对System做初始化,实际上堆外内存的大小设置逻辑为:
    如果通过-Dsun.nio.MaxDirectMemorySize指定了这个属性,只要它不等于-1,那效果和加了-XX:MaxDirectMemorySize一样的,如果两个参数都没指定,那么最大堆外内存的值来自于directMemory = Runtime.getRuntime().maxMemory()。
    其中在我们使用CMS GC的情况下的实现如下,其实是新生代的最大值-一个survivor的大小+老生代的最大值,也就是我们设置的-Xmx的值里除去一个survivor的大小就是默认的堆外内存的大小。

    堆外内存的回收机制是什么?
    Cleaner是PhantomReference的子类,并通过自身的next和prev字段维护的一个双向链表PhantomReference的作用在于跟踪垃圾回收过程,并不会对对象的垃圾回收过程造成任何的影响。
    DirectByteBuffer对象在创建的时候关联了一个Cleaner,(cleaner = Cleaner.create(this, new Deallocator(base, size, cap));)说到PhantomReference它其实主要是用来跟踪对象何时被回收的,它不能影响gc决策,但是gc过程中如果发现某个对象除了只有PhantomReference引用它之外,并没有其他的地方引用它了,那将会把这个引用放到java.lang.ref.Reference.pending队列里,在gc完毕的时候通知ReferenceHandler这个守护线程去执行一些后置处理,而DirectByteBuffer关联的cleaner是PhantomReference的一个子类,在最终的处理里会通过Unsafe的free接口来释放DirectByteBuffer对应的堆外内存块.

    private static class ReferenceHandler extends Thread {
    
            ReferenceHandler(ThreadGroup g, String name) {
                super(g, name);
            }
    
            public void run() {
                for (;;) {
    
                    Reference r;
                    synchronized (lock) {
                        if (pending != null) {
                            r = pending;
                            Reference rn = r.next;
                            pending = (rn == r) ? null : rn;
                            r.next = r;
                        } else {
                            try {
                                lock.wait();
                            } catch (InterruptedException x) { }
                            continue;
                        }
                    }
    
                    // Fast path for cleaners
                    if (r instanceof Cleaner) {
                        //
                        ((Cleaner)r).clean();
                        continue;
                    }
    
                    ReferenceQueue q = r.queue;
                    if (q != ReferenceQueue.NULL) q.enqueue(r);
                }
            }
        }
    

    //如果System.gc();被禁止,也会触发堆外内存的回收
    Reference#tryHandlePending(false)

    static boolean tryHandlePending(boolean var0) {
            Reference var1;
            Cleaner var2;
            try {
                Reference.Lock var3 = lock;
                synchronized(lock) {
                    if (pending == null) {
                        if (var0) {
                            lock.wait();
                        }
    
                        return var0;
                    }
    
                    var1 = pending;
                  //cleaner对象
                    var2 = var1 instanceof Cleaner ? (Cleaner)var1 : null;
                    pending = var1.discovered;
                    var1.discovered = null;
                }
            } catch (OutOfMemoryError var6) {
                Thread.yield();
                return true;
            } catch (InterruptedException var7) {
                return true;
            }
    
            if (var2 != null) {
              //clean方法回收
                var2.clean();
                return true;
            } else {
                ReferenceQueue var8 = var1.queue;
                if (var8 != ReferenceQueue.NULL) {
                    var8.enqueue(var1);
                }
    
                return true;
            }
        }
    
    
    public void clean() {
    //将当前Cleaner从Cleaner链表中移除,这样当clean()执行完后,Cleaner就是一个无引用指向的对象了,也就是可被GC回收的对象
            if (remove(this)) {
                try {
                   //thunk 在directByteBuffer 是Deallocator 对象
                    this.thunk.run();
                } catch (final Throwable var2) {
                    AccessController.doPrivileged(new PrivilegedAction<Void>() {
                        public Void run() {
                            if (System.err != null) {
                                (new Error("Cleaner terminated abnormally", var2)).printStackTrace();
                            }
    
                            System.exit(1);
                            return null;
                        }
                    });
                }
    
            }
        }
    
            private Deallocator(long address, long size, int capacity) {
                assert (address != 0);
                this.address = address;
                this.size = size;
                this.capacity = capacity;
            }
    
            public void run() {
                if (address == 0) {
                    // Paranoia
                    return;
                }
                //回收掉堆外内存
                unsafe.freeMemory(address);
                address = 0;
               //修改堆外内存的剩余容量大小
                Bits.unreserveMemory(size, capacity);
            }
    

    所以如果一直触发不了cms gc或者full gc,老年代的DirectByteBuffer对象不能被回收,那么堆外内存就一直不能被回收,可能导致内存泄漏。

    参考资料:
    http://ifeve.com/buffers/
    http://lovestblog.cn/blog/2015/05/12/direct-buffer/
    https://www.jianshu.com/p/007052ee3773

    相关文章

      网友评论

          本文标题:Java NIO实现原理之Buffer

          本文链接:https://www.haomeiwen.com/subject/bkictqtx.html