美文网首页
[HDFS] 文件系统的小文件判定和合并问题

[HDFS] 文件系统的小文件判定和合并问题

作者: LZhan | 来源:发表于2019-10-24 22:24 被阅读0次
    1 判定是否有小文件存在
     public boolean isSmallFileExists(String location) throws IOException {
            Path path = new Path(location);
            if (fs.exists(path) && fs.isDirectory(path)) {
                List<Long> lenList = new LinkedList<>(); // 文件长度
    
                RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path);
                while (iterator.hasNext()) {
                    LocatedFileStatus fileStatus = iterator.next();
    
                    if (fileStatus.getPath().getName().startsWith("_")) continue; // 过滤 _SUCCESS 文件
    
                    lenList.add(fileStatus.getLen());
                }
    
                long totalLen = lenList.stream().mapToLong(l -> l).sum(); // 总大小
                int fileNum = lenList.size(); // 文件数
                int preferedFileNum = (int) (totalLen / (getBlockSize() * 0.6) + 1); // 期望文件数
    
                return fileNum > preferedFileNum;
            }
    
            return false;
        }
    
     public long getBlockSize() {
            return fs.getConf().getLong("dfs.blocksize", 32 * 1024 * 1024);
        }
    

    分析:
    <1> 判定当前路径是否存在以及当前路径是目录而不是某具体文件。
    <2> RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path)
    即获取路径下的所有的文件和文件夹。(这里面应该是没有文件夹了,全是文件)
    <3> 遍历文件,过滤_SUCCESS文件(_SUCCESS文件在什么情况下会生成?)
    添加每个文件的文件大小。
    <4> 计算该目录下全部文件大小,统计所有文件数,并计算期望文件数。
    期望文件数的大小:
    总文件大小/(文件系统块大小 * 0.6)+1
    这里默认了存够了文件系统块大小的60%,就不算是小文件了
    <5> 文件数大于期望文件数,代表当前目录是存在小文件的,否则不存在小文件。

    2 合并小文件
    @Component
    public class StorageOptimizationJob implements Runnable {
    
        private static final Logger logger = LoggerFactory.getLogger(StorageOptimizationJob.class);
    
        @Autowired
        private StorageService storageService;
    
    //    @Scheduled(fixedDelay = 1000 * 60 * 5)
        @Override
        public void run() {
            if (prepare()) { // 准备好了么?
                logger.info("准备就绪,执行优化");
                optimize(); // 优化
                logger.info("优化完毕,执行清理");
                cleanup(); // 清理
                logger.info("清理完毕,任务退出");
            } else {
                logger.info("准备未就绪,任务退出");
            }
        }
    
        private boolean prepare() {
            boolean prepare = true;
    
            try (YARN yarn = YARN.get()) {
                prepare = prepare || yarn.isIdle();
            } catch (Exception e) {
                logger.error("调用 YARN 出错", e);
                prepare = false;
            }
    
            return prepare;
        }
    
        private void optimize() {
            List<StorageJobDTO> jobs = this.storageService.queryAllStorageJobs();
            if (jobs == null || jobs.size() == 0) return;
            StorageJobDTO job = jobs.get((new Random()).nextInt(jobs.size())); // 随机获取一个任务
    
            try (HiveMetaStore hiveMetaStore = HiveMetaStore.get();HDFS hdfs = HDFS.get()) {
                if (hiveMetaStore.isTableExists(job.getDb(), job.getTable())) {
                    if (hiveMetaStore.isSetPartition(job.getDb(), job.getTable())) {
                        // 设置了分区
                        List<String> locations = hiveMetaStore.getPartitionLocation(job.getDb(), job.getTable());
                        for (String location: locations) {
                            if (hdfs.isSmallFileExists(location)) {
                                logger.info("{} 存在小文件,执行合并任务", location);
                                new ConcatenateTask(location).run(); // 启动合并任务
                                log(job.getId(), StorageJobLogDO.STATE_SUCCESS, location + " 存在小文件,执行合并任务");
                                break; // 每个周期只合并一个目录
                            }
                        }
                    } else {
                        // 没有设置分区
                        String location = hiveMetaStore.getTableLocation(job.getDb(), job.getTable());
                        if (hdfs.isSmallFileExists(location)) {
                            logger.info("{} 存在小文件,执行合并任务", location);
                            new ConcatenateTask(location).run(); // 启动合并任务
                            log(job.getId(), StorageJobLogDO.STATE_SUCCESS, location + " 存在小文件,执行合并任务");
                        }
                    }
                } else {
                    logger.warn("Hive 表 {}.{} 不存在", job.getDb(), job.getTable());
                    log(job.getId(), StorageJobLogDO.STATE_FAILED, "Hive 表 " + job.getDb() + "." + job.getTable() + " 不存在");
                }
            } catch (Exception e) {
                logger.error("执行优化出错", e);
                log(job.getId(), StorageJobLogDO.STATE_FAILED, e.getLocalizedMessage());
            }
        }
    
        private void cleanup() {}
    
        private void log(Long jobId, String state, String desc) {
            StorageJobLogDO log = new StorageJobLogDO(jobId);
            log.setState(state);
            log.setDesc(desc);
            this.storageService.insertStorageJobLog(log);
        }
    
    }
    
    class ConcatenateTask {
    
        private static final Logger logger = LoggerFactory.getLogger(ConcatenateTask.class);
    
        private String location;
    
        private CountDownLatch cd;
    
        ConcatenateTask(String location) {
            this.location = location;
            this.cd = new CountDownLatch(1);
        }
    
        public void run() throws IOException, InterruptedException {
            String appPath = System.getenv("SPARK_APP_PATH");
            if (appPath == null || appPath.trim().length() == 0) {
                logger.error("没有配置 SPARK_APP_PATH 环境变量");
                throw new RuntimeException("没有配置 SPARK_APP_PATH 环境变量");
            }
    
            logger.info("启动 Spark 任务");
            SparkAppHandle handle = new SparkLauncher()
                    .setMaster("yarn")
                    .setDeployMode("client")
                    .setConf("spark.executor.instances", "4")
                    .setConf("spark.executor.cores", "2")
                    .setConf("spark.executor.memory", "4g")
                    .setMainClass("com.dyingbleed.labrador.spark.Application")
                    .setAppResource(System.getenv("SPARK_APP_PATH"))
                    .addAppArgs(location)
                    .setAppName("labrador")
                    .startApplication();
    
            handle.addListener(new SparkAppHandle.Listener() {
                @Override
                public void stateChanged(SparkAppHandle handle) {
                    logger.info("Spark State: {}", handle.getState().name());
                    if (handle.getState().isFinal()) cd.countDown();
                }
    
                @Override
                public void infoChanged(SparkAppHandle handle) {}
            });
    
            cd.await(10, TimeUnit.MINUTES); // 30 分钟超时
        }
    }
    

    核心代码 Spark任务合并小文件:

    object Application {
    
      private val logger = LoggerFactory.getLogger(classOf[Application])
    
      def main(args: Array[String]): Unit = {
        if (args.length < 1) {
          throw new IllegalArgumentException("请传入至少一个参数")
        }
        val location = args(0)
    
        val spark = SparkSession.builder()
          .appName("labrador")
          .getOrCreate()
        try {
          new Application(spark, location).run()
        } catch {
          case e: Exception => logger.error("执行出错", e)
        } finally {
          spark.close()
        }
      }
    
    }
    
    class Application(spark: SparkSession, location: String) extends Runnable {
    
      override def run(): Unit = {
        val conf = new Configuration()
        val fs = FileSystem.get(conf)
    
        fileFormat(fs, location) match {
          case Some(fileFormat) => {
            Application.logger.info("获取文件格式 {}", fileFormat)
    
            val tmp = s"/tmp/labrador/${new Random().nextInt(Int.MaxValue)}" // 临时目录
            Application.logger.debug("临时目录 {}", tmp)
    
            Application.logger.info("执行小文件合并")
            try {
              spark.read
                .format(fileFormat)
                .load(location)
                .repartition(getPreferedPartitionNum(fs, location))
                .write
                .format(fileFormat)
                .save(tmp)
            } catch {
              case e: Exception => {
                val path = new Path(tmp)
                if (fs.exists(path)) {
                  fs.delete(path, true)
                }
                throw e
              }
            }
    
            Application.logger.info("验证数据")
            if (validate(fileFormat, location, tmp)) {
              Application.logger.info("备份旧数据")
              fs.rename(new Path(location), new Path(location + "_bak"))
              Application.logger.info("拷贝新数据")
              fs.rename(new Path(tmp), new Path(location))
              Application.logger.info("删除旧数据")
              fs.delete(new Path(location + "_bak"), true)
            } else {
              Application.logger.warn("合并后数据不一致")
            }
          }
          case None => Application.logger.warn("无法获取 {} 下文件格式", location)
        }
      }
    
      private[this] def fileFormat(fs: FileSystem, location: String): Option[String] = {
        val files = fs.listFiles(new Path(location), false)
    
        while (files.hasNext) {
          val file = files.next()
          if (file.isFile) {
            val fileName = file.getPath.getName
            val lastIndex = fileName.lastIndexOf(".")
            if (lastIndex > 0 && lastIndex + 1 <= fileName.length) {
              val suffix = fileName.substring(lastIndex + 1, fileName.length)
              if (isValidFileFormat(suffix)) return Option(suffix)
            }
          }
        }
    
        None
      }
    
      private[this] def isValidFileFormat(fileFormat: String): Boolean = {
        fileFormat.equalsIgnoreCase("parquet") || fileFormat.equalsIgnoreCase("json")
      }
    
      private[this] def getPreferedPartitionNum(fs: FileSystem, location: String): Int = {
        val files = fs.listFiles(new Path(location), false)
    
        var totolLen = 0L // 总大小
    
        while (files.hasNext) {
          val file = files.next()
          if (file.isFile) totolLen += file.getLen
        }
    
        (totolLen / fs.getConf.getLong("dfs.blocksize", 32 * 1024 * 1024) + 1).toInt
      }
    
      private[this] def validate(fileFormat: String, sourceDir: String, targetDir: String): Boolean = {
        val sourceCount = spark.read.format(fileFormat).load(sourceDir).count()
        val targetCount = spark.read.format(fileFormat).load(targetDir).count()
        sourceCount == targetCount
      }
    
    }
    

    分析:
    <1> prepare方法:
    在准备优化合并小文件前,先判定yarn是否有足够的资源(即当前可用core数目以及当前可用内存),先去调用yarn,即初始化yarn,new YARN()并加载配置文件,isIdle()方法判定当前资源是否满足要求。
    <2> optimize方法:
    a.先去查看所有的优化任务,并随机获取1个任务
    b.判断hive中是否存在对应库对应表
    c.如果该表进行分区,即获取所有的分区存储路径,判断每个分区下是否存在小文件
    d.在合并小文件的任务中,
    第一步:判断文件格式,是parquet还是json格式的
    第二步:针对读取到的格式利用repartition进行合并,也就是文件数的减少:

    spark.read
    .format(fileFormat)
    .load(location)
    .repartition(getPreferedPartitionNum(fs, location))
    .write
    .format(fileFormat)
    .save(tmp)
    

    e.合并完小文件后,验证旧数据和新数据(此刻新数据保存在临时目录下)是否一致,一致的话,就进行重命名旧数据(_bak),将新数据移动到旧数据目录,删除旧数据(_bak)。

    相关文章

      网友评论

          本文标题:[HDFS] 文件系统的小文件判定和合并问题

          本文链接:https://www.haomeiwen.com/subject/ibfjvctx.html