本篇是接着上篇pyspark:dataframe与rdd的一点小事
大纲
- rdd.join
- 上篇未解决的问题
1. rdd.join
可能我对rdd.join有偏见,所以当两个rdd做合并操作的时候,我首选reduceByKey,但是今天的例子打脸了。。。
首先说明下今天的数据集,一共有3个,数据量分别为:197435 ,80391844 ,1258012
当我用reduceByKey时,可能因为两个rdd的数据量差异较大,倾斜比较明显,程序总是在最后的几个task卡住。反而用leftOuterJoin通过了,并且时间并没有想象中的慢。当然将数据转成dataframe进行操作仍然是要快一点。具体耗时如下:
-- | rdd | dataframe |
---|---|---|
join 1 | 124.620 | 174.766 |
join 2 | 462.057 | 216.934005022 |
本以为rdd转dataframe会是最优的选择,没想到当我将第一个的数据扩大到150w左右的时候,程序出现了卡顿,但没想到leftOuterJoin通过了,并且时间也不算慢。下面是程序运行的时间表,说明下从step 2开始的数据量表示join的数据量:
-- | 数据量 | time |
---|---|---|
step 1 | 1709574 | 1.919 |
step 2 | 80391844 | 297.195 |
step 3 | 1928263 | 564.77 |
PS: 后来发现先进行数据小的rdd合并再合并数据大的rdd耗时更少。
对比之前的经验,我想了下这次dataframe失败的原因:
- dataframe更适合结构化的数据,但这次的数据是json string的居多;
- 大部分都是rdd2dataframe,可能当rdd数据比重较大时,不适合转成dataframe操作???(这点原因没有证实,所以先保留。因为我没有验证当都是结构化数据时,大量的rdd2dataframe是否仍然不合适)
2. 上篇未解决的问题
上篇最后有提到会出现“AttributeError: 'PipelinedRDD' object has no attribute 'toDF'”的问题。只需要在初始化sc后,加上下面这句代码即可。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
上篇也提到过,先进行dataframe的操作再进行rdd2dataframe的操作就不会报错,这次也是如此。过程是这样的:
- 当只进行rdd2dataframe操作的时候,需要添加上面的代码,不然会出现“AttributeError: 'PipelinedRDD' object has no attribute 'toDF'”的问题
- 既有dataframe也有rdd2dataframe操作的时候,上述代码会导致“pyspark.sql.utils.AnalysisException: u"Table or view not found:”的问题,但是删掉上述代码,将操作顺序改成先dataframe再rdd,则程序正常。
看了源码后发现,HiveContext是SQLContext的子类,因此先从hive读取数据后即使没有上面的代码也不会出现“AttributeError: 'PipelinedRDD' object has no attribute 'toDF'”的问题。
但是还是没明白为什么hive2dataframe->rdd2dataframe-->hive2dataframe会出现“pyspark.sql.utils.AnalysisException: u"Table or view not found:”的问题。但是有人对此问题给出解决方案,先码起来。
网友评论