美文网首页报错集锦大数据学习
Spark SQL overwrite写入Hive存在重复数据,

Spark SQL overwrite写入Hive存在重复数据,

作者: xiaogp | 来源:发表于2021-03-22 11:43 被阅读0次

摘要:Spark SQLHive

问题描述

使用Spark SQL将DataFrame调用其API overwrite写入Hive,如果存在多个任务同时往一张hive表overwrite,那么会导致只有其中一个任务成功,另外其他的任务都失败的问题,并且写入的结果存在同一张表有重复的行


问题复现

测试数据采用一个79万多的二维表,从hdfs读取写入hive

scala> val df = spark.read.format("csv").load("hdfs:///tmp/churn_train.csv")
df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 36 more fields]

scala> df.write.format("parquet").mode("overwrite").saveAsTable("test_gp.churn_train")
hive> select count(*) from churn_train;
Total MapReduce CPU Time Spent: 11 seconds 370 msec
OK
790609

此时开启两个Spark Shell Master进程,将上面的代码同时执行,先启动的Spark Shell写入失败报错,另一个执行成功,在查看Hive发现并没有起到覆盖的效果,这个表的数据量增长到158万正好是原来的2倍

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/hive/warehouse/test_gp.db/churn_train/_temporary/0/_temporary/attempt_20210507105116_0003_m_000000_0/part-00000-3dcbc713-0662-413b-b9a1-670c00e38c77-c000.snappy.parquet (inode 16321371): File does not exist. Holder DFSClient_attempt_20210507105116_0003_m_000000_0_-1967623668_65 does not have any open files.
org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: Table or view 'churn_train' already exists in database 'test_gp';
hive> select count(*) from churn_train;
Total MapReduce CPU Time Spent: 15 seconds 110 msec
OK
1581218

此时如果单独执行一次其中任何一个进程,覆盖重新生效数据量回到79万
如果不使用DataFrame API,采用Spark SQL语句也是同样的报错

scala> df.createOrReplaceTempView("df")
scala> spark.sql("insert overwrite table test_gp.churn_train select * from df")

从hdfs上可以看到也存在重复的的数据文件,有两组文件,每组大小是一样的

[root@cloudera01 ~]# hdfs dfs -ls /user/hive/warehouse/test_gp.db/churn_train
Found 5 items
-rw-r--r--   3 hdfs hive          0 2021-05-07 11:38 /user/hive/warehouse/test_gp.db/churn_train/_SUCCESS
-rw-r--r--   3 hdfs hive    7396932 2021-05-07 11:38 /user/hive/warehouse/test_gp.db/churn_train/part-00000-1f4875ad-fe90-4a03-9198-3318cb77f1ce-c000.snappy.parquet
-rw-r--r--   3 hdfs hive    7396932 2021-05-07 11:38 /user/hive/warehouse/test_gp.db/churn_train/part-00000-e4382cd2-7bd6-4343-9611-70f451403454-c000.snappy.parquet
-rw-r--r--   3 hdfs hive    6912605 2021-05-07 11:38 /user/hive/warehouse/test_gp.db/churn_train/part-00001-1f4875ad-fe90-4a03-9198-3318cb77f1ce-c000.snappy.parquet
-rw-r--r--   3 hdfs hive    6912605 2021-05-07 11:38 /user/hive/warehouse/test_gp.db/churn_train/part-00001-e4382cd2-7bd6-4343-9611-70f451403454-c000.snappy.parquet

原因分析

参考这篇文章https://blog.csdn.net/u013332124/article/details/109359773
Spark SQL在执行SQL的overwrite的时候并没有删除旧的的数据文件(Spark SQL生成的数据文件),Spark SQL写入Hive的流程如下


具体几点如下
(1)Spark写入Hive会先生成一个临时的_temporary目录用于存储生成的数据文件,全部生成完毕后全部移动到输出目录,然后删除_temporary目录,最后创建Hive元数据
(2)多个Spark写入数据任务使用了同一个_temporary目录,导致其中一个完成数据生成和移动到Hive路径之后删除_temporary目录失败(因为还有其他Spark任务在往里面写),进一步导致数据已经到了但是元数据没有创建
(3)上一个任务虽然生成了数据文件但是没有元数据,则后一个任务的overwrite找不到元数据因此无法删除Hive路径下的数据文件
(4)当最后一个执行完成的Spark插入任务结束后,此时Hive路径下已经移动过来多个任务的数据文件,由于已经没有正在执行的Spark写任务,因此删除_temporary目录成功,创建元数据成功,结果就是这个元数据对应了该Hive路径下所有版本的数据文件

解决方案

创建分区表,不同分区多进程同时插入互不影响,如果指定同一个分区,也能插入成功

create table churn_train 
(
`_c0` string,
`_c1` string,
...
`_c36` string,
`_c37` string
)
PARTITIONED BY (
`dt` string
)
STORED AS PARQUET;

同时开启两个Spark Shell运行如下代码,往同一个分区插

scala> spark.sql("insert overwrite table test_gp.churn_train partition(dt='20210508') select * from df")
res18: org.apache.spark.sql.DataFrame = [] 

结果正常,具体原因不明,可能是临时目录不公用了,以后再研究

相关文章

网友评论

    本文标题:Spark SQL overwrite写入Hive存在重复数据,

    本文链接:https://www.haomeiwen.com/subject/skdzcltx.html