美文网首页
RandomAccessFile文件锁踩坑--write高并发引

RandomAccessFile文件锁踩坑--write高并发引

作者: 南风nanfeng | 来源:发表于2019-11-26 10:18 被阅读0次

    背景

    多线程写入文件,要考虑线程同步问题,实现数据完整落盘磁盘备份。
    操作系统:
    win10:没问题
    centos7:有问题

        public static void writeFileLock(String content, String filePath) {
            File file = new File(filePath);
            RandomAccessFile raf = null;
            FileChannel fileChannel = null;
            FileLock fileLock = null;
            try {
                raf = new RandomAccessFile(file, "rw");
                fileChannel = raf.getChannel();
                while (true) {
                    try {
                        fileLock = fileChannel.tryLock();
                        if (fileLock != null) {
                            break;
                        }
                    } catch (Exception e) {
                        Thread.sleep(0);
                    }
                }
                raf.seek(raf.length());
                raf.write(content.getBytes());
                fileLock.release();
                fileChannel.close();
                raf.close();
            } catch (Exception e) {
                log.error("写文件异常", e);
                log.error("写入文件路径:{}, 文件内容:{}", filePath, content);
            }
        }
    

    RandomAccessFile建立文件连接符,raf获取文件管道,文件管道获取文件锁,tryLock方法有两个特点:第一、非阻塞,调用后立刻返回;第二、没拿到锁可能返回null,也可以能抛出异常,所以if判断循环获取,异常块捕获异常再重新尝试获取锁,注意Thread.sleep(0)的作用并不是睡0秒,而是马上加入到可执行队列,等待cpu的时间分片。

    这段代码承载线上的kafka多线程备份消息的任务,用lock协调多线程的写入同步,埋点监控发现,备份数据偶发遗漏,大概2.3亿数据,会有5条偏差,就是漏了。

    下面记录压测思路及过程。

    准备

    压测代码:

    private static final ExecutorService FILE_THREADS = Executors.newFixedThreadPool(100);
    
    public void execute(String... strings) throws Exception {
    
            int cnt = 100 * 100 * 100;
            int idx = 1;
            long begin = 1574305200000L;
            while (idx <= cnt) {
                Map<String, Object> map = new HashMap<>();
                map.put("id", idx);
                map.put("time", begin);
                String timeDirectory = DateUtil.getBeforeOneHour("yyyyMMddHHmm", 8, begin);
                String mm = DateUtil.getBeforeOneHour("mm", 0, begin).concat(".txt");
                String json = JsonUtil.getJosnString(map).concat(System.getProperty("line.separator"));
                FILE_THREADS.execute(new PersistThread(timeDirectory, mm , json));
                if (idx % 10000 == 0) {
                    begin += 60000L;
                }
                idx++;
            }
    }
    
    private class PersistThread extends Thread {
    
            String time;
            String filename;
            String content;
    
            PersistThread(String time, String filename, String content) {
                this.time = time;
                this.filename = filename;
                this.content = content;
            }
    
            @Override
            public void run() {
                String folder = "/data/job_project/txt/" + time + "/";
                FileUtil.createDirectory(folder);
                FileUtil.writeFileIO(content, folder + filename);
            }
    }
    

    创建100个线程的线程池,提交写入文件Thread任务,实现多线程写入文件,且文件目录、文件是动态创建的(模拟线上),id每自增1万创建一个时间戳目录,格式是:yyyyMMddHHmm,在目录下创建一个文件,写入1万行数据,相当于100个线程,动态写入100个目录下的100个文件中,每个文件写入1万行。

    首先怀疑创建目录和文件:

    代码如下:

        public static File createDirectory(String path) {
            File file = new File(path);
            if (!file.exists() && !file.isDirectory()) {
                 file.mkdirs();
            }
            return file;
        }
    
        public static File createFile(String file) {
            File f = null;
            try {
                f = new File(file);
                if (!f.exists()) {
                    f.createNewFile();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return f;
        }
    

    创建目录和文件,逻辑都是先检查再创建,显然不是原子的,所以怀疑有没有可能是多线程环境中,目录重复创建导致,所以把代码优化成两次判断的同步方式,如下:

        public static File createDirectory(String path) {
            File file = new File(path);
            if (!file.exists() && !file.isDirectory()) {
                synchronized (FileUtil.class) {
                    if (!file.exists() && !file.isDirectory()) {
                        file.mkdirs();
                    }
                }
            }
            return file;
        }
    
        public static File createFile(String file) {
            File f = null;
            try {
                f = new File(file);
                if (!f.exists()) {
                    synchronized (FileUtil.class) {
                        if (!f.exists()) {
                            f.createNewFile();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return f;
        }
    

    压入100w数据,观察结果,大失所望:

    /data/job_project/txt/201911211100/00.txt lines: 9989
    /data/job_project/txt/201911211101/01.txt lines: 9996
    /data/job_project/txt/201911211102/02.txt lines: 9984
    /data/job_project/txt/201911211103/03.txt lines: 9984
    /data/job_project/txt/201911211104/04.txt lines: 9982
    

    事实是绝大部分文件都漏了,下面把所有的目录和文件全部规划好,再试。
    规划目录脚本:

    #!/bin/sh
    txt=/data/job_project/txt/*
    for folder in $txt;do
        filename=${folder##*/}
        if [[ $filename = "f.sh" ]] || [[ $filename = "search.sh" ]];then
            echo "$filename is a shell file"
        else
            filename=${filename:10}
            filepath=${folder}/${filename}.txt
            #rm -f $filepath
            #touch $filepath
            lines=$(wc -l ${filepath} | awk '{print $1}')
            if [ $lines -ne 10000 ];then
                echo "$filepath lines: $lines"
            fi
        fi
    done
    

    结果仍然会漏数据。

    为了彻底屏蔽创建目录和文件带来的影响,下面的压测前都创建好了文件和目录。

    使用RandomAccessFile的rws方式同步写入文件。

    测试结果:

    /data/job_project/txt/201911211101/01.txt lines: 9998
    /data/job_project/txt/201911211106/06.txt lines: 9999
    /data/job_project/txt/201911211107/07.txt lines: 9999
    /data/job_project/txt/201911211109/09.txt lines: 9999
    /data/job_project/txt/201911211112/12.txt lines: 9999
    /data/job_project/txt/201911211116/16.txt lines: 9998
    /data/job_project/txt/201911211119/19.txt lines: 9999
    /data/job_project/txt/201911211120/20.txt lines: 9998
    ...
    

    压测过程十分缓慢,写入性能非常差,但是结果震惊,仍然漏了,仔细看了官网api注解:

         * <p>The <tt>"rwd"</tt> mode can be used to reduce the number of I/O
         * operations performed.  Using <tt>"rwd"</tt> only requires updates to the
         * file's content to be written to storage; using <tt>"rws"</tt> requires
         * updates to both the file's content and its metadata to be written, which
         * generally requires at least one more low-level I/O operation.
         *
         * <p>If there is a security manager, its {@code checkRead} method is
         * called with the pathname of the {@code file} argument as its
         * argument to see if read access to the file is allowed.  If the mode
         * allows writing, the security manager's {@code checkWrite} method is
         * also called with the path argument to see if write access to the file is
         * allowed.
    

    rwd模式同步文件内容,rws模式同步文件内容和文件元数据,压测首选当然选择更严格的rws,结果仍然遗漏,此时已经开始怀疑jdk源码了。

    调整close顺序,校验lock

    第一处改动:
        if (fileLock != null) {
            break;
        }
    多加一层校验,改成
        if (fileLock != null && fileLock.isValid()) {
            break;
        }
    
    第二处改动:
        fileLock.release();
        fileChannel.close();
        raf.close();
    调整close顺寻,改成:
        fileLock.release();
        raf.close();
        fileChannel.close();
    
    

    测试结果:

    /data/job_project/txt/201911211100/00.txt lines: 9989
    /data/job_project/txt/201911211101/01.txt lines: 9996
    /data/job_project/txt/201911211102/02.txt lines: 9984
    /data/job_project/txt/201911211103/03.txt lines: 9984
    /data/job_project/txt/201911211104/04.txt lines: 9982
    ...
    

    结果显示,反而漏了更多数据,此时已经自闭了,但是还要接着撸。

    使用channel写入缓冲区

    public static void writeFileLock(String content, String filePath, String time) {
            File file = createFile(filePath);
            RandomAccessFile raf = null;
            FileChannel fileChannel = null;
            FileLock fileLock = null;
            try {
                raf = new RandomAccessFile(file, "rw");
                fileChannel = raf.getChannel();
                while (true) {
                    try {
                        fileLock = fileChannel.tryLock();
                        if (fileLock != null && fileLock.isValid()) {
                            break;
                        }
                    } catch (Exception e) {
                        Thread.sleep(0);
                    }
                }
                fileChannel.write(ByteBuffer.wrap(content.getBytes()), fileChannel.size());
                fileLock.release();
                raf.close();
                fileChannel.close();
            } catch (Exception e) {
                log.error("写文件异常", e);
                log.error("写入文件路径:{}, 文件内容:{}", filePath, content);
            }
        }
    
    

    改变写入方式,用nio的管道channel写入数据,结果仍然失望。

    日志埋点——使用redis计数器

    埋点代码:

        public static void writeFileLock(String content, String filePath, String time) {
            File file = createFile(filePath);
            RandomAccessFile raf = null;
            FileChannel fileChannel = null;
            FileLock fileLock = null;
            try {
                redisHelper.incr("filelock0:".concat(time));
                raf = new RandomAccessFile(file, "rw");
                fileChannel = raf.getChannel();
                while (true) {
                    try {
                        fileLock = fileChannel.tryLock();
                        if (fileLock != null && fileLock.isValid()) {
                            break;
                        }
                    } catch (Exception e) {
                        Thread.sleep(0);
                    }
                }
    
                redisHelper.incr("filelock1:".concat(time));
                raf.seek(raf.length());
                redisHelper.incr("filelock2:".concat(time));
                raf.write(content.getBytes());
                redisHelper.incr("filelock3:".concat(time));
                fileLock.release();
                redisHelper.incr("filelock4:".concat(time));
                raf.close();
                redisHelper.incr("filelock5:".concat(time));
                fileChannel.close();
                redisHelper.incr("filelock6:".concat(time));
            } catch (Exception e) {
                log.error("写文件异常", e);
                log.error("写入文件路径:{}, 文件内容:{}", filePath, content);
            }
        }
    

    此时对这段代码彻底失望,得找到数据在哪个位置漏掉的,所以使用了redis计数器,incr是线程安全得,所以能够很快发现到底哪里出问题了,问题马上浮出水面,心中窃喜。
    再说明一下:redis的key包含目录名称,即一个目录一个文件一个key,埋点的密集显示出来必胜的信心。
    结果是所有key的value都是完美的10000,毫无破绽,心如死灰,于是有同事提议,搞个反查,看看RangdomAccessFile的指针到底有没有更新。

    判断RandomAccessFile的文件指针,是不是有没更新指针的情况

        long filelength = raf.length();
        raf.seek(filelength);
        raf.write(content.getBytes());
        if(filelength == raf.length()){
            log.error ( "errorrrrrrrrrrrrr: "+ content);
        }
    

    如果write方法没有写入文件,那么文件指针必然没有更新,调用write后再反查文件指针是否更新,就能判断write是否有写入。结果仍然失望,预期的日志没有打印,说明write确实更新了文件指针,但是就是漏掉了几行数据,结合上述redis计数器埋点和文件指针判断,压测已经走进了死胡同,所有的情况都试过了,至少可以说两点:第一、文件锁没有问题,锁的线程没有逃逸出while循环;第二、测试的每一行代码都执行了到位了,没有哪一行没有执行的。百思不得其解,那就下班,次日再战。

    java.io包+可重入锁的方式

    昨天的压测可以说把所有情况都试过了,还有试过lock阻塞方式,fileChannel方式写入缓冲区,此处不表。今天决定换个思路,拒绝花里胡哨,就用jdk1.0版本的java.io包+ReentrantLock可重入锁的方式写,代码如下:

        public static void writeSyncFile(String content, String filePath) {
            try {
                fileLock.lock();
                File file = createFile(filePath);
                FileWriter fw = new FileWriter(file, true);
                BufferedWriter bw = new BufferedWriter(fw);
                bw.write(content);
                bw.flush();
                fw.close();
                bw.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                fileLock.unlock();
            }
        }
    

    结果可想而知,每个目录的每个文件,都是完美的10000行,且由于使用了缓冲区,文件写入效率大幅提升,具体提升幅度没有严格计算,使用同步块的方式+写入buffer的方式大概2分钟就能写完,而使用上述方式可能要1小时以上,效率杠杠的。普通的文件io方式没有问题,于是同事提议,用FileOutputStream替代RandomAccessFile看看。

    替换RandomAccessFile,使用FileOutputStream获取channel

    决定抛弃RandomAccessFile,使用FileOutputStream获取channel,代码如下:

        public static void writeFileIO(String content, String path) {
            FileLock lock = null;
            try {
                FileChannel channel = new FileOutputStream(path, true).getChannel();
                while (true) {
                    try {
                        lock = channel.lock();
                        if (lock != null && lock.isValid()) {
                            break;
                        }
                    } catch (Exception e) {
                        Thread.sleep(10);
                    }
                }
                channel.write(ByteBuffer.wrap(content.getBytes()));
                lock.release();
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    RandomAccessFile是任意读写的类,而FileOutputStream没有这个功能,要想追加写入文件末尾,在构造方法加个true就行,同样能实现我们想要的功能,第一次压测后,3分钟就出结果,100w数据压入100个文件,每个文件10000行,与预期结果完全相符,完美!乘胜追亚,再压1000w发现数据有误,结果是oom,压入的数据全部写入线程池的阻塞队列中了,于是调大内存到6g,还是如此,奈何机器资源有限,改压400w,结果数据与预期完全符合,此时水落石出,没有想到坑在RandomAccessFile这里,回过头来看这个类,虽然这个类的注释已经被看烂了,比较诡异的是jdk1.0就出的,但是作者未知,可能怕被人喷,嘿嘿嘿。

    总结

    1、代码不是复制粘特,光搜索谷歌百度,往往很多噪音。
    2、高并发场景要多次严格压测,保证数据质量。
    3、千万区分windows系统和linux系统,二者的文件系统完全不同,上述代码在windows完全没问题,但是linux就是状况百出。
    4、怀疑精神,代码都是人写的,就会有bug,测试用例覆盖所有场景,测试各种可能性。

    相关文章

      网友评论

          本文标题:RandomAccessFile文件锁踩坑--write高并发引

          本文链接:https://www.haomeiwen.com/subject/ovdfwctx.html