美文网首页
pyspark:dataframe与rdd的一点小事

pyspark:dataframe与rdd的一点小事

作者: 张虾米试错 | 来源:发表于2019-01-05 16:42 被阅读0次

    大纲

    1. 问题描述
    2. 解决方案
    3. 代码以及效果
    4. 总结

    1.问题描述

    要做的事情

    从一堆房源hive表和hdfs数据中读取信息,并将同一id的信息整合到一起。共有5个hive表,2个hdfs文件;每个表所需操作的id数是千万数量级,每个表中字段20~200不等。

    当前做法

    用pyspark读取hive表以及hdfs的数据,并转换成rdd,然后用leftOuterJoin将信息整合;这样做需消耗至少30min,甚至1h的时间,速度太慢。本文就是针对该问题进行的优化。

    2.解决方案

    错误尝试

    对于程序的性能问题,其实最开始的做法应该是打印每个步骤所需的时间,然后从最耗费时间的步骤开始。

    但是我最开始并没有这样做,因为leftOuterJoin的操作是O(n^2), n为表的大小,因此我以为leftOuterJoin是最耗时的。于是我直接将leftOuterJoin换成了rdd.union(), 然后再reduceByKey的方式。但是当我将所有的leftOuterJoin操作改过来后,发现总的时间还是需要30min。

    另外提醒下,当数据量过大时,用leftOuterJoin容易出现内存溢出的问题;当两个rdd数量级差别较大时,用reduceByKey容易出现数据倾斜的问题。如果是两个rdd取交集的话,建议先用filter过滤,然后再用reduceByKey进行合并操作。

    找对方向

    hive数据直接用dataframe操作

    说实话,看到这个效果有点沮丧。不过,我在等待程序运行的过程中,发现将两个rdd用reduceByKey操作的时间并不长,大量的时间其实花在读取hive表并转成rdd的阶段。这时我才意识到,我应该打印出每步操作的时间,针对性地优化。很快,验证了我的想法,确实在转换rdd耗费了不少时间。

    转换成rdd是为了做两张表合并的操作,然后我查资料发现,其实针对结构化的数据,dataframe和dataset比rdd处理更快;而数据从hive表中读出后本身就是dataframe的格式,其实完全没有必要转成rdd。实验后,果然省下不少时间。另外,用dataframe的join操作也比rdd的leftOuterJoin或者reduceByKey快很多。

    其实dataframe就像SQL操作一样,自身带有join,left join, right join等操作,速度比rdd的left join快很多。但是具体原理还没有细究,在第3部分会介绍下dataframe的某些用法。

    hdfs数据转换成dataframe

    由于还有两份数据存在hdfs,因此有个问题:是将rdd转成dataframe操作更快呢?还是将dataframe转换成rdd更快呢?实验后发现,前者的时间更快,而且rdd转换成dataframe比反过来操作更快。(dataframe转成rdd我是用json格式存储的,怀疑与这个也有关系。)

    存储dataframe

    目前看到的dataframe的存储方式有3种,但没有进行比较:

    • 直接存成csv或者json格式
    • 转成rdd存储
    • 存到hive表

    3.代码说明

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import HiveContext, SQLContext
    from pyspark import StorageLevel
    from pyspark.sql import Row
    
    conf = (SparkConf().setAppName("spark_job_name")
                .set("spark.hadoop.validateOutputSpecs", "false")
                .set("spark.akka.frameSize", "300")
                .set("spark.driver.maxResultSize", "6g")
                )
    

    3.1rdd与dataframe的相互转换

    ### rdd2dataframe
    ### 方法1
    rdd = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Bob', age=5, height=80),Row(name='Cycy', age=10, height=80),Row(name='Cycy', age=10, height=80)])
    df = rdd.toDF()
    df.show()
    
    ### 方法2
    l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
    rdd = sc.parallelize(l)
    people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
    schemaPeople = sqlContext.createDataFrame(people)
    ### 推荐使用第一种
    
    ### 如果数据是dict格式,可以使用下面的方式
    info = {"a": 1, "b": 2, "c": 3}
    new_row = Row(**info)
    #Row(a=1, b=2, c=3)
    new_row.asDict()
    #{'a': 1, 'c': 3, 'b': 2}
    
    
    ### dataframe2rdd
    _rdd = df.rdd.map(lambda x: x.asDict())
    print _rdd.take(3)
    #[{'age': 5, 'name': u'Alice', 'height': 80}, {'age': 5, 'name': u'Bob', 'height': 80}, {'age': 10, 'name': u'Cycy', 'height': 80}]
    

    3.2 dataframe之join的用法

    (接上段代码)

    df1 = sc.parallelize([Row(name='Alice', score=78),Row(name='Bob', score=80),Row(name='Cycy', score=80)]).toDF()
    
    ### join的用法
    #方法1
    df_join = df.join(df1, df.name==df1.name)
    
    #方法2
    df_join = df.join(df1, ["name"])
    #DataFrame[name: string, age: bigint, height: bigint, score: bigint]
    #发现第二种方法只会得到4个字段,但是第一种会得到5个字段
    

    3.3 groupBy的用法

    df_g = df.groupBy("name").sum("height")
    df_g.show()
    '''
    +-----+-----------+                                                             
    | name|sum(height)|
    +-----+-----------+
    | Cycy|        160|
    |  Bob|         80|
    |Alice|         80|
    +-----+-----------+
    
    '''
    import pyspark.sql.functions as sf
    df_g1=rdd.groupBy("name").agg(sf.sum("height").alias('height_sum'))
    df_g1.show()
    '''
    +-----+----------+                                                              
    | name|height_sum|
    +-----+----------+
    | Cycy|       160|
    |  Bob|        80|
    |Alice|        80|
    +-----+----------+
    
    '''
    
    

    3.4 dataframe保存到hdfs

    本文尝试了3种存储的方式,直接存到hive表还没尝试。

    ###### 转换成rdd,以json格式保存
    import decimal
    def to_json(row):
        for k, v in row.items():
            if isinstance(v, decimal.Decimal):
                row[k] = float(v)
        return json.dumps(row, ensure_ascii=False).encode("utf-8")
    
    rdd = df.rdd.map(lambda x: x.asDict()).map(to_json)
    rdd.saveAsTextFile(output_path, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
    
    ###### 直接存成csv格式
    df.write.csv(csv_output_path, header='true')
    
    ###### 直接存成parquet
    df.write.parquet(output_path)
    

    由于json不支持decimal类型,因此如果有decimal类型的数据,存成json之前要转换成float类型。

    对比以上三种方式,数据量14372302 * 90,耗时情况如下:

    -- rdd+json csv parquet
    save 145.837 126.671 147.9598
    write 6.07 18.1 ---

    parquet方式的读取暂时有bug,还没解决。其他方式的读取可以参见pyspark系列--pyspark读写dataframe

    目前采用dataframe转rdd,以json格式存储,完整的流程耗时:当hive表的数据量为100w+时,用时328.78s; 当数据量为1000w+时,用时408.02s。

    当然,spark的运行速度还与spark的资源以及spark-submit的配置有关系。

    3.5 未解决的问题

    到这里,貌似所有问题都解决了,但是将所有流程串起来后,发现如果hive和hdfs两种来源的数据交叉操作的话,很容易报错,错误有如下几种:

    ###
    AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
    
    ###
    pyspark.sql.utils.AnalysisException: u"Table or view not found:
    
    ###
    pyspark.sql.utils.AnalysisException: u'org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.thrift.transport.TTransportException;'
    

    现在还没弄清是什么原因,但是先hive后hdfs就没问题。

    4.总结

    • dataframe更适合结构化的数据,spark还是有很多方法值得探索。
    • 找问题要有方法,不能臆测。其实上述讲到的错误尝试和正确方法花费的时间差不多,但是如果我从开始就打印出每个步骤的运行时间,我就可以省去错误尝试的时间。
    • 快速迭代离不开高效的数据处理。
    • 确定目标可行后,一定要寻找解决方案,你会发现方案比你想象的要多。

    参考资料

    相关文章

      网友评论

          本文标题:pyspark:dataframe与rdd的一点小事

          本文链接:https://www.haomeiwen.com/subject/udmhrqtx.html