美文网首页
做一个高性能的java流式存储项目你需要知道的一些事儿

做一个高性能的java流式存储项目你需要知道的一些事儿

作者: 江江的大猪 | 来源:发表于2023-11-22 16:28 被阅读0次

    1、目前能够在网上搜到的java相关的高性能文件io文章都比较基础,想深入的话需要既了解java的文件操作api原理,又了解文件操作相关的系统调用,这就造成了学习困难
    2、实际上elasticsearch、kafka、rocketmq里关于文件操作的java实现已经很好,本文也参考了其中不少代码实现
    3、本文的每个结论均提供测试代码验证,方便读者在自己的机器上验证
    4、本文贴出的jdk native源码版本是openjdk的jdk8-b120
    5、本文需要读者对java文件操作、虚拟内存、物理内存、pagecache有一定基础了解
    6、本文的测试代码需要在TEST_PATH目录下提前准备test1-test30文件(根据自己测试环境调整),可以通过fallocate -l 1G test1快速创建30个1g大小的测试文件
    7、测试代码依赖如下,引入netty只是为了使用其中的PlatformDependent工具类,引入JNA是为了执行系统调用

            <dependency>
                <groupId>org.openjdk.jmh</groupId>
                <artifactId>jmh-core</artifactId>
                <version>1.37</version>
            </dependency>
            <dependency>
                <groupId>org.openjdk.jmh</groupId>
                <artifactId>jmh-generator-annprocess</artifactId>
                <version>1.37</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.101.Final</version>
            </dependency>
            <dependency>
                <groupId>net.java.dev.jna</groupId>
                <artifactId>jna</artifactId>
                <version>5.13.0</version>
            </dependency>
    

    基础部分

    FileChannel、MappedByteBuffer的初始化和释放

    • FileChannel的map函数是对系统调用mmap的阉割式封装,仅提供三种mode,READ_ONLY/READ_WRITE/PRIVATE对应mmap的prot、flags的部分组合,下面会贴出jdk源码,mmap系统调用原型:void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);
    • 直接使用netty的工具类释放MappedByteBuffer,原理不展开了,读者可以额外查阅资料学习,也可以看我之前的文章https://www.jianshu.com/p/4f026fe063aa
    • 不论是何种方式获取的FileChannel,仅需要关闭FileChannel本身。此处无需关闭RandomAccessFile
    • 需要注意关闭FileChannel是不会释放MappedByteBuffer的,也就是说仅关闭FileChannel后MappedByteBuffer仍然可以继续使用
        FileChannel channel = new RandomAccessFile(TEST_PATH + "test1", "rw").getChannel();
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
        PlatformDependent.freeDirectBuffer(mappedByteBuffer);
        fileChannel.close();
    
    • jdk map native源码部分核心如下,可以看到三种mode只对应mmap中prot和flags的三种组合,这也是我为什么说map方法是对mmap系统调用的阉割封装,mmap还有很多实用的flag我们在java中没法直接使用

    完整源码:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileChannelImpl.c

        Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this, jint prot, jlong off, jlong len)
        {
            if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
                protections = PROT_READ;
                flags = MAP_SHARED;
            } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
                protections = PROT_WRITE | PROT_READ;
                flags = MAP_SHARED;
            } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
                protections =  PROT_WRITE | PROT_READ;
                flags = MAP_PRIVATE;
            }
            mapAddress = mmap64(
                    0,                    /* Let OS decide location */
                    len,                  /* Number of bytes to map */
                    protections,          /* File permissions */
                    flags,                /* Changes are shared */
                    fd,                   /* File descriptor of mapped file */
                    off);                 /* Offset into file */
        }
    
    • FileChannel close最终调用FileChannelImpl的implCloseChannel方法,源码如下

    可以看到会对parent执行close,所以对FileChannel执行close就会对RandomAccessFile执行close。其实对RandomAccessFile执行close也会对FileChannel执行close,所以这俩关闭一个就行。一般我们只把RandomAccessFile当做获取FileChannel的工具人,不会保留它的引用,所以最终只会执行FileChannel的close

        protected void implCloseChannel() throws IOException {
            if (this.fileLockTable != null) {
                Iterator var1 = this.fileLockTable.removeAll().iterator();
                while(var1.hasNext()) {
                    FileLock var2 = (FileLock)var1.next();
                    synchronized(var2) {
                        if (var2.isValid()) {
                            this.nd.release(this.fd, var2.position(), var2.size());
                            ((FileLockImpl)var2).invalidate();
                        }
                    }
                }
            }
            this.threads.signalAndWait();
            if (this.parent != null) {
                ((Closeable)this.parent).close();
            } else {
                this.nd.close(this.fd);
            }
        }
    

    FileChannel write和DirectBuffer的关系

    • 源Buffer是DirectBuffer则直接写入,是HeapBuffer则在cache中拿一块DirectBuffer,把数据拷贝进去再写入
    • 这个cache由jdk维护,可以看到里面的DirectBuffer没有主动释放逻辑,只能随着gc释放,因此有oom隐患
    • 所以最佳实践是用户自己构建DirectBuffer写入,并自己控制该DirectBuffer的释放,避免数据拷贝,也避免堆外内存的OOM
    • write方法最后使用sun.nio.ch.IOUtil的write方法,核心源码如下
        static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
            if (var1 instanceof DirectBuffer) {
                return writeFromNativeBuffer(var0, var1, var2, var4);
            } else {
                int var5 = var1.position();
                int var6 = var1.limit();
                int var7 = var5 <= var6 ? var6 - var5 : 0;
                ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
                int var10;
                try {
                    var8.put(var1);
                    var8.flip();
                    var1.position(var5);
                    int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
                    if (var9 > 0) {
                        var1.position(var5 + var9);
                    }
                    var10 = var9;
                } finally {
                    Util.offerFirstTemporaryDirectBuffer(var8);
                }
                return var10;
            }
        }
    

    FileChannel read和DirectBuffer的关系

    • 和write同理,直接使用DirectBuffer最好
    • 最终调用sun.nio.ch.IOUtil的read方法,核心源码如下
        static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
            if (var1.isReadOnly()) {
                throw new IllegalArgumentException("Read-only buffer");
            } else if (var1 instanceof DirectBuffer) {
                return readIntoNativeBuffer(var0, var1, var2, var4);
            } else {
                ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
                int var7;
                try {
                    int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
                    var5.flip();
                    if (var6 > 0) {
                        var1.put(var5);
                    }
                    var7 = var6;
                } finally {
                    Util.offerFirstTemporaryDirectBuffer(var5);
                }
                return var7;
            }
        }
    

    FileChannel force参数true/false的区别

    • 对文件的写入其实都不会直接写入磁盘(directIO除外),只会写到buffer中,由操作系统统一调度写入磁盘,所以如果需要实时存储就需要主动调用force方法直接写入磁盘
    • jdk文档表示对FileChannel执行force不会保证对该FileChannel通过map方法获得的MappedByteBuffer进行刷盘,如果对MappedByteBuffer有修改需要刷盘,需要调用MappedByteBuffer自己的force方法
    • force方法有参数true/false,jdk文档说的不够具体,只说true的时候会额外写入metadata。实际上true/false对应的系统调用分别是fdatasync/fsync(两个系统调用的详细区别本文不展开),核心源码如下
    • fsync会比fdatasync多一次寻址,将文件大小、修改时间等metadata写入磁盘,性能会比fdatasync差一点,在文件大小固定的情况下可以仅调用fdatasync,文件大小不固定的情况下调用fdatasync没来的及更新文件大小可能会造成丢数据。其实由下面的压测结果可知这两个系统调用性能差距也不是很大(这里可能是固态硬盘和机械硬盘的区别?机械硬盘可能差距更大)

    完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileDispatcherImpl.c

        Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this, jobject fdo, jboolean md)
        {
            if (md == JNI_FALSE) {
                result = fdatasync(fd);
            } else {
                result = fsync(fd);
            }
        }
    

    FileChannel的transferTo方法介绍

    对该方法的解析可以看我之前的文章:https://www.jianshu.com/p/11ed05ca62ff

    • transferTo可以利用sendFile系统调用做到真正的零拷贝传输数据,但是仅限于文件到文件,文件到socket两条路
    • 在文件下载场景可以使用该方法,将文件直接transferTo到socket中,我们自己的程序中是无法感知到文件内容的。最后通过文件hash值判断文件是否正确完整下载
    • 在非文件下载场景基本无法使用transferTo,因为我们的程序逻辑里大多需要从磁盘文件中读取到数据,然后做自己的业务处理,再将数据写入socket
    • 如果硬用transferTo做文件读取(文件到socket),考虑到下面的性能测试,我认为整体也不会比用MappedByteBuffer性能好

    MappedByteBuffer load方法

    • 先执行load0 native方法,然后根据页大小,对每个页读取了一下,最后通过一个累加的x避免编译器认为这段代码是dead code优化掉
    • 可以看到load方法的目标就是主动触发缺页,从而将文件内容真正填充进物理内存中
    • MappedByteBuffer一开始仅仅是虚拟内存,不会分配真正的物理内存,用到的时候才会分配
        public final MappedByteBuffer load() {
            load0(mappingAddress(offset), length);
            // Read a byte from each page to bring it into memory. A checksum
            // is computed as we go along to prevent the compiler from otherwise
            // considering the loop as dead code.
            Unsafe unsafe = Unsafe.getUnsafe();
            int ps = Bits.pageSize();
            int count = Bits.pageCount(length);
            long a = mappingAddress(offset);
            byte x = 0;
            for (int i=0; i<count; i++) {
                x ^= unsafe.getByte(a);
                a += ps;
            }
            if (unused != 0)
                unused = x;
            return this;
        }
    
    • load0核心源码如下,主要就是执行系统调用madvise并且传入参数MADV_WILLNEED,本文对madvise不做展开,读者可以自行查阅

    可以看出load0的作用是告诉操作系统这段内存接下来willneed,操作系统可以对这段内存做些优化处理,比如预读和提前加载之类的,可以加快后续对每个页都触发填充的效率
    完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c

        Java_java_nio_MappedByteBuffer_load0(JNIEnv *env, jobject obj, jlong address, jlong len)
        {
            int result = madvise((caddr_t)a, (size_t)len, MADV_WILLNEED);
        }
    

    MappedByteBuffer isLoad方法

    • 主要就是直接调用isLoaded0 native方法
        public final boolean isLoaded() {
            return isLoaded0(mappingAddress(offset), length, Bits.pageCount(length));
        }
    
    • isLoaded0核心源码如下,通过系统调用mincore检查每个页是否都在物理内存中,mincore不做展开,读者可以自行查阅

    完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c

        Java_java_nio_MappedByteBuffer_isLoaded0(JNIEnv *env, jobject obj, jlong address, jlong len, jint numPages)
        {
            jboolean loaded = JNI_TRUE;
            unsigned char *vec = (unsigned char *)malloc(numPages * sizeof(char));
            mincore(address, (size_t)len, vec);
            for (i=0; i<numPages; i++) {
                if (vec[i] == 0) {
                    loaded = JNI_FALSE;
                    break;
                }
            }
            return loaded;
        }
    

    调用了MappedByteBuffer的load方法后,isLoad不一定一直为true

    当pagecache不够用了的时候,操作系统会将cache按照规则释放或者放入swap区,测试代码如下,如果测试电脑的可用内存小于30g,基本上最后会显示false

        public static void main(String[] args) throws IOException, InterruptedException {
            RandomAccessFile r = new RandomAccessFile(TEST_PATH + "/test1", "rw");
            FileChannel fileChannel = r.getChannel();
            MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
            byteBuffer.load();
            new Thread(() -> {
                try {
                    for (int i = 2; i < 31; i++) {
                        RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
                        FileChannel channel = f.getChannel();
                        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
                        buffer.load();
                    }
                    TimeUnit.DAYS.sleep(1);
                } catch (Exception e) {
                }
            }).start();
            TimeUnit.SECONDS.sleep(20);
            System.out.println(byteBuffer.isLoaded());
        }
    

    FileChannel和MappedByteBuffer读写测试

    • 大部分研究java文件io的应该都看过这篇文章,或者各种转载抄袭的复制品,但是并不知道他是如何测试的,因此对这篇文章的结论不可全信

    http://thinkinjava.cn/2019/05/12/2019/05-12-java-nio/

    • 使用JMH进行基准测试,对JMH不做展开,读者可以自行查阅,推荐资料

    https://developer.aliyun.com/article/899469
    https://dunwu.github.io/java-tutorial/pages/747d3e/#warmup

    • 本测试力求单纯测试读写,尽量避免其他的开销
    • 本测试仅读写文件的第一个页,避免pagecache的不确定性影响,如何获得当前页大小在下面的进阶部分有说明
    • 本测试使用的文件均固定大小1G,提前创建,参照本文最开始的说明(在文章最上面)
    • 测试读取时使用的目标ByteBuffer和数组均通过Threadlocal保存复用,避免每次创建ByteBuffer和数组的开销干扰
    • 测试写入时使用的源ByteBuffer和数组提前创建并共用,避免每次写入都创建的干扰开销
    • 测试包括如下几项

    从FileChannel读到HeapByteBuffer
    从FileChannel读到DirectByteBuffer
    从HeapByteBuffer写入FileChannel
    从DirectByteBuffer写入FileChannel
    从DirectByteBuffer写入FileChannel并force(false)
    从DirectByteBuffer写入FileChannel并force(true)
    从MappedByteBuffer读到数组
    从数组写入MappedByteBuffer
    从HeapByteBuffer写入MappedByteBuffer
    从DirectByteBuffer写入MappedByteBuffer
    从DirectByteBuffer写入MappedByteBuffer并force

    • Linux version 3.10.0-327.ali2017.alios7.x86_64 16c32g的机器测试结果如下,可以说在文件大小固定的前提下MappedByteBuffer全面完胜FileChannel

    FileChannel读到DirectBuffer比读到HeapBuffer性能好,如上文所述,符合预期
    FileChannel写入DirectBuffer比HeapBuffer,如上文所述,符合预期
    FileChannel force(false)比force(true)好一些
    MappedByteBuffer读到数组中有压倒性读取优势,比FileChannel高两个数量级
    MappedByteBuffer写入数组数据性能好一些,写入DirectBuffer和HeapBuffer差不多
    MappedByteBuffer比FileChannel的写入性能、force性能都好
    mac上测试结果差不多,但是MappedByteBuffer从数组、HeapBuffer、DirectBuffer写入差距不像linux上明显

    Benchmark                                               Mode  Cnt      Score   Error   Units
    MyBenchmark.fileChannelRead2DirectBuffer               thrpt         968.349          ops/ms
    MyBenchmark.fileChannelRead2HeapBuffer                 thrpt         669.176          ops/ms
    MyBenchmark.fileChannelWriteFromDirectBuffer           thrpt         286.462          ops/ms
    MyBenchmark.fileChannelWriteFromDirectBufferForce      thrpt           3.992          ops/ms
    MyBenchmark.fileChannelWriteFromDirectBufferForceMeta  thrpt           3.169          ops/ms
    MyBenchmark.fileChannelWriteFromHeapBuffer             thrpt         258.994          ops/ms
    MyBenchmark.mappedRead2Array                           thrpt       67872.596          ops/ms
    MyBenchmark.mappedWriteFromArray                       thrpt        1675.428          ops/ms
    MyBenchmark.mappedWriteFromDirectBuffer                thrpt        1585.909          ops/ms
    MyBenchmark.mappedWriteFromDirectBufferForce           thrpt           6.651          ops/ms
    MyBenchmark.mappedWriteFromHeapBuffer                  thrpt        1559.150          ops/ms
    
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @Fork(1)
    @Warmup(iterations = 3, time = 10)
    @Measurement(iterations = 1, time = 10)
    @Threads(16)
    @State(Scope.Benchmark)
    public class MyBenchmark {
        private static final int SIZE = pageSize;
        private FileChannel fileChannel;
        private MappedByteBuffer mappedByteBuffer;
        private final ThreadLocal<ByteBuffer> fileChannelRead2HeapBufferThreadLocal = new ThreadLocal<>();
        private final ThreadLocal<ByteBuffer> fileChannelRead2DirectBufferThreadLocal = new ThreadLocal<>();
        private final ThreadLocal<byte[]> mappedRead2ArrayThreadLocal = new ThreadLocal<>();
        private final byte[] srcByteArray = new byte[SIZE];
        private final ByteBuffer srcHeapBuffer = ByteBuffer.allocate(SIZE);
        private final ByteBuffer srcDirectBuffer = ByteBuffer.allocateDirect(SIZE);
    
        @Setup
        public void setup() throws IOException {
            for (int i = 0; i < SIZE; i++) {
                srcByteArray[i] = 9;
            }
            srcHeapBuffer.put(srcByteArray, 0, SIZE);
            srcHeapBuffer.flip();
            srcDirectBuffer.put(srcByteArray, 0, SIZE);
            srcDirectBuffer.flip();
            RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
            fileChannel = r.getChannel();
            mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
        }
    
        @TearDown
        public void tearDown() throws IOException {
            PlatformDependent.freeDirectBuffer(mappedByteBuffer);
            fileChannel.close();
        }
    
        @Benchmark
        public void fileChannelRead2HeapBuffer(Blackhole blackhole) throws IOException {
            ByteBuffer byteBuffer = fileChannelRead2HeapBufferThreadLocal.get();
            if (byteBuffer == null) {
                byteBuffer = ByteBuffer.allocate(SIZE);
                fileChannelRead2HeapBufferThreadLocal.set(byteBuffer);
            }
            blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
        }
    
        @Benchmark
        public void fileChannelRead2DirectBuffer(Blackhole blackhole) throws IOException {
            ByteBuffer byteBuffer = fileChannelRead2DirectBufferThreadLocal.get();
            if (byteBuffer == null) {
                byteBuffer = ByteBuffer.allocateDirect(SIZE);
                fileChannelRead2DirectBufferThreadLocal.set(byteBuffer);
            }
            blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
        }
    
        @Benchmark
        public void fileChannelWriteFromHeapBuffer(Blackhole blackhole) throws IOException {
            blackhole.consume(fileChannel.write(srcHeapBuffer.slice(), 0L));
        }
    
        @Benchmark
        public void fileChannelWriteFromDirectBuffer(Blackhole blackhole) throws IOException {
            blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
        }
    
        @Benchmark
        public void fileChannelWriteFromDirectBufferForce(Blackhole blackhole) throws IOException {
            blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
            fileChannel.force(false);
        }
    
        @Benchmark
        public void fileChannelWriteFromDirectBufferForceMeta(Blackhole blackhole) throws IOException {
            blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
            fileChannel.force(true);
        }
    
        @Benchmark
        public void mappedRead2Array(Blackhole blackhole) {
            byte[] array = mappedRead2ArrayThreadLocal.get();
            if (array == null) {
                array = new byte[SIZE];
                mappedRead2ArrayThreadLocal.set(array);
            }
            blackhole.consume(mappedByteBuffer.slice().get(array, 0, SIZE));
        }
    
        @Benchmark
        public void mappedWriteFromArray(Blackhole blackhole) {
            blackhole.consume(mappedByteBuffer.slice().put(srcByteArray, 0, SIZE));
        }
    
        @Benchmark
        public void mappedWriteFromHeapBuffer(Blackhole blackhole) {
            blackhole.consume(mappedByteBuffer.slice().put(srcHeapBuffer.slice()));
        }
    
        @Benchmark
        public void mappedWriteFromDirectBuffer(Blackhole blackhole) {
            blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
        }
    
        @Benchmark
        public void mappedWriteFromDirectBufferForce(Blackhole blackhole) {
            blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
            mappedByteBuffer.force();
        }
    }
    

    进阶部分

    java工程中通过JNA调用libc标准库函数

    对JNA不做展开介绍,读者可以自行查阅学习:https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md

    import com.sun.jna.Library;
    import com.sun.jna.Native;
    import com.sun.jna.NativeLong;
    import com.sun.jna.Platform;
    import com.sun.jna.Pointer;
    
    public interface LibC extends Library {
        LibC INSTANCE = Native.load(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
        // 是不是很熟悉,大家学的第一个函数应该就是这个吧:)
        void printf(String format, Object... args);
        // 本文的核心,后面详细介绍
        int mlock(Pointer var1, NativeLong var2);
    }
    

    通过mlock将锁定物理内存,避免内存被swap,保持物理内存常驻

    mlock不做详细展开,读者可以自行查阅,推荐文章如下
    http://www.daileinote.com/computer/linux_sys/32
    https://www.cnblogs.com/linhaostudy/p/15972330.html

    • 其实mmap方法就可以直接设置flag为MAP_LOCKED将内存锁住,但是jdk没有提供该能力,所以我们需要才需要直接调用mlock锁住内存
    • 用户可以锁住的内存大小受操作系统限制,可以ulimit -l查看,mac和linux一般应该都是无限的unlimited
    • sysctl_max_map_count 规定了进程虚拟内存空间所能包含VmArea的最大个数,可以通过 /proc/sys/vm/max_map_count 内核参数来调整 sysctl_max_map_count,一次mmap对应产生一个VmArea
    • 对mmap的内存执行mlock后就已经触发了缺页将所有物理内存填充好了,也就不需要madvise,也不需要再调用MappedByteBuffer的load方法了,测试代码如下,最后byteBuffer.isLoaded()会显示true
        public static void main(String[] args) throws IOException, InterruptedException {
            RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
            FileChannel fileChannel = r.getChannel();
            MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
            long address = PlatformDependent.directBufferAddress(byteBuffer);
            Pointer p = new Pointer(address);
            System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
            System.out.println(byteBuffer.isLoaded());
    }
    
    • mlock之后无需调用munlock,会随着MappedByteBuffer的释放自动munlock(也符合文档说明,munmap自动munlock),测试代码如下

    该测试程序可以永远执行下去,mlock的返回值也一直会是0(系统调用返回值0代表正常)
    读者可以试试只调用mlock,但是不释放MappedByteBuffer的情况,小心电脑死机 : )

        public static void main(String[] args) throws IOException, InterruptedException {
            try {
                while (true) {
                    for (int i = 1; i < 31; i++) {
                        RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
                        FileChannel channel = f.getChannel();
                        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
                        long add = PlatformDependent.directBufferAddress(buffer);
                        Pointer p = new Pointer(add);
                        System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
                        channel.close();
                        PlatformDependent.freeDirectBuffer(buffer);
                    }
                }
            } catch (Exception e) {
            }
    
    • windows环境没有mlock,需要调用kernel32.dll中的VirtualLock系统调用,按照JNA文档需要实现JNA的StdCallLibrary接口

    https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md

    • windows环境下要更复杂一点,VirtualLock受大小和个数影响。在elasticsearch中有使用到,可以参考源码学习,本文不做展开(感觉在windows上搞这个优化没啥意义)

    https://github.com/elastic/elasticsearch/blob/951640b73f71909013f57645cd30e1f19d8c2323/server/src/main/java/org/elasticsearch/bootstrap/JNAKernel32Library.java#L30

    获取程序运行机器的操作系统页大小

    • 参考netty中PlatformDependent0中执行Bits类中unaligned方法的过程
        private int pageSize = AccessController.doPrivileged((PrivilegedAction<Integer>) () -> {
            try {
                Class<?> bitsClass = Class.forName("java.nio.Bits", false, PlatformDependent.getSystemClassLoader());
                Method pageSizeMethod = bitsClass.getDeclaredMethod("pageSize");
                pageSizeMethod.setAccessible(true);
                return (Integer) pageSizeMethod.invoke(null);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    

    总结

    • 在java领域实现高性能流式存储现在看就比较清晰了,固定文件大小,通过MappedByteBuffer配合系统调用mlock,将最新的文件mlock到物理内存中,保证新数据的实时读取
    • 根据调用请求分析将部分热点老文件也mlock住,并做动态调整,避免大量冷读造成读取性能下降
    • RocketMQ中文件预热部分有mlock相关使用,但是为了兼容使用预热和不使用预热两种情况,在预热部分有些冗余调用,可以理解。预热过程既然调用了mlock就无需对每个页写一个字节填充物理页了,也无需调用madvise

    相关文章

      网友评论

          本文标题:做一个高性能的java流式存储项目你需要知道的一些事儿

          本文链接:https://www.haomeiwen.com/subject/ryqwwdtx.html