美文网首页
做一个高性能的java流式存储项目你需要知道的一些事儿

做一个高性能的java流式存储项目你需要知道的一些事儿

作者: 江江的大猪 | 来源:发表于2023-11-22 16:28 被阅读0次

1、目前能够在网上搜到的java相关的高性能文件io文章都比较基础,想深入的话需要既了解java的文件操作api原理,又了解文件操作相关的系统调用,这就造成了学习困难
2、实际上elasticsearch、kafka、rocketmq里关于文件操作的java实现已经很好,本文也参考了其中不少代码实现
3、本文的每个结论均提供测试代码验证,方便读者在自己的机器上验证
4、本文贴出的jdk native源码版本是openjdk的jdk8-b120
5、本文需要读者对java文件操作、虚拟内存、物理内存、pagecache有一定基础了解
6、本文的测试代码需要在TEST_PATH目录下提前准备test1-test30文件(根据自己测试环境调整),可以通过fallocate -l 1G test1快速创建30个1g大小的测试文件
7、测试代码依赖如下,引入netty只是为了使用其中的PlatformDependent工具类,引入JNA是为了执行系统调用

        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>1.37</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>1.37</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.101.Final</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>5.13.0</version>
        </dependency>

基础部分

FileChannel、MappedByteBuffer的初始化和释放

  • FileChannel的map函数是对系统调用mmap的阉割式封装,仅提供三种mode,READ_ONLY/READ_WRITE/PRIVATE对应mmap的prot、flags的部分组合,下面会贴出jdk源码,mmap系统调用原型:void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);
  • 直接使用netty的工具类释放MappedByteBuffer,原理不展开了,读者可以额外查阅资料学习,也可以看我之前的文章https://www.jianshu.com/p/4f026fe063aa
  • 不论是何种方式获取的FileChannel,仅需要关闭FileChannel本身。此处无需关闭RandomAccessFile
  • 需要注意关闭FileChannel是不会释放MappedByteBuffer的,也就是说仅关闭FileChannel后MappedByteBuffer仍然可以继续使用
    FileChannel channel = new RandomAccessFile(TEST_PATH + "test1", "rw").getChannel();
    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
    PlatformDependent.freeDirectBuffer(mappedByteBuffer);
    fileChannel.close();
  • jdk map native源码部分核心如下,可以看到三种mode只对应mmap中prot和flags的三种组合,这也是我为什么说map方法是对mmap系统调用的阉割封装,mmap还有很多实用的flag我们在java中没法直接使用

完整源码:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileChannelImpl.c

    Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this, jint prot, jlong off, jlong len)
    {
        if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
            protections = PROT_READ;
            flags = MAP_SHARED;
        } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
            protections = PROT_WRITE | PROT_READ;
            flags = MAP_SHARED;
        } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
            protections =  PROT_WRITE | PROT_READ;
            flags = MAP_PRIVATE;
        }
        mapAddress = mmap64(
                0,                    /* Let OS decide location */
                len,                  /* Number of bytes to map */
                protections,          /* File permissions */
                flags,                /* Changes are shared */
                fd,                   /* File descriptor of mapped file */
                off);                 /* Offset into file */
    }
  • FileChannel close最终调用FileChannelImpl的implCloseChannel方法,源码如下

可以看到会对parent执行close,所以对FileChannel执行close就会对RandomAccessFile执行close。其实对RandomAccessFile执行close也会对FileChannel执行close,所以这俩关闭一个就行。一般我们只把RandomAccessFile当做获取FileChannel的工具人,不会保留它的引用,所以最终只会执行FileChannel的close

    protected void implCloseChannel() throws IOException {
        if (this.fileLockTable != null) {
            Iterator var1 = this.fileLockTable.removeAll().iterator();
            while(var1.hasNext()) {
                FileLock var2 = (FileLock)var1.next();
                synchronized(var2) {
                    if (var2.isValid()) {
                        this.nd.release(this.fd, var2.position(), var2.size());
                        ((FileLockImpl)var2).invalidate();
                    }
                }
            }
        }
        this.threads.signalAndWait();
        if (this.parent != null) {
            ((Closeable)this.parent).close();
        } else {
            this.nd.close(this.fd);
        }
    }

