美文网首页
orc小文件合并趣谈

orc小文件合并趣谈

作者: 艾伦_alan | 来源:发表于2020-04-17 23:54 被阅读0次

    前言

    这周做了个事情趁热沉淀一下。问题很明确治理小文件。问题由来,要追溯到去年,集群治理了。之前做到存储计算的管理,后续做了简单hdfs画像(其中,就有小文件趋势监控)。最近,集群中namenode压力有所显现。于是,针对小文件多的目录进行了排查和治理。进而,有了今天的这个主题ORC小文件合并趣谈

    核心问题

    这里,首先治理的是实时导入数据的目录。这里增量数据采用SparkSQL以动态分区增量写入的方式。众所周知,spark在处理时,每个task都会写入一个文件(如果task处理的数据,包含n个分区的数据,就会产生n个文件)。进而,在并行度高的情况下,导致对应增量分区文件很多(存储并不大)。

    存储治理中,平台统一要求将hive表的格式向orc格式靠拢。orc的表在存储和查询上都有很好的提升。所以,这个问题就间接的转化为解决orc小文件问题

    解决问题

    解决问题,就先从根源入手,即sparksql小文件产生源头。在spark 2.4 版本中提供了hit的方式(https://issues.apache.org/jira/browse/SPARK-24940)。

    处理上,采用程序升级和定时合并的方式。本文,主要介绍如何定时合并orc文件。

    措施

    方案对比

    经过分析,总结了两种方式。

    • 使用ORC原生DDL方式合并小文件功能。

    ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE;

    附:hive-ddl

    优点:

    1. 原生支持,开发量小。
    2. 避免了数据的解压、解码过程。

    缺点:

    1. 不够优雅,无法指定最终合并的文件数,需要多次执行。
    2. 产生一个hivesql处理,中间过程分区目录会产生.staging-hive*的文件。
    3. 比较耗时。
    • 重新造轮子,实现文件合并功能。

    优点:

    1. 省时间,直接操作hdfs,省去了hive处理过程。
    2. 可以控制最终文件数和大小。

    缺点:

    1. 需要一定的开发量。
    2. 合并后,hive元数据需要主动去刷新处理(直接操作hdfs文件,无法同步到hive元数据)这点很重要

    实现

    流程图

    image.png
    1. 主线程main从元数据库MetaStore获取需要合并处理(文件数大于1)的分区信息。
    2. 根据分工不同,使用两个线程池完成异步处理。
    3. mergeHdfsThreadPool管理合并orc格式的hdfs线程。
    4. flushMetastoreThreadPool管理统计合并分区的元数据信息线程,回馈到元数据库MetaStore中。

    核心代码实现

    ORC合并

    这里,参照官网Using Core Java。方式,实现简单的文件合并处理。

        /**
         * 合并orc文件
         * @param fileDir 需要合并的分区目录
         * @throws Exception
         */
        public static void orcFileRollUp(String fileDir) throws Exception {
            if (StringUtils.isBlank(fileDir)) {
                throw new Exception("fileDir is null");
            }
    
            fileDir = fileDir.replace(HDFS_HOST,"");
    
            Path srcPath = new Path(fileDir);
            if (!fs.exists(srcPath)) {
                throw new Exception("fileDir is not exists");
            }
            if (!fs.isDirectory(srcPath)) {
                throw new Exception("fileDir is not directory");
            }
    
            FileStatus[] files = fs.listStatus(srcPath);
            try {
                TypeDescription schema = getSchema(files);
                if (schema != null) {
                    //删除merge.working临时目录
                    String outFile = fileDir + File.separator + MARGE_FILE_NAME;
                    Path outMergeFilePath=null;
                    try {
                        outMergeFilePath = new Path(outFile);
                        if (fs.exists(outMergeFilePath)) {
                            fs.delete(outMergeFilePath, false);
                        }
                    } catch (FileNotFoundException e) {
    
                    }
    
                    Writer writer = OrcFile.createWriter(outMergeFilePath, OrcFile.writerOptions(fs.getConf()).setSchema(schema));
                    List<Path> delSrcPathList = new ArrayList<>();
                    for (FileStatus file : files) {
                        String filePath = file.getPath().toString();
                        Path srcPathTmp = new Path(filePath);
                        Reader reader = OrcFile.createReader(srcPathTmp, OrcFile.readerOptions(fs.getConf()));
                        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
                        RecordReader rows = reader.rows();
                        while (rows.nextBatch(batch)) {
                            if (batch != null) {
                                writer.addRowBatch(batch);
                            }
                        }
                        rows.close();
                        delSrcPathList.add(srcPathTmp);
                    }
                    writer.close();
    
                    //处理合并文件
                    outFile = fs.getFileStatus(outMergeFilePath).getPath().getName();
                    if (outFile.endsWith(".working")) {
                        int lastIndexOf = outFile.lastIndexOf(".working");
                        outFile = outFile.substring(0, lastIndexOf);
                    }
                    Path parent = outMergeFilePath.getParent();
                    Path newPath = null;
                    //移除上一次merge文件
                    try {
                        newPath = new Path(parent, outFile);
                        Path oldMergeFile = new Path(fileDir + File.separator + outFile);
                        if (fs.exists(oldMergeFile)) {
                            fs.delete(oldMergeFile,false);
                        }
                        fs.rename(outMergeFilePath, newPath);
                    } catch (FileNotFoundException e) {
    
                    }
    
                    //删除srcPath
                    for (Path path : delSrcPathList) {
                        if (path.getName().endsWith("merge")) {
                            continue;
                        }
                        fs.delete(path, false);
                    }
                    LOGGER.info("合并分区{}成功,合并文件={}", fileDir,newPath);
                }
            } catch (Exception e) {
                LOGGER.error("合并分区{}失败={}", fileDir,ExceptionUtils.getFullStackTrace(e));
                throw new Exception(ExceptionUtils.getFullStackTrace(e));
            }
        }
    

    文件合并会先写到.merge.working文件中,合并完成后,再将.merge.working重命名为正式文件.merge结尾。最后,将之前的小文件删除。

    重新统计分区元数据

    采用hive原生的统计方式。StatsDev

    image.png
    其他注意点
    1. flushMetastoreThreadPool要在mergeHdfsThreadPool内的线程结束后执行。如何知道一个线程池内所有线程执行完毕?
      线程池的isTerminated,当所有线程都关闭时,会返回true
    while(true){
            if(mergeHdfsThreadPool.isTerminated()){
                    //转交flushMetastoreThreadPool执行
                    break;
                }
        }
    

    结束

    寥寥数笔,欢迎交流。

    相关文章

      网友评论

          本文标题:orc小文件合并趣谈

          本文链接:https://www.haomeiwen.com/subject/vmppvhtx.html