美文网首页大数据工具包
Spark小文件异步合并工具类

Spark小文件异步合并工具类

作者: 0o青团o0 | 来源:发表于2019-12-11 14:17 被阅读0次

    简介

    由于Spark应用写数据到Hive表时,容易因为shuffle数过多导致生成过多小文件,影响集群存储利用率;故需要一个能避免读写冲突的小文件合并工具。

    工具类

    object CombineSmallFiles {
      /**
        * Step 1:读取指定目录下需要合并文件的目录和目录下的文件名,修改文件名后缀为.old:相当于记录为需要合并
        */
      def searchFiles(spark: SparkSession, hdfs: FileSystem, srcStr: String, fileType: String): Unit = {
        val srcPath = new Path(srcStr)
        val fileStatus = hdfs.listStatus(srcPath)
        var smallFileCount: Int = 0
    
        fileStatus.foreach(file => {
          // 对大小小于128M的文件进行标记
          if (!file.isDirectory && !file.getPath.getName.startsWith(".")
            && (hdfs.getContentSummary(file.getPath).getLength < 134217728)) {
            if (!file.getPath.getName.endsWith(".old")) {
              hdfs.rename(file.getPath, new Path(file.getPath + ".old"))
            }
            smallFileCount += 1
          } else if (file.isDirectory && !file.getPath.getName.startsWith(".")) {
            searchFiles(spark, hdfs, file.getPath.toUri.getPath, fileType)
          }
        })
        if (smallFileCount > 1) {
          combineSmallFile(spark, hdfs, srcPath, fileType)
        }
      }
    
    
      /**
        * 合并
        */
      def combineSmallFile(spark: SparkSession, hdfs: FileSystem, srcPath: Path, fileType: String): Unit = {
        val srcStr = srcPath.toUri.getPath
        val combineStr = srcStr + "/.combine"
    
        //如果因为程序中断导致combine遗留合并后的文件,则移动后清除
        moveCombineFileAndRemove(hdfs, srcStr, combineStr)
    
        // Step 2:获取目录下.old的文件,读取写入临时目录并生成文件.combine
        spark
          .read
          .format(fileType)
          .load(srcStr + "/*.old")
          .repartition(1)
          .write
          .format(fileType)
          .save(combineStr)
    
        // Step3:删除.old -> mv .combine下文件到源目录 -> 删除.combine
        hdfs.listStatus(srcPath).foreach(file => {
          // 对.old结尾的文件清除
          if (!file.isDirectory && file.getPath.getName.endsWith(".old")) {
            hdfs.delete(file.getPath, true)
          }
        })
        moveCombineFileAndRemove(hdfs, srcStr, combineStr)
      }
    
      /**
        * 移动合并文件并清除合并用的临时目录
        *
        * @param hdfs
        * @param srcStr
        * @param combineStr
        */
      def moveCombineFileAndRemove(hdfs: FileSystem, srcStr: String, combineStr: String): Unit = {
        val combinePath = new Path(combineStr)
        if (!hdfs.exists(combinePath)) return
        hdfs.listStatus(combinePath).foreach(combineFile => {
          if (combineFile.getPath.getName.startsWith("part-")) {
            hdfs.rename(combineFile.getPath, new Path(srcStr + "/" + combineFile.getPath.getName + ".combine"))
            hdfs.deleteOnExit(combineFile.getPath)
          }
        })
        hdfs.delete(new Path(combineStr), true)
      }
    }
    

    调用

    /**
      * 运维程序:小文件合并
      */
    object BT_OPS_CombineSmallFiles {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("BT_OPS_CombineSmallFiles")
          .config("spark.sql.adaptive.enabled", "true")
          .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "134217728")
          .enableHiveSupport()
          .getOrCreate()
        val hdfs: FileSystem = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
        val srcPath = "你的HDFS路径"
        searchFiles(spark, hdfs,srcPath, "orc")
        spark.stop
      }
    }
    

    TBC:可通过读取MySQL配置表来指定需要合并的目录、文件类型,方便随时修改。

    相关文章

      网友评论

        本文标题:Spark小文件异步合并工具类

        本文链接:https://www.haomeiwen.com/subject/gqvogctx.html