FileChannel write和DirectBuffer的关系

  • 源Buffer是DirectBuffer则直接写入,是HeapBuffer则在cache中拿一块DirectBuffer,把数据拷贝进去再写入
  • 这个cache由jdk维护,可以看到里面的DirectBuffer没有主动释放逻辑,只能随着gc释放,因此有oom隐患
  • 所以最佳实践是用户自己构建DirectBuffer写入,并自己控制该DirectBuffer的释放,避免数据拷贝,也避免堆外内存的OOM
  • write方法最后使用sun.nio.ch.IOUtil的write方法,核心源码如下
    static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1 instanceof DirectBuffer) {
            return writeFromNativeBuffer(var0, var1, var2, var4);
        } else {
            int var5 = var1.position();
            int var6 = var1.limit();
            int var7 = var5 <= var6 ? var6 - var5 : 0;
            ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
            int var10;
            try {
                var8.put(var1);
                var8.flip();
                var1.position(var5);
                int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
                if (var9 > 0) {
                    var1.position(var5 + var9);
                }
                var10 = var9;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var8);
            }
            return var10;
        }
    }

FileChannel read和DirectBuffer的关系

  • 和write同理,直接使用DirectBuffer最好
  • 最终调用sun.nio.ch.IOUtil的read方法,核心源码如下
    static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1.isReadOnly()) {
            throw new IllegalArgumentException("Read-only buffer");
        } else if (var1 instanceof DirectBuffer) {
            return readIntoNativeBuffer(var0, var1, var2, var4);
        } else {
            ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
            int var7;
            try {
                int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
                var5.flip();
                if (var6 > 0) {
                    var1.put(var5);
                }
                var7 = var6;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var5);
            }
            return var7;
        }
    }

FileChannel force参数true/false的区别

  • 对文件的写入其实都不会直接写入磁盘(directIO除外),只会写到buffer中,由操作系统统一调度写入磁盘,所以如果需要实时存储就需要主动调用force方法直接写入磁盘
  • jdk文档表示对FileChannel执行force不会保证对该FileChannel通过map方法获得的MappedByteBuffer进行刷盘,如果对MappedByteBuffer有修改需要刷盘,需要调用MappedByteBuffer自己的force方法
  • force方法有参数true/false,jdk文档说的不够具体,只说true的时候会额外写入metadata。实际上true/false对应的系统调用分别是fdatasync/fsync(两个系统调用的详细区别本文不展开),核心源码如下
  • fsync会比fdatasync多一次寻址,将文件大小、修改时间等metadata写入磁盘,性能会比fdatasync差一点,在文件大小固定的情况下可以仅调用fdatasync,文件大小不固定的情况下调用fdatasync没来的及更新文件大小可能会造成丢数据。其实由下面的压测结果可知这两个系统调用性能差距也不是很大(这里可能是固态硬盘和机械硬盘的区别?机械硬盘可能差距更大)

完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileDispatcherImpl.c

    Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this, jobject fdo, jboolean md)
    {
        if (md == JNI_FALSE) {
            result = fdatasync(fd);
        } else {
            result = fsync(fd);
        }
    }

FileChannel的transferTo方法介绍

对该方法的解析可以看我之前的文章:https://www.jianshu.com/p/11ed05ca62ff

  • transferTo可以利用sendFile系统调用做到真正的零拷贝传输数据,但是仅限于文件到文件,文件到socket两条路
  • 在文件下载场景可以使用该方法,将文件直接transferTo到socket中,我们自己的程序中是无法感知到文件内容的。最后通过文件hash值判断文件是否正确完整下载
  • 在非文件下载场景基本无法使用transferTo,因为我们的程序逻辑里大多需要从磁盘文件中读取到数据,然后做自己的业务处理,再将数据写入socket
  • 如果硬用transferTo做文件读取(文件到socket),考虑到下面的性能测试,我认为整体也不会比用MappedByteBuffer性能好

