美文网首页python机器学习爬虫
解决Spark Streaming写入HDFS的小文件问题

解决Spark Streaming写入HDFS的小文件问题

作者: LittleMagic | 来源:发表于2019-05-18 23:31 被阅读337次

    今天仍然处于感冒状态,打开电脑随便写一篇,然后滚回床上休息。

    我们都知道,在HDFS中不宜存储大量的小文件。所谓小文件,就是大小远小于dfs.block.size的文件。如果有大量小文件的话,会浪费block,使元数据增加,挤占宝贵的NameNode内存。另外,大文件能够发挥磁盘顺序读写的优势,小文件会产生很多随机读写,性能下降。

    在我们的数仓体系中,有一部分业务的日志数据来源是RocketMQ。我们编写了Spark Streaming程序作为consumer,将这些日志下沉到以天分区的Hive外部表中,批次间隔(batch duration)为1分钟。久而久之,产生了很多小文件。直觉上讲可以通过增长batch duration来减少输出,但这肯定是下下策。

    实在更不动了,明天继续吧(╯‵□′)╯︵┻━┻


    感觉稍微好了一些,继续写。我们用两种方法合并解决该问题,十分有效,下面简要叙述下。

    利用coalesce()和repartition()算子

    在真正落盘之前,可以对RDD做如下两种操作之一:

    rdd.coalesce(1, true)
    rdd.repartition(1)
    

    Spark Streaming在将结果输出到HDFS时是按分区来的,分区越多,产生的小文件自然也越多。coalesce()算子就用来为RDD重新分区,其源码如下,位于RDD类中。

      def coalesce(numPartitions: Int, shuffle: Boolean = false,
                   partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
                  (implicit ord: Ordering[T] = null)
          : RDD[T] = withScope {
        require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
        if (shuffle) {
          /** Distributes elements evenly across output partitions, starting from a random partition. */
          val distributePartition = (index: Int, items: Iterator[T]) => {
            var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
            items.map { t =>
              // Note that the hash code of the key will just be the key itself. The HashPartitioner
              // will mod it with the number of total partitions.
              position = position + 1
              (position, t)
            }
          } : Iterator[(Int, T)]
    
          // include a shuffle step so that our upstream tasks are still distributed
          new CoalescedRDD(
            new ShuffledRDD[Int, T, T](
              mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
              new HashPartitioner(numPartitions)),
            numPartitions,
            partitionCoalescer).values
        } else {
          new CoalescedRDD(this, numPartitions, partitionCoalescer)
        }
      }
    

    该算子主要参数有两个:numPartitions表示目标分区数,shuffle表示重分区过程中是否Shuffle。

    如果shuffle参数为true的话,会从一个随机分区开始,利用HashPartitioner将所有数据重新均匀分布到numPartitions个分区上,返回一个由CoalescedRDD包装的ShuffleRDD,父子RDD之间为宽依赖。如果shuffle参数为false,就直接返回CoalescedRDD,其内部就只是简单地将多个分区的数据flatMap之后合并为一个分区,父子RDD之间为窄依赖。

    由上面的分析可知,若numPartitions大于原分区数,那么shuffle参数一定要设为true才可以。若numPartitions小于原分区数,那么又有两种情况要考虑:

    • 分区数之间的比例不太悬殊。比如原有1000个分区,减少到200个分区,这时可以将shuffle设为false,因为子RDD中的一个分区只对应父RDD的5个分区,压力不大。

    • 分区数之间的比例悬殊。比如原有500个分区,减少到1个分区,就要将shuffle设为true,保证生成CoalescedRDD之前的操作有足够的并行度,防止Executor出现单点问题。这也就是本节开头的做法了。

    repartition()算子是借助coalesce()实现的,就是shuffle参数默认为true的版本。

      def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
        coalesce(numPartitions, shuffle = true)
      }
    

    这种方法非常简单,只需要一句话就可以使每批次输出只有一个文件。不过它会增加批次处理时长,如果数据量巨大,可能会造成数据堆积,因此需要观察之后再使用。

    利用copyMerge()方法

    Hadoop的FileUtil工具类中提供了copyMerge()方法,它专门用来将一个HDFS目录下的所有文件合并成一个文件并输出,其源码如下。

      public static boolean copyMerge(FileSystem srcFS, Path srcDir, 
                                      FileSystem dstFS, Path dstFile, 
                                      boolean deleteSource,
                                      Configuration conf, String addString) throws IOException {
        dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
    
        if (!srcFS.getFileStatus(srcDir).isDirectory())
          return false;
       
        OutputStream out = dstFS.create(dstFile);
        
        try {
          FileStatus contents[] = srcFS.listStatus(srcDir);
          Arrays.sort(contents);
          for (int i = 0; i < contents.length; i++) {
            if (contents[i].isFile()) {
              InputStream in = srcFS.open(contents[i].getPath());
              try {
                IOUtils.copyBytes(in, out, conf, false);
                if (addString!=null)
                  out.write(addString.getBytes("UTF-8"));
                    
              } finally {
                in.close();
              } 
            }
          }
        } finally {
          out.close();
        }
        
        if (deleteSource) {
          return srcFS.delete(srcDir, true);
        } else {
          return true;
        }
      }
    

    我们就可以写一个简单的程序,通过调用copyMerge()方法合并Hive外部表对应分区的文件,并且按照分区的时间粒度(天、小时等)调度。源数据的文件夹可以通过参数来指定,并且设置deleteSource参数为true,就能在合并完成后删除原来的小文件。需要注意的是,为了避免将当前正在写入的文件也合并进去,调度需要有一点延时。

    相关文章

      网友评论

        本文标题:解决Spark Streaming写入HDFS的小文件问题

        本文链接:https://www.haomeiwen.com/subject/avatzqtx.html