摘要:Spark SQL
,Hive
问题描述
使用Spark SQL将DataFrame调用其API overwrite写入Hive,如果存在多个任务同时往一张hive表overwrite,那么会导致只有其中一个任务成功,另外其他的任务都失败的问题,并且写入的结果存在同一张表有重复的行
问题复现
测试数据采用一个79万多的二维表,从hdfs读取写入hive
scala> val df = spark.read.format("csv").load("hdfs:///tmp/churn_train.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 36 more fields]
scala> df.write.format("parquet").mode("overwrite").saveAsTable("test_gp.churn_train")
hive> select count(*) from churn_train;
Total MapReduce CPU Time Spent: 11 seconds 370 msec
OK
790609
此时开启两个Spark Shell Master进程,将上面的代码同时执行,先启动的Spark Shell写入失败报错,另一个执行成功,在查看Hive发现并没有起到覆盖的效果,这个表的数据量增长到158万正好是原来的2倍
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/hive/warehouse/test_gp.db/churn_train/_temporary/0/_temporary/attempt_20210507105116_0003_m_000000_0/part-00000-3dcbc713-0662-413b-b9a1-670c00e38c77-c000.snappy.parquet (inode 16321371): File does not exist. Holder DFSClient_attempt_20210507105116_0003_m_000000_0_-1967623668_65 does not have any open files.
org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: Table or view 'churn_train' already exists in database 'test_gp';
hive> select count(*) from churn_train;
Total MapReduce CPU Time Spent: 15 seconds 110 msec
OK
1581218
此时如果单独执行一次其中任何一个进程,覆盖重新生效数据量回到79万
如果不使用DataFrame API,采用Spark SQL语句也是同样的报错
scala> df.createOrReplaceTempView("df")
scala> spark.sql("insert overwrite table test_gp.churn_train select * from df")
从hdfs上可以看到也存在重复的的数据文件,有两组文件,每组大小是一样的
[root@cloudera01 ~]# hdfs dfs -ls /user/hive/warehouse/test_gp.db/churn_train
Found 5 items
-rw-r--r-- 3 hdfs hive 0 2021-05-07 11:38 /user/hive/warehouse/test_gp.db/churn_train/_SUCCESS
-rw-r--r-- 3 hdfs hive 7396932 2021-05-07 11:38 /user/hive/warehouse/test_gp.db/churn_train/part-00000-1f4875ad-fe90-4a03-9198-3318cb77f1ce-c000.snappy.parquet
-rw-r--r-- 3 hdfs hive 7396932 2021-05-07 11:38 /user/hive/warehouse/test_gp.db/churn_train/part-00000-e4382cd2-7bd6-4343-9611-70f451403454-c000.snappy.parquet
-rw-r--r-- 3 hdfs hive 6912605 2021-05-07 11:38 /user/hive/warehouse/test_gp.db/churn_train/part-00001-1f4875ad-fe90-4a03-9198-3318cb77f1ce-c000.snappy.parquet
-rw-r--r-- 3 hdfs hive 6912605 2021-05-07 11:38 /user/hive/warehouse/test_gp.db/churn_train/part-00001-e4382cd2-7bd6-4343-9611-70f451403454-c000.snappy.parquet
原因分析
参考这篇文章https://blog.csdn.net/u013332124/article/details/109359773
Spark SQL在执行SQL的overwrite的时候并没有删除旧的的数据文件(Spark SQL生成的数据文件),Spark SQL写入Hive的流程如下

具体几点如下
(1)Spark写入Hive会先生成一个临时的_temporary目录用于存储生成的数据文件,全部生成完毕后全部移动到输出目录,然后删除_temporary目录,最后创建Hive元数据
(2)多个Spark写入数据任务使用了同一个_temporary目录,导致其中一个完成数据生成和移动到Hive路径之后删除_temporary目录失败(因为还有其他Spark任务在往里面写),进一步导致数据已经到了但是元数据没有创建
(3)上一个任务虽然生成了数据文件但是没有元数据,则后一个任务的overwrite找不到元数据因此无法删除Hive路径下的数据文件
(4)当最后一个执行完成的Spark插入任务结束后,此时Hive路径下已经移动过来多个任务的数据文件,由于已经没有正在执行的Spark写任务,因此删除_temporary目录成功,创建元数据成功,结果就是这个元数据对应了该Hive路径下所有版本的数据文件
解决方案
创建分区表,不同分区多进程同时插入互不影响,如果指定同一个分区,也能插入成功
create table churn_train
(
`_c0` string,
`_c1` string,
...
`_c36` string,
`_c37` string
)
PARTITIONED BY (
`dt` string
)
STORED AS PARQUET;
同时开启两个Spark Shell运行如下代码,往同一个分区插
scala> spark.sql("insert overwrite table test_gp.churn_train partition(dt='20210508') select * from df")
res18: org.apache.spark.sql.DataFrame = []
结果正常,具体原因不明,可能是临时目录不公用了,以后再研究
网友评论