MappedByteBuffer load方法

  • 先执行load0 native方法,然后根据页大小,对每个页读取了一下,最后通过一个累加的x避免编译器认为这段代码是dead code优化掉
  • 可以看到load方法的目标就是主动触发缺页,从而将文件内容真正填充进物理内存中
  • MappedByteBuffer一开始仅仅是虚拟内存,不会分配真正的物理内存,用到的时候才会分配
    public final MappedByteBuffer load() {
        load0(mappingAddress(offset), length);
        // Read a byte from each page to bring it into memory. A checksum
        // is computed as we go along to prevent the compiler from otherwise
        // considering the loop as dead code.
        Unsafe unsafe = Unsafe.getUnsafe();
        int ps = Bits.pageSize();
        int count = Bits.pageCount(length);
        long a = mappingAddress(offset);
        byte x = 0;
        for (int i=0; i<count; i++) {
            x ^= unsafe.getByte(a);
            a += ps;
        }
        if (unused != 0)
            unused = x;
        return this;
    }
  • load0核心源码如下,主要就是执行系统调用madvise并且传入参数MADV_WILLNEED,本文对madvise不做展开,读者可以自行查阅

可以看出load0的作用是告诉操作系统这段内存接下来willneed,操作系统可以对这段内存做些优化处理,比如预读和提前加载之类的,可以加快后续对每个页都触发填充的效率
完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c

    Java_java_nio_MappedByteBuffer_load0(JNIEnv *env, jobject obj, jlong address, jlong len)
    {
        int result = madvise((caddr_t)a, (size_t)len, MADV_WILLNEED);
    }

MappedByteBuffer isLoad方法

  • 主要就是直接调用isLoaded0 native方法
    public final boolean isLoaded() {
        return isLoaded0(mappingAddress(offset), length, Bits.pageCount(length));
    }
  • isLoaded0核心源码如下,通过系统调用mincore检查每个页是否都在物理内存中,mincore不做展开,读者可以自行查阅

完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c

    Java_java_nio_MappedByteBuffer_isLoaded0(JNIEnv *env, jobject obj, jlong address, jlong len, jint numPages)
    {
        jboolean loaded = JNI_TRUE;
        unsigned char *vec = (unsigned char *)malloc(numPages * sizeof(char));
        mincore(address, (size_t)len, vec);
        for (i=0; i<numPages; i++) {
            if (vec[i] == 0) {
                loaded = JNI_FALSE;
                break;
            }
        }
        return loaded;
    }

调用了MappedByteBuffer的load方法后,isLoad不一定一直为true

当pagecache不够用了的时候,操作系统会将cache按照规则释放或者放入swap区,测试代码如下,如果测试电脑的可用内存小于30g,基本上最后会显示false

    public static void main(String[] args) throws IOException, InterruptedException {
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "/test1", "rw");
        FileChannel fileChannel = r.getChannel();
        MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
        byteBuffer.load();
        new Thread(() -> {
            try {
                for (int i = 2; i < 31; i++) {
                    RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
                    FileChannel channel = f.getChannel();
                    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
                    buffer.load();
                }
                TimeUnit.DAYS.sleep(1);
            } catch (Exception e) {
            }
        }).start();
        TimeUnit.SECONDS.sleep(20);
        System.out.println(byteBuffer.isLoaded());
    }

FileChannel和MappedByteBuffer读写测试

  • 大部分研究java文件io的应该都看过这篇文章,或者各种转载抄袭的复制品,但是并不知道他是如何测试的,因此对这篇文章的结论不可全信

http://thinkinjava.cn/2019/05/12/2019/05-12-java-nio/

  • 使用JMH进行基准测试,对JMH不做展开,读者可以自行查阅,推荐资料

https://developer.aliyun.com/article/899469
https://dunwu.github.io/java-tutorial/pages/747d3e/#warmup

  • 本测试力求单纯测试读写,尽量避免其他的开销
  • 本测试仅读写文件的第一个页,避免pagecache的不确定性影响,如何获得当前页大小在下面的进阶部分有说明
  • 本测试使用的文件均固定大小1G,提前创建,参照本文最开始的说明(在文章最上面)
  • 测试读取时使用的目标ByteBuffer和数组均通过Threadlocal保存复用,避免每次创建ByteBuffer和数组的开销干扰
  • 测试写入时使用的源ByteBuffer和数组提前创建并共用,避免每次写入都创建的干扰开销
  • 测试包括如下几项

