美文网首页
DirectByteBuffer解析和文件IO详解

DirectByteBuffer解析和文件IO详解

作者: 一天的 | 来源:发表于2019-09-28 12:48 被阅读0次

    java.nio 包里,是java用于处理IO的新的API,它使用channel、select等模型,重新对IO操作进行了新的实现。

    DirectByteBuffer就是nio包下面的一个类。这个类用于保存byte数组,其特别之处在于:他将数据保存在堆外内存。不像传统的对象,对象都在堆中。这样的好处就是对于 IO操作,减少了内存copy次数,从而增加效率。这里以文件IO进行讲解

    在这里我们先把结论说一下:

    a. 传统的IO操作(就是使用java.io包的api)访问磁盘文件,数据需要copy的次数:

    1. 磁盘文件的数据 copy 内核page cache 
    
    2. 内核的数据 copy  应用程序空间(即:jvm 堆外内存)
    
    3. jvm堆外内存  copy  jvm堆内 内存
    

    为什么2、和3 不合并,将内核数据 copy jvm堆内内存。 因为jvm进行系统调用进行读文件时候,此时发生gc,那么堆内存的对应地址就会移动,所以直接copy到堆内是有问题的。

    b. 使用DirectByteBuffer访问磁盘文件,数据需要copy的次数:

       1. 磁盘文件的数据 copy 内核page cache 
    
       2. 内核的数据 copy  应用程序空间(即:DirectByteBuffer)
    

    所以DirectByteBuffer减少了内存copy次数。

    1.传统文件IO解析

    文件读取示例:

     FileInputStream input = new FileInputStream("/data");
     byte[] b = new byte[SIZE]; 
     input.read(b);
    

    byte数组示堆内存对象,此处将数据copy 到jvm堆内存。我们看一下read函数内部实现

     public int read(byte b[]) throws IOException {  
         return readBytes(b, 0, b.length);
     }
     private native int readBytes(byte b[], int off, int len) throws IOException;
    

    我们看到 read函数最终调用 native函数 readBytes。

    jint readBytes(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len, jfieldID fid) {
        jint nread;
        char stackBuf[ BUF_SIZE];
        char *buf = NULL;
        FD fd;
        if (IS_NULL(bytes)) {
            JNU_ThrowNullPointerException(env, NULL);
            return -1;
        }
        if (outOfBounds(env, off, len, bytes)) {
            JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", NULL);
            return -1;
        }
        if (len == 0) {
            return 0;
        } else if (len > BUF_SIZE) {
            buf = malloc(len);
            if (buf == NULL) {
                JNU_ThrowOutOfMemoryError(env, NULL);
                return 0;
            }
        } else {
            buf = stackBuf;
        }
        fd = GET_FD(this, fid);
        if (fd == -1) {
            JNU_ThrowIOException(env, "Stream Closed");
            nread = -1;
        } else {
            nread = IO_Read(fd, buf, len);
            if (nread > 0) {
                ( * env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);
            } else if (nread == -1) {
                JNU_ThrowIOExceptionWithLastError(env, "Read error");
            } else { /* EOF */
                nread = -1;
            }
        }
        if (buf != stackBuf) {
            free(buf);
        }
        return nread;
    }
    

    我们看到最终通过IO_Read将缓冲数据读到buf中去,这个IO_Read其实是一个宏定义:

    define IO_Read handleRead

    handleRead函数实现如下,这里你可以看到这里进行了read系统调用:

     ssize_t handleRead(FD fd, void *buf, jint len)  { 
         ssize_t result; 
         RESTARTABLE(read(fd, buf, len), result); 
         return result; 
     }
    

    buf返回之后,由SetByteArrayRegion这个JNI函数拷贝到了bytes,它的具体实现如下(下面定义了一个通用的宏函数来表示各种数据类型数组区域的设置,可以将Result宏替换成Byte即可理解):

     JNI_ENTRY(void,  jni_Set##Result##ArrayRegion(JNIEnv *env, ElementType##Array array, jsize start,  jsize len, const ElementType *buf))  
       JNIWrapper("Set" XSTR(Result) "ArrayRegion");  
       DTRACE_PROBE5(hotspot_jni, Set##Result##ArrayRegion__entry, env, array, start, len, buf); 
       DT_VOID_RETURN_MARK(Set##Result##ArrayRegion);  
       typeArrayOop dst = typeArrayOop(JNIHandles::resolve_non_null(array));  
       if (start < 0 || len < 0 || ((unsigned int)start + (unsigned int)len > (unsigned int)dst->length())) {  
         THROW(vmSymbols::java_lang_ArrayIndexOutOfBoundsException());  
       } else { 
         if (len > 0) {  
           int sc = TypeArrayKlass::cast(dst->klass())->log2_element_size();  
           memcpy((u_char*) dst->Tag##_at_addr(start), 
                  (u_char*) buf,  
                  len << sc);    
         } 
      }  
     JNI_END
    

    (以上内容部门来源:https://www.zhihu.com/question/65415926

    由此可见,native方法,readBytes而采用了C Heap - JVM Heap进行内存拷贝的方式进行数据传递。

    而readBytes 通过调用 handleRead 进行读写。handleRead就是读取内核缓存区数据。内核数据来源文件。

    2. DirectByteBuffer

    DirectByteBuffer 是构建在堆外的内存的对象。

    DirectByteBuffer是包级别可访问的,通过 ByteBuffer.allocateDirect(int capacity) 进行构造。

    
     public static ByteBuffer allocateDirect(int capacity) { 
          return new DirectByteBuffer(capacity);
     }
    

    我们看一下DirectByteBuffer 构造函数实现

    
     DirectByteBuffer(int cap) {// package-private 
         super(-1,0, cap, cap); 
         boolean pa = VM.isDirectMemoryPageAligned(); 
         int ps = Bits.pageSize(); 
         long size = Math.max(1L, (long)cap + (pa ? ps :0)); 
         Bits.reserveMemory(size, cap); 
         long base =0; 
         try { 
             base =unsafe.allocateMemory(size); 
         }catch (OutOfMemoryError x) { 
             Bits.unreserveMemory(size, cap); 
             throw x;
         }
     
         unsafe.setMemory(base, size, (byte)0); 
         if (pa && (base % ps !=0)) { 
             // Round up to page boundary
             address = base + ps - (base & (ps -1)); 
         }else { 
             address = base; 
         }
     
         cleaner = Cleaner.create(this,new Deallocator(base, size, cap));    
         att =null;
     }
    

    这里我们主要关注这几个地方:

    1.unsafe.allocateMemory(size);

    利用 unsafe 类在堆外内存(C_HEAP)中分配了一块空间,这是一个 native 函数,转到进行堆外内存分配的 C/C++ 代码

     inline char* AllocateHeap( size_t size, MEMFLAGS flags, address pc = 0, AllocFailType alloc_failmode = AllocFailStrategy::EXIT_OOM){ 
        // ... 省略 
       char*p=(char*)os::malloc(size, flags, pc); 
       // 分配在 C_HEAP 上并返回指向内存区域的指针 
       // ... 省略 
       return p; 
     }
    

    2.cleaner = Cleaner.create(this,new Deallocator(base, size, cap));

    cleaner对象是对DirectByteBuffer占用对堆外内存进行清理。DirectByteBuffer.cleaner().clean() 进行手动清理。我们看一下clean() 函数

    
     public void clean() { 
         //....省略 
         this.thunk.run(); 
         //....省略 
     }
    

    其中 thunk就是我们 Cleaner.create(this,new Deallocator(base, size, cap)); 中的Deallocator。看一下Deallocator。

     private static class Deallocator implements Runnable  { 
     //。。。省略 
         public void run() { 
             if (address ==0) { 
                 // Paranoia 
                 return; 
                } 
             unsafe.freeMemory(address); 
             address =0; 
             Bits.unreserveMemory(size,capacity); 
         } 
     }
    

    可以看到其是一个线程进行 堆外内存的释放动作。

    cleaner是PhantomReference的子类。

    PhantomReference它其实主要是用来跟踪对象何时被回收的,它不能影响gc决策,但是gc过程中如果发现某个对象除了只有PhantomReference引用它之外,并没有其他的地方引用它了,那将会把这个引用放到java.lang.ref.Reference.pending队列里,在gc完毕的时候通知ReferenceHandler这个守护线程去执行一些后置处理。这个处理方法中,就会判断是否是cleaner对象,如果是,就执行clean()函数。

    因此DirectByteBuffer并不需要我们手动清理内存。当jvm进行gc(oldgc)的时候,就会清理没有引用的 dirctByteBuffer。

    当我们一直申请DirectByteBuffer。其实占用的是堆外内存,堆内内存只是占用一个引用。如果一直触发不了gc,那么堆外内存就不会回收,导致jvm进程占用内存很大。我们可以通过-XX:MaxDirectMemorySize限制DirecByteBuffer占用堆外内存的大小

    3.Bits.reserveMemory(size, cap);

     static void reserveMemory(long size,int cap) { 
         synchronized (Bits.class) { 
             if (!memoryLimitSet && VM.isBooted()) { 
                 maxMemory = VM.maxDirectMemory(); 
                 memoryLimitSet =true; 
             } 
             // -XX:MaxDirectMemorySize limits the total capacity rather than the 
             // actual memory usage, which will differ when buffers are page 
             // aligned. 
             if (cap <=maxMemory -totalCapacity) { 
                 reservedMemory += size; 
                 totalCapacity += cap; 
                 count++; 
                 return; 
             } 
         } 
         System.gc(); 
         try { 
             Thread.sleep(100); 
         }catch (InterruptedException x) { 
             // Restore interrupt status 
             Thread.currentThread().interrupt(); 
         } 
         synchronized (Bits.class) { 
             if (totalCapacity + cap >maxMemory) 
                 throw new OutOfMemoryError("Direct buffer memory"); 
             reservedMemory += size; 
             totalCapacity += cap; 
             count++; 
             } 
     }
    

    该函数用于统计DirectByteBuffer占用的大小。VM.maxDirectMemory()是jvm允许申请的最大DirectBuffer的大小(XX:MaxDirectMemorySize 通过这个参数设置)

    如果发现当前申请的空间,大于限制的空间,就会触发一次gc,上面说过gc会回收哪些之前不使用的directBuffer。然后再次申请。

    VM.maxDirectMemory() 大小是如何设置的内,在VM类有这样一段代码

     public static void saveAndRemoveProperties(Properties var0) { 
         //....
         String var1 = (String)var0.remove("sun.nio.MaxDirectMemorySize"); 
         if (var1 !=null) { 
             if (var1.equals("-1")) { 
                 directMemory = Runtime.getRuntime().maxMemory(); 
             }else {
                 long var2 = Long.parseLong(var1); 
                 if (var2 > -1L) { 
                 directMemory = var2; 
                 } 
         } 
         //...
     }
    

    "sun.nio.MaxDirectMemorySize" 这个属性就是通过 -XX:MaxDirectMemorySize 这个参数设置的。如果我们不指定这个jvm参数,笔者在jdk8中测试了一下,默认是-1,这样就导致directBufffer内存限制为进程最大内存。当然这也是一个潜在风险。

    风险案例:

    笔者曾在线上运行一个应用。该应用就是从消息队列中消费数据,然后将数据处理后存到Hbase中。但是应用运行每次运行2周左右,机器就会出现swap占用过大。经过分析,是jvm进程占用内存太大,但是分析jvm相关参数(堆、线程大小),并没有设置的很大。最后发现原来是directBuffer占用达到了10G。后面通过-XX:MaxDirectMemorySize=2048m 限制directbuffer使用量,解决了问题。每次directBuffer占用达到2G,就会触发一次fullgc,将之前的无用directbuffer回收掉。hbase一个坑,有时间笔者会整理这个案例。

    3.DirectByteBuffer文件IO

    文件读取示例:

    FileChannel filechannel=new RandomAccessFile("/data/appdatas/cat/mmm","rw").getChannel(); 
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(SIZE); 
    filechannel.read(byteBuffer)
    

    我们看一下read函数

     public int read(ByteBuffer var1)throws IOException { 
        //。。。。 
        var3 = IOUtil.read(this.fd, var1, -1L,this.nd); 
        //。。。。 
     }
    

    主要逻辑调用IOUtil.read。我们看一下这个函数

     static int read(FileDescriptor var0, ByteBuffer var1,long var2, NativeDispatcher var4)throws IOException { 
         if (var1.isReadOnly()) { 
             throw new IllegalArgumentException("Read-only buffer");
         }else if (var1instanceof DirectBuffer) { 
             return readIntoNativeBuffer(var0, var1, var2, var4); 
         }else { 
             ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining()); 
             int var7; 
         try { 
             int var6 = readIntoNativeBuffer(var0, var5, var2, var4); 
             var5.flip(); 
             if (var6 >0) { 
                 var1.put(var5); 
             } 
             var7 = var6; 
         }finally { 
             Util.offerFirstTemporaryDirectBuffer(var5); 
         } 
         return var7; 
         } 
     }
    

    主要方法就是通过 readIntoNativeBuffer 这个函数将数据读入 directBuffer中,其中readIntoNativeBuffer也是调用一个native方法。

    通过上面的代码,我们会看到,如果fielchannel.read(ByteBuffer) 也可以传入一个HeapByteBuffer,这个类是堆中。如果是这个类,那么内部读取的时候,会把数据先读到DirectByteBuffer中,然后在copy到HeapByteBuffer中。Util.getTemporaryDirectBuffer(var1.remaining());就是获取一个DirectBuffer对像。因为DirectBuffer创建的时候,开销比较大,所以使用的时候一般会用一个池子来管理。有兴趣可以看一下Util这个类里面的实现。

    相关文章

      网友评论

          本文标题:DirectByteBuffer解析和文件IO详解

          本文链接:https://www.haomeiwen.com/subject/zlfzuctx.html