美文网首页Spark优化与实践我爱编程
在spark dataFrame 中使用 pandas data

在spark dataFrame 中使用 pandas data

作者: breeze_lsw | 来源:发表于2018-04-04 11:04 被阅读114次

    背景

    pandas spark
    工作方式 单机,无法处理大量数据 分布式,能处理大量数据
    存储方式 单机缓存 可以调用 persist/cache 分布式缓存
    是否可变
    index索引 自动创建 无索引
    行结构 Pandas.Series Pyspark.sql.Row
    列结构 Pandas.Series Pyspark.sql.Column
    允许列重名

    pandas dataFrame 无法支持大量数据的计算,可以尝试 spark df 来解决这个问题。

    一. xgboost 预测的例子

    优化前

    import xgboost as xgb
    import pandas as pd
    import numpy as np
    
    # 加载模型
    bst = xgb.Booster()
    bst.load_model("xxx.model")
    
    # 变量列表
    var_list=[...]
    df.rdd.map(lambda x : cal_xgb_score(x,var_list,ntree_limit=304)).toDF()
    
    # 计算分数
    def cal_xgb_score(x,var_list,ntree_limit=50):
        feature_count = len(var_list)
        x1 = pd.DataFrame(np.array(x).reshape(1,feature_count),columns=var_list)
        # 数据变化操作
        y1 = transformFun(x1)
        
        test_x = xgb.DMatrix(y1.drop(['mobile','mobile_md5'],xais=1),missing=float('nan'))
        y1['score'] = bst.predict(test_x,ntree_limit=ntree_limit)
        y2 = y1[['mobile','mobile_md5','score']]
        return {'mobile': str(y2['mobile'][0]),'mobille_md5':str(y2['mobile_md5'][0]),'score':float(y2['score'][0])}
    

    每条数据都转化为 pd,增加了额外开销。

    优化后:

    
    df.rdd.mapPartitions(lambda x : cal_xgb_score(x,var_list,ntree_limit=304)).toDF()
    
    def cal_xgb_score(x,var_list,ntree_limit=50):
        feature_count = len(var_list)
        //将 iterator 转为list 
        x1 = pd.DataFrame(list(x),columns=var_list)
        ...
        //将 pdf 转为字典
        return y1[['mobile','mobile_md5','score']].to_dict(orient='record')
    

    二. toPandas 的例子

    优化前:

    df.toPandas()
    

    优化后:

    import pandas as pd
    def _map_to_pandas(rdds):
        return [pd.DataFrame(list(rdds))]
        
    def toPandas(df, n_partitions=None):
        if n_partitions is not None: df = df.repartition(n_partitions)
        df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
        df_pand = pd.concat(df_pand)
        df_pand.columns = df.columns
        return df_pand
    
    # 98列,22W行,类型 array/string/Long/Int,分区数 200
    df = spark.sql("...").sample(False,0.002)
    
    df.cache()
    df.count()
    
    # 原生的 toPandas 方法
    %timeit df.toPandas()
    
    # 分布式的 toPandas
    %timeit toPandas(df)
    
    #使用 apache arrow,spark 版本2.3以上
    spark.sql("set spark.sql.execution.arrow.enabled=true")
    %timeit df.toPandas()
    

    总结

    一. xgboost 预测

    单条数据处理速度从 120 record / min 提高到 3278 record / min

    tips: 如果一个分区数据量过大将会导致 executor oom

    二. spark dataframe 转 pandas dataframe

    type cost (seconds)
    native toPandas 12
    distributed toPandas 5.91
    arrow toPandas 2.52

    toPandas 返回的数据归根结底还是缓存在 driver 的内存中的,不建议返回过大的数据。

    相关文章

      网友评论

        本文标题:在spark dataFrame 中使用 pandas data

        本文链接:https://www.haomeiwen.com/subject/oahkhftx.html