从FileChannel读到HeapByteBuffer
从FileChannel读到DirectByteBuffer
从HeapByteBuffer写入FileChannel
从DirectByteBuffer写入FileChannel
从DirectByteBuffer写入FileChannel并force(false)
从DirectByteBuffer写入FileChannel并force(true)
从MappedByteBuffer读到数组
从数组写入MappedByteBuffer
从HeapByteBuffer写入MappedByteBuffer
从DirectByteBuffer写入MappedByteBuffer
从DirectByteBuffer写入MappedByteBuffer并force

  • Linux version 3.10.0-327.ali2017.alios7.x86_64 16c32g的机器测试结果如下,可以说在文件大小固定的前提下MappedByteBuffer全面完胜FileChannel

FileChannel读到DirectBuffer比读到HeapBuffer性能好,如上文所述,符合预期
FileChannel写入DirectBuffer比HeapBuffer,如上文所述,符合预期
FileChannel force(false)比force(true)好一些
MappedByteBuffer读到数组中有压倒性读取优势,比FileChannel高两个数量级
MappedByteBuffer写入数组数据性能好一些,写入DirectBuffer和HeapBuffer差不多
MappedByteBuffer比FileChannel的写入性能、force性能都好
mac上测试结果差不多,但是MappedByteBuffer从数组、HeapBuffer、DirectBuffer写入差距不像linux上明显

Benchmark                                               Mode  Cnt      Score   Error   Units
MyBenchmark.fileChannelRead2DirectBuffer               thrpt         968.349          ops/ms
MyBenchmark.fileChannelRead2HeapBuffer                 thrpt         669.176          ops/ms
MyBenchmark.fileChannelWriteFromDirectBuffer           thrpt         286.462          ops/ms
MyBenchmark.fileChannelWriteFromDirectBufferForce      thrpt           3.992          ops/ms
MyBenchmark.fileChannelWriteFromDirectBufferForceMeta  thrpt           3.169          ops/ms
MyBenchmark.fileChannelWriteFromHeapBuffer             thrpt         258.994          ops/ms
MyBenchmark.mappedRead2Array                           thrpt       67872.596          ops/ms
MyBenchmark.mappedWriteFromArray                       thrpt        1675.428          ops/ms
MyBenchmark.mappedWriteFromDirectBuffer                thrpt        1585.909          ops/ms
MyBenchmark.mappedWriteFromDirectBufferForce           thrpt           6.651          ops/ms
MyBenchmark.mappedWriteFromHeapBuffer                  thrpt        1559.150          ops/ms
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
@Warmup(iterations = 3, time = 10)
@Measurement(iterations = 1, time = 10)
@Threads(16)
@State(Scope.Benchmark)
public class MyBenchmark {
    private static final int SIZE = pageSize;
    private FileChannel fileChannel;
    private MappedByteBuffer mappedByteBuffer;
    private final ThreadLocal<ByteBuffer> fileChannelRead2HeapBufferThreadLocal = new ThreadLocal<>();
    private final ThreadLocal<ByteBuffer> fileChannelRead2DirectBufferThreadLocal = new ThreadLocal<>();
    private final ThreadLocal<byte[]> mappedRead2ArrayThreadLocal = new ThreadLocal<>();
    private final byte[] srcByteArray = new byte[SIZE];
    private final ByteBuffer srcHeapBuffer = ByteBuffer.allocate(SIZE);
    private final ByteBuffer srcDirectBuffer = ByteBuffer.allocateDirect(SIZE);

    @Setup
    public void setup() throws IOException {
        for (int i = 0; i < SIZE; i++) {
            srcByteArray[i] = 9;
        }
        srcHeapBuffer.put(srcByteArray, 0, SIZE);
        srcHeapBuffer.flip();
        srcDirectBuffer.put(srcByteArray, 0, SIZE);
        srcDirectBuffer.flip();
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
        fileChannel = r.getChannel();
        mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
    }

    @TearDown
    public void tearDown() throws IOException {
        PlatformDependent.freeDirectBuffer(mappedByteBuffer);
        fileChannel.close();
    }

    @Benchmark
    public void fileChannelRead2HeapBuffer(Blackhole blackhole) throws IOException {
        ByteBuffer byteBuffer = fileChannelRead2HeapBufferThreadLocal.get();
        if (byteBuffer == null) {
            byteBuffer = ByteBuffer.allocate(SIZE);
            fileChannelRead2HeapBufferThreadLocal.set(byteBuffer);
        }
        blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelRead2DirectBuffer(Blackhole blackhole) throws IOException {
        ByteBuffer byteBuffer = fileChannelRead2DirectBufferThreadLocal.get();
        if (byteBuffer == null) {
            byteBuffer = ByteBuffer.allocateDirect(SIZE);
            fileChannelRead2DirectBufferThreadLocal.set(byteBuffer);
        }
        blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromHeapBuffer(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcHeapBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromDirectBuffer(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromDirectBufferForce(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
        fileChannel.force(false);
    }

    @Benchmark
    public void fileChannelWriteFromDirectBufferForceMeta(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
        fileChannel.force(true);
    }

    @Benchmark
    public void mappedRead2Array(Blackhole blackhole) {
        byte[] array = mappedRead2ArrayThreadLocal.get();
        if (array == null) {
            array = new byte[SIZE];
            mappedRead2ArrayThreadLocal.set(array);
        }
        blackhole.consume(mappedByteBuffer.slice().get(array, 0, SIZE));
    }

    @Benchmark
    public void mappedWriteFromArray(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcByteArray, 0, SIZE));
    }

    @Benchmark
    public void mappedWriteFromHeapBuffer(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcHeapBuffer.slice()));
    }

    @Benchmark
    public void mappedWriteFromDirectBuffer(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
    }

    @Benchmark
    public void mappedWriteFromDirectBufferForce(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
        mappedByteBuffer.force();
    }
}

进阶部分

java工程中通过JNA调用libc标准库函数

对JNA不做展开介绍,读者可以自行查阅学习:https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md

import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.NativeLong;
import com.sun.jna.Platform;
import com.sun.jna.Pointer;

public interface LibC extends Library {
    LibC INSTANCE = Native.load(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
    // 是不是很熟悉,大家学的第一个函数应该就是这个吧:)
    void printf(String format, Object... args);
    // 本文的核心,后面详细介绍
    int mlock(Pointer var1, NativeLong var2);
}

通过mlock将锁定物理内存,避免内存被swap,保持物理内存常驻

mlock不做详细展开,读者可以自行查阅,推荐文章如下
http://www.daileinote.com/computer/linux_sys/32
https://www.cnblogs.com/linhaostudy/p/15972330.html

  • 其实mmap方法就可以直接设置flag为MAP_LOCKED将内存锁住,但是jdk没有提供该能力,所以我们需要才需要直接调用mlock锁住内存
  • 用户可以锁住的内存大小受操作系统限制,可以ulimit -l查看,mac和linux一般应该都是无限的unlimited
  • sysctl_max_map_count 规定了进程虚拟内存空间所能包含VmArea的最大个数,可以通过 /proc/sys/vm/max_map_count 内核参数来调整 sysctl_max_map_count,一次mmap对应产生一个VmArea
  • 对mmap的内存执行mlock后就已经触发了缺页将所有物理内存填充好了,也就不需要madvise,也不需要再调用MappedByteBuffer的load方法了,测试代码如下,最后byteBuffer.isLoaded()会显示true
    public static void main(String[] args) throws IOException, InterruptedException {
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
        FileChannel fileChannel = r.getChannel();
        MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
        long address = PlatformDependent.directBufferAddress(byteBuffer);
        Pointer p = new Pointer(address);
        System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
        System.out.println(byteBuffer.isLoaded());
}
  • mlock之后无需调用munlock,会随着MappedByteBuffer的释放自动munlock(也符合文档说明,munmap自动munlock),测试代码如下

该测试程序可以永远执行下去,mlock的返回值也一直会是0(系统调用返回值0代表正常)
读者可以试试只调用mlock,但是不释放MappedByteBuffer的情况,小心电脑死机 : )

    public static void main(String[] args) throws IOException, InterruptedException {
        try {
            while (true) {
                for (int i = 1; i < 31; i++) {
                    RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
                    FileChannel channel = f.getChannel();
                    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
                    long add = PlatformDependent.directBufferAddress(buffer);
                    Pointer p = new Pointer(add);
                    System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
                    channel.close();
                    PlatformDependent.freeDirectBuffer(buffer);
                }
            }
        } catch (Exception e) {
        }
  • windows环境没有mlock,需要调用kernel32.dll中的VirtualLock系统调用,按照JNA文档需要实现JNA的StdCallLibrary接口

https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md

  • windows环境下要更复杂一点,VirtualLock受大小和个数影响。在elasticsearch中有使用到,可以参考源码学习,本文不做展开(感觉在windows上搞这个优化没啥意义)

https://github.com/elastic/elasticsearch/blob/951640b73f71909013f57645cd30e1f19d8c2323/server/src/main/java/org/elasticsearch/bootstrap/JNAKernel32Library.java#L30

获取程序运行机器的操作系统页大小

  • 参考netty中PlatformDependent0中执行Bits类中unaligned方法的过程
    private int pageSize = AccessController.doPrivileged((PrivilegedAction<Integer>) () -> {
        try {
            Class<?> bitsClass = Class.forName("java.nio.Bits", false, PlatformDependent.getSystemClassLoader());
            Method pageSizeMethod = bitsClass.getDeclaredMethod("pageSize");
            pageSizeMethod.setAccessible(true);
            return (Integer) pageSizeMethod.invoke(null);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });

总结

  • 在java领域实现高性能流式存储现在看就比较清晰了,固定文件大小,通过MappedByteBuffer配合系统调用mlock,将最新的文件mlock到物理内存中,保证新数据的实时读取
  • 根据调用请求分析将部分热点老文件也mlock住,并做动态调整,避免大量冷读造成读取性能下降
  • RocketMQ中文件预热部分有mlock相关使用,但是为了兼容使用预热和不使用预热两种情况,在预热部分有些冗余调用,可以理解。预热过程既然调用了mlock就无需对每个页写一个字节填充物理页了,也无需调用madvise

相关文章

  • gRPC初探

    资源 [1] gRPC Java Example 关键词 高性能,开源,双向流式,多开发语言支持,Apache 2...

  • spark mapreduce

    spark:1.需要重复读取同样数据进行迭代计算2.流式实时3.内存大,快 内存换存储4.scala,java5....

  • sessionStorage和localStorage的区别及如

    引言:最近在做一个webapp项目,前端需要存储token和user信息,同时需要将user信息存储在vuex中,...

  • 存储 dict 的元素前是计算 key 的 hash 值?

    dict 的高性能与其存储方式是分不开的,我们知道 dict 的存储是基于哈希表(又称散列表),需要计算 hash...

  • 2021-2-19:请问你知道 Java 如何高性能操作文件么?

    一般高性能的涉及到存储框架,例如 RocketMQ,Kafka 这种消息队列,存储日志的时候,都是通过 Java ...

  • JavaStream 常用操作(二)

    一. 流式处理简介在我接触到java8流式处理的时候,我的第一感觉是流式处理让集合操作变得简洁了许多,通常我们需要...

  • 分布式流式计算-Kafka部署

    Kafka是一个高性能的流式消息队列,适用于大数据场景下的消息传输、消息处理和消息存储。在学习过程中,我们通常使用...

  • Java对象头详解

    由于Java面向对象的思想,在JVM中需要大量存储对象,存储时为了实现一些额外的功能,需要在对象中添加一些标记字段...

  • Java对象头详解

    由于Java面向对象的思想,在JVM中需要大量存储对象,存储时为了实现一些额外的功能,需要在对象中添加一些标记字段...

  • Java对象头详解

    由于Java面向对象的思想,在JVM中需要大量存储对象,存储时为了实现一些额外的功能,需要在对象中添加一些标记字段...

网友评论

      本文标题:做一个高性能的java流式存储项目你需要知道的一些事儿

      本文链接:https://www.haomeiwen.com/subject/ryqwwdtx.html