美文网首页大数据玩转大数据spark
如何避免Spark SQL做数据导入时产生大量小文件

如何避免Spark SQL做数据导入时产生大量小文件

作者: Kent_Yao | 来源:发表于2019-02-01 15:44 被阅读250次

    什么是小文件?

    生产上,我们往往将Spark SQL作为Hive的替代方案,来获得SQL on Hadoop更出色的性能。因此,本文所讲的是指存储于HDFS中小文件,即指文件的大小远小于HDFS上块(dfs.block.size)大小的文件。

    小文件问题的影响

    • 一方面,大量的小文件会给Hadoop集群的扩展性和性能带来严重的影响。NameNode在内存中维护整个文件系统的元数据镜像,用户HDFS的管理;其中每个HDFS文件元信息(位置,大小,分块等)对象约占150字节,如果小文件过多,会占用大量内存,直接影响NameNode的性能。相对的,HDFS读写小文件也会更加耗时,因为每次都需要从NameNode获取元信息,并与对应的DataNode建立连接。如果NameNode在宕机中恢复,也需要更多的时间从元数据文件中加载。

    • 另一方面,也会给Spark SQL等查询引擎造成查询性能的损耗,大量的数据分片信息以及对应产生的Task元信息也会给Spark Driver的内存造成压力,带来单点问题。此外,入库操作最后的commit job操作,在Spark Driver端单点做,很容易出现单点的性能问题。

    Spark小文件产生的过程

    1. 数据源本身就是就含大量小文件

    2. 动态分区插入数据,没有Shuffle的情况下,输入端有多少个逻辑分片,对应的HadoopRDD就会产生多少个HadoopPartition,每个Partition对应于Spark作业的Task(个数为M),分区数为N。最好的情况就是(M=N) && (M中的数据也是根据N来预先打散的),那就刚好写N个文件;最差的情况下,每个Task中都有各个分区的记录,那文件数最终文件数将达到M * N个。这种情况下是极易产生小文件的。

    比如我们拿TPCDS测试集中的store_sales进行举例, sql如下所示

    use tpcds_1t_parquet;
    
    INSERT overwrite table store_sales partition 
           ( 
                  ss_sold_date_sk 
           ) 
    SELECT ss_sold_time_sk, 
           ss_item_sk, 
           ss_customer_sk, 
           ss_cdemo_sk, 
           ss_hdemo_sk, 
           ss_addr_sk, 
           ss_store_sk, 
           ss_promo_sk, 
           ss_ticket_number, 
           ss_quantity, 
           ss_wholesale_cost, 
           ss_list_price, 
           ss_sales_price, 
           ss_ext_discount_amt, 
           ss_ext_sales_price, 
           ss_ext_wholesale_cost, 
           ss_ext_list_price, 
           ss_ext_tax, 
           ss_coupon_amt, 
           ss_net_paid, 
           ss_net_paid_inc_tax, 
           ss_net_profit, 
           ss_sold_date_sk 
    FROM   tpcds_1t_ext.et_store_sales;
    

    首先我们得到其执行计划,如下所示,

    == Physical Plan ==
    InsertIntoHiveTable MetastoreRelation tpcds_1t_parquet, store_sales, Map(ss_sold_date_sk -> None), true, false
    +- HiveTableScan [ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25, ss_sold_date_sk#3L], MetastoreRelation tpcds_1t_ext, et_store_sales
    

    store_sales的原生文件包含1616逻辑分片,对应生成1616 个Spark Task,插入动态分区表之后生成1824个数据分区加一个NULL值的分区,每个分区下都有可能生成1616个文件,这种情况下,最终的文件数量极有可能达到2949200。1T的测试集store_sales也就大概300g,这种情况每个文件可能就零点几M。

    1. 动态分区插入数据,有Shuffle的情况下,上面的M值就变成了spark.sql.shuffle.partitions(默认值200)这个参数值,文件数的算法和范围和2中基本一致。

    比如,为了防止Shuffle阶段的数据倾斜我们可以在上面的sql中加上 distribute by rand(),这样我们的执行计划就变成了,

    InsertIntoHiveTable MetastoreRelation tpcds_1t_parquet, store_sales, Map(ss_sold_date_sk -> None), true, false
    +- *Project [ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25, ss_sold_date_sk#3L]
       +- Exchange(coordinator id: 1080882047) hashpartitioning(_nondeterministic#49, 2048), coordinator[target post-shuffle partition size: 67108864]
          +- *Project [ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25, ss_sold_date_sk#3L, rand(4184439864130379921) AS _nondeterministic#49]
             +- HiveTableScan [ss_sold_date_sk#3L, ss_sold_time_sk#4L, ss_item_sk#5L, ss_customer_sk#6L, ss_cdemo_sk#7L, ss_hdemo_sk#8L, ss_addr_sk#9L, ss_store_sk#10L, ss_promo_sk#11L, ss_ticket_number#12L, ss_quantity#13, ss_wholesale_cost#14, ss_list_price#15, ss_sales_price#16, ss_ext_discount_amt#17, ss_ext_sales_price#18, ss_ext_wholesale_cost#19, ss_ext_list_price#20, ss_ext_tax#21, ss_coupon_amt#22, ss_net_paid#23, ss_net_paid_inc_tax#24, ss_net_profit#25], MetastoreRelation tpcds_1t_ext, et_store_sales
    

    这种情况下,这样我们的文件数妥妥的就是spark.sql.shuffle.partitions * N,因为rand函数一般会把数据打散的非常均匀。当spark.sql.shuffle.partitions设置过大时,小文件问题就产生了;当spark.sql.shuffle.partitions设置过小时,任务的并行度就下降了,性能随之受到影响。

    最理想的情况,当然是根据分区字段进行shuffle,在上面的sql中加上distribute by ss_sold_date_sk。 把同一分区的记录都哈希到同一个分区中去,由一个Spark的Task进行写入,这样的话只会产生N个文件,在我们的case中store_sales,在1825个分区下各种生成了一个数据文件。
    但是这种情况下也容易出现数据倾斜的问题,比如双11的销售数据就很容易在这种情况下发生倾斜。

    基于分区字段Shuffle可能出现数据倾斜
    如上图所示,在我们插入store_sales时,就发生了null值的倾斜,大大的拖慢的数据入库的时间。

    如何解决Spark SQL产生小文件问题

    前面已经提到根据分区字段进行分区,除非每个分区下本身的数据较少,分区字段选择不合理,那么小文件问题基本上就不存在了,但是也有可能由于shuffle引入新的数据倾斜问题。

    我们首先可以尝试是否可以将两者结合使用, 在之前的sql上加上distribute by ss_sold_date_sk,cast(rand() * 5 as int), 这个类似于我们处理数据倾斜问题时候给字段加上后缀的形式。如,

    
    use tpcds_1t_parquet;
    
    INSERT overwrite table store_sales partition 
           ( 
                  ss_sold_date_sk 
           ) 
    SELECT ss_sold_time_sk, 
           ss_item_sk, 
           ss_customer_sk, 
           ss_cdemo_sk, 
           ss_hdemo_sk, 
           ss_addr_sk, 
           ss_store_sk, 
           ss_promo_sk, 
           ss_ticket_number, 
           ss_quantity, 
           ss_wholesale_cost, 
           ss_list_price, 
           ss_sales_price, 
           ss_ext_discount_amt, 
           ss_ext_sales_price, 
           ss_ext_wholesale_cost, 
           ss_ext_list_price, 
           ss_ext_tax, 
           ss_coupon_amt, 
           ss_net_paid, 
           ss_net_paid_inc_tax, 
           ss_net_profit, 
           ss_sold_date_sk 
    FROM   tpcds_1t_ext.et_store_sales
    distribute by ss_sold_date_sk, cast(rand() * 5 as int);
    

    按照之前的推算,每个分区下将产生5个文件,同时null值倾斜部分的数据也被打散成五份进行计算,缓解了数据倾斜的问题 ,我们最终将得到1825 *5=9105个文件,如下所示

            1825         9105       247111074494 /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales
    

    如果我们将5改得更小,文件数也会越少,但相应的倾斜key的计算时间也会上去。

    在我们知道那个分区键倾斜的情况下,我们也可以将入库的SQL拆成几个部分,比如我们store_sales是因为null值倾斜,我们就可以通过where ss_sold_date_sk is not nullwhere ss_sold_date_sk is null 将原始数据分成两个部分。前者可以基于分区字段进行分区,如distribute by ss_sold_date_sk;后者可以基于随机值进行分区,distribute by cast(rand() * 5 as int), 这样可以静态的将null值部分分成五个文件。

    FROM   tpcds_1t_ext.et_store_sales 
    where ss_sold_date_sk is not null
    distribute by ss_sold_date_sk;
    
    FROM   tpcds_1t_ext.et_store_sales 
    where ss_sold_date_sk is null
    distribute by distribute by cast(rand() * 5 as int);
    

    对于倾斜部分的数据,我们可以开启Spark SQL的自适应功能,spark.sql.adaptive.enabled=true来动态调整每个相当于Spark的reduce端task处理的数据量,这样我们就不需要认为的感知随机值的规模了,我们可以直接

    FROM   tpcds_1t_ext.et_store_sales 
    where ss_sold_date_sk is null
    distribute by distribute by rand() ;
    

    然后Spark在Shuffle 阶段会自动的帮我们将数据尽量的合并成spark.sql.adaptive.shuffle.targetPostShuffleInputSize(默认64m)的大小,以减少输出端写文件线程的总量,最后减少个数。
    对于spark.sql.adaptive.shuffle.targetPostShuffleInputSize参数而言,我们也可以设置成为dfs.block.size的大小,这样可以做到和块对齐,文件大小可以设置的最为合理。

    Spark SQL 小文件实验

    在我们的猛犸大数据平台上面,随便的建立几个SQL作业,不用会Spark也可以用SQL把大数据玩得666!

    猛犸- 数据开发
    从左到右依次为
    1. 建表 - 按分区字段插入非空集合到分区表 - 按rand插入空集到分区表,并开启自Spark SQL适应
    2. 建表 - 不shuffle 按原始分片直接插入分区表
    3. 建表 - 全集按照分区字段插入到分区表
    4. 建表 - 全局按分区字段+cast(rand() * 5 as int)方式插入分区表

    双击每个工作节点,我们也可以对我们的SQL作业进行参数的调整

    猛犸 - 参数配置

    选中我们对应的实验组,点击执行后,可以查看任务的运行状态。


    猛犸 - 运行状态

    从各组的实验结果来看

    bin/hadoop fs -count /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/
    1825         1863       192985051585 /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales
    
    bin/hadoop fs -du -h /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00000
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00001
    183.3 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00002
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00003
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00004
    183.2 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00005
    183.2 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00006
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00007
    183.0 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00008
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00009
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00010
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00011
    183.2 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00012
    183.0 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00013
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00014
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00015
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00016
    183.2 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00017
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00018
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00019
    183.2 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00020
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00021
    183.0 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00022
    183.2 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00023
    183.2 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00024
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00025
    183.2 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00026
    182.9 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00027
    183.3 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00028
    183.0 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00029
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00030
    183.0 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00031
    183.2 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00032
    183.2 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00033
    182.9 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00034
    183.1 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00035
    183.0 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00036
    183.3 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00037
    183.0 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00038
    70.5 M   /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00039
    
    bin/hadoop fs -du -h /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=2452642
    194.5 M  /user/kyuubi/hive_db/tpcds_1t_parquet.db/store_sales/ss_sold_date_sk=2452642/part-00369
    

    实验组一的小文件控制还是可喜可贺的。对于我们1t的tpcds测试数据,null值分区字段下只有40个文件,其他每个数据分区也只有一个数据文件,总目录1825,总文件数1863. 在解决数据倾斜问题的基础上,也只比纯按照分区字段进行distibute by多了39个文件。

    总结

    本文讲述的是如何在纯写SQL的场景下,如何用Spark SQL做数据导入时候,控制小文件的数量。

    1. 对于原始数据进行按照分区字段进行shuffle,可以规避小文件问题。但有可能引入数据倾斜的问题;
    2. 可以通过distribute by ss_sold_date_sk, cast(rand() * N as int),N值可以在文件数量和倾斜度之间做权衡
    3. 知道倾斜键的情况下,可以将原始数据分成几个部分处理,不倾斜的按照分区键shuffle,倾斜部分可以按照rand函数来shuffle
    4. 活用Spark SQL自适应功能,目前Spark 的各版本的Release中其实也就两个参数,设spark.sql.adaptive.enabled=true即可开启该功能,spark.sql.adaptive.shuffle.targetPostShuffleInputSize设置reduce任务处理文件的上限,配合结论3使用,解决小文件问题事半功倍。
    5. 对于Spark 2.4的用户,也可以使用HINT 详情请看 https://issues.apache.org/jira/browse/SPARK-24940
    6. 猛犸大数据平台是一站式大数据管理和应用开发平台,具有敏捷易用,成熟稳定,安全可靠,开放灵活的特点,提供7*24小时专业服务。

    相关文章

      网友评论

        本文标题:如何避免Spark SQL做数据导入时产生大量小文件

        本文链接:https://www.haomeiwen.com/subject/iyrcsqtx.html