美文网首页
spark 离线任务对于小文件的处理

spark 离线任务对于小文件的处理

作者: 早点起床晒太阳 | 来源:发表于2020-08-07 19:15 被阅读0次

前言

spark 读写hive 比较多,发现总是有小文件很多,几百个几十KB的文件,这样对于后续处理很不友好,这里统一说一下。
前置条件 我准备了一张50W数据的一张hive表,原始数据占用6个小文件。

[hadoop@hebing4 ~]$ hadoop fs -ls hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet
Found 6 items
-rwxr-xr-x   1 hadoop hadoop    4870740 2020-07-16 23:02 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00000-c989f570-74a2-48b1-986b-3043f27447c6-c000
-rwxr-xr-x   1 hadoop hadoop    4948819 2020-07-16 23:02 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00001-c989f570-74a2-48b1-986b-3043f27447c6-c000
-rwxr-xr-x   1 hadoop hadoop    4953365 2020-07-16 23:02 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00002-c989f570-74a2-48b1-986b-3043f27447c6-c000
-rwxr-xr-x   1 hadoop hadoop    4921963 2020-07-16 23:03 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00003-c989f570-74a2-48b1-986b-3043f27447c6-c000
-rwxr-xr-x   1 hadoop hadoop    4906754 2020-07-16 23:03 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00004-c989f570-74a2-48b1-986b-3043f27447c6-c000
-rwxr-xr-x   1 hadoop hadoop    3810249 2020-07-16 23:03 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00005-c989f570-74a2-48b1-986b-3043f27447c6-c000

有shuffle的情况

1、spark.sql.shuffle.partitions

spark.sql.shuffle.partitions (默认值200)官方解释

Configures the number of partitions to use when shuffling data for joins or aggregations.

大意为配置在join和聚合时使用的shuffle的分区数
所以在执行sql有shuffle的情况可以使用这个参数来制定shuffle后的数据的并行度

比如我现在将上面准备的表对某个字段进行group然后生程一张新表, 如果spark.sql.shuffle.partitions 为默认的话,新表将会有200个小文件。但是设置为一个1的话,只会生成一个文件。

这里注意: 1、如果sql语句中没有shuffle,比如执行 create table xxx as select * from step_step_23953295763_parquet 这种形式,上述参数将不会起作用。
2、在sparkThriftserver中配置将不会起作用

2、Adaptive Execution自动设置shufffle partition的数量

这一块官网没有详细介绍,翻遍了这有告诉你怎么去查看 sql相关的配置~
这一块参考:http://www.jasongj.com/spark/adaptive_execution/ 这篇文章的详细解决,以及https://cloud.tencent.com/developer/article/1463892 这篇文章的总体概括,可以使这一块变得明晰起来。 这一块不详细解释了,只说一下它的用法,它有如下两个配置 (在sparkThriftserver中也生效)

1、spark.sql.adaptive.enabled=true 启动Adaptive Execution。 (默认false)
2、spark.sql.adaptive.shuffle.targetPostShuffleInputSize 默认67108864b(64M) 合并的分区的最大阀值

没有shuffle的情况

当数据没有shuffle的情况,我们这边可以使用的方法有限,目前想到的只能是在代码端通过repartition的方法进行重分区然后再write的方式来减少分区。
比如我将上面准备的表写入一张新表,如果通过create table xxx as select,没有shuffle操作,便可以通过如下代码来进行

session.sql("select msisdn from hebing.step_step_23953295763_parquet group by msisdn")
.write.mode(SaveMode.Overwrite).saveAsTable("ceshi.yangli")

这样就可以控制并行度,解决生成多个小文件的问题了。

相关文章

网友评论

      本文标题:spark 离线任务对于小文件的处理

      本文链接:https://www.haomeiwen.com/subject/cbdqdktx.html