前言
spark 读写hive 比较多,发现总是有小文件很多,几百个几十KB的文件,这样对于后续处理很不友好,这里统一说一下。
前置条件 我准备了一张50W数据的一张hive表,原始数据占用6个小文件。
[hadoop@hebing4 ~]$ hadoop fs -ls hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet
Found 6 items
-rwxr-xr-x 1 hadoop hadoop 4870740 2020-07-16 23:02 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00000-c989f570-74a2-48b1-986b-3043f27447c6-c000
-rwxr-xr-x 1 hadoop hadoop 4948819 2020-07-16 23:02 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00001-c989f570-74a2-48b1-986b-3043f27447c6-c000
-rwxr-xr-x 1 hadoop hadoop 4953365 2020-07-16 23:02 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00002-c989f570-74a2-48b1-986b-3043f27447c6-c000
-rwxr-xr-x 1 hadoop hadoop 4921963 2020-07-16 23:03 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00003-c989f570-74a2-48b1-986b-3043f27447c6-c000
-rwxr-xr-x 1 hadoop hadoop 4906754 2020-07-16 23:03 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00004-c989f570-74a2-48b1-986b-3043f27447c6-c000
-rwxr-xr-x 1 hadoop hadoop 3810249 2020-07-16 23:03 hdfs://behhb001/hive/warehouse/hebing.db/step_step_23953295763_parquet/part-00005-c989f570-74a2-48b1-986b-3043f27447c6-c000
有shuffle的情况
1、spark.sql.shuffle.partitions
spark.sql.shuffle.partitions (默认值200)官方解释
Configures the number of partitions to use when shuffling data for joins or aggregations.
大意为配置在join和聚合时使用的shuffle的分区数
所以在执行sql有shuffle的情况可以使用这个参数来制定shuffle后的数据的并行度
比如我现在将上面准备的表对某个字段进行group然后生程一张新表, 如果spark.sql.shuffle.partitions 为默认的话,新表将会有200个小文件。但是设置为一个1的话,只会生成一个文件。
这里注意: 1、如果sql语句中没有shuffle,比如执行 create table xxx as select * from step_step_23953295763_parquet 这种形式,上述参数将不会起作用。
2、在sparkThriftserver中配置将不会起作用
2、Adaptive Execution自动设置shufffle partition的数量
这一块官网没有详细介绍,翻遍了这有告诉你怎么去查看 sql相关的配置~
这一块参考:http://www.jasongj.com/spark/adaptive_execution/ 这篇文章的详细解决,以及https://cloud.tencent.com/developer/article/1463892 这篇文章的总体概括,可以使这一块变得明晰起来。 这一块不详细解释了,只说一下它的用法,它有如下两个配置 (在sparkThriftserver中也生效)
1、spark.sql.adaptive.enabled=true 启动Adaptive Execution。 (默认false)
2、spark.sql.adaptive.shuffle.targetPostShuffleInputSize 默认67108864b(64M) 合并的分区的最大阀值
没有shuffle的情况
当数据没有shuffle的情况,我们这边可以使用的方法有限,目前想到的只能是在代码端通过repartition的方法进行重分区然后再write的方式来减少分区。
比如我将上面准备的表写入一张新表,如果通过create table xxx as select,没有shuffle操作,便可以通过如下代码来进行
session.sql("select msisdn from hebing.step_step_23953295763_parquet group by msisdn")
.write.mode(SaveMode.Overwrite).saveAsTable("ceshi.yangli")
这样就可以控制并行度,解决生成多个小文件的问题了。
网友评论