PySpark pandas udf

作者: breeze_lsw | 来源:发表于2018-04-13 21:18 被阅读98次

    配置

    所有运行节点安装 pyarrow ,需要 >= 0.8

    为什么会有 pandas UDF

    在过去的几年中,python 正在成为数据分析师的默认语言。一些类似 pandas,numpy,statsmodel,scikit-learn 被大量使用,逐渐成为主流的工具包。同时,spark 也成为了大数据处理的标准,为了让数据分析师能够使用 spark ,Spark在 0.7 版本增加了 python api,也支持了 udf (user-defined functions)。

    这些 udf 对每条记录都会操作一次,同时数据需要在 JVM 和 Python 中传输,因此有了额外的序列化和调用开销。因此可以用 Java 和 Scala 中定义 UDF,然后在 python 中调用它们。

    image

    pandas UDFs 为什么快?

    Pandas Udf 构建在 Apache Arrow 之上,带来了低开销,高性能的UDF。

    每个系统都有自己的存储格式,70%-80%的时间花费在序列化和反序列化上

    Apache Arrow:一个跨平台的在内存中以列式存储的数据层,用来加速大数据分析速度。

    1. 循环执行 转化为 pandas 向量化计算。
    2. python 和 JVM 使用同一种数据结构,避免了序列化的开销

    每批进行向量化计算的数据量由 spark.sql.execution.arrow.maxRecordsPerBatch 参数控制,默认为10000条。如果一次计算的 columns 特别多,可以适当的减小该值。

    一些限制

    1. 不支持所有的 sparkSQL 数据类型,包括 BinaryType,MapType, ArrayType,TimestampType 和嵌套的 StructType。

    2. pandas udf 和 udf 不能混用。

    使用方式

    1. spark df & pandas df

    spark df 与 pandas df 相互转化性能优化,需要开启配置,默认为关闭。

    配置项:

    spark.sql.execution.arrow.enabled true
    

    相互转化

    import numpy as np
    import pandas as pd
    
    //初始化 pandas DF
    pdf = pd.DataFrame(np.random.rand(100000, 3))
    // pdf -> sdf
    %time df = spark.createDataFrame(pdf)
    // sdf -> pdf
    %time result_pdf = df.select("*").toPandas()
    

    性能对比:

    execution.arrow.enabled pdf -> sdf sdf -> pdf
    false 4980ms 722ms
    true 72ms 79ms

    tips: 即便是提高了转化的速度,pandas df 依旧是单机在 driver 中执行的,不应该返回大量的数据。

    2. pandas UDFs(Vectorized UDFs)

    pandas udf 的入参和返回值类型都为 pandas.Series

    注册 udf

    方法1:

    from pyspark.sql.functions import pandas_udf
    
    def plus_one(a):
        return a + 1
    
    //df_udf
    plus_one_pd_udf = pandas_udf(plus_one, returnType=LongType())
    //sql udf
    spark.udf.register('plus_one',plus_one_pd_udf)
    

    方法2:

    from pyspark.sql.functions import pandas_udf
    
    //默认为 PandasUDFType.SCALAR 类型
    @pandas_udf('long')
    def plus_one(a):
        return a + 1
    
    spark.udf.register('plus_one',plus_one)
    

    spark.udf.register可以接受一个 SQL_BATCHED_UDF 或 SQL_SCALAR_PANDAS_UDF 方法。

    使用 pandas udf 后,物理执行计划会从 BatchEvalPython 变为 ArrowEvalPython,可以使用 explain() 检查 pandas udf 是否生效。

    Scalar Pandas UDFs

    import pandas as pd
    
    from pyspark.sql.functions import col, pandas_udf,udf
    from pyspark.sql.types import LongType
    
    def multiply_func(a, b):
        return a * b
        
    multiply_pd = pandas_udf(multiply_func, returnType=LongType())
    
    multiply = udf(multiply_func, returnType=LongType())
    
    x = pd.Series([1, 2, 3] * 10000)
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
    
    %timeit df.select(multiply_pd(col("x"), col("x"))).count()
    %timeit df.select(multiply(col("x"), col("x"))).count()
    

    Grouped Map Pandas UDFs

    计算均方差

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def substract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=v - v.mean())
    
    df.groupby("id").apply(substract_mean).show()
    
    +---+----+
    | id|   v|
    +---+----+
    |  1|-0.5|
    |  1| 0.5|
    |  2|-3.0|
    |  2|-1.0|
    |  2| 4.0|
    +---+----+
    

    测试用例

    数据准备: 10M-row DataFrame , 2列,一列Int类型,一列Double类型

    df = spark.range(0, 10 * 1000 * 1000).withColumn('id', (col('id') / 10000).cast('integer')).withColumn('v', rand())
    df.cache()
    df.count()
    

    Plus one

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    # 输入和输出都是 doubles 类型的 pandas.Series
    @pandas_udf('double', PandasUDFType.SCALAR)
    def pandas_plus_one(v):
        return v + 1
    
    df.withColumn('v2', pandas_plus_one(df.v))
    

    Cumulative Probability

    import pandas as pd
    from scipy import stats
    
    @pandas_udf('double')
    def cdf(v):
        return pd.Series(stats.norm.cdf(v))
    
    df.withColumn('cumulative_probability', cdf(df.v))
    

    Subtract Mean

    # 输入和输出类型都是 pandas.DataFrame
    @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        return pdf.assign(v=pdf.v - pdf.v.mean())
    
    df.groupby('id').apply(subtract_mean)
    

    Scalar 和 Grouped map 的一些区别

    ... Scalar Grouped map
    udf 入参类型 pandas.Series pandas.DataFrame
    udf 返回类型 pandas.Series pandas.DataFrame
    聚合语义 groupby 的子句
    返回大小 与输入一致 rows 和 columns 都可以和入参不同
    返回类型声明 pandas.Series 的 DataType pandas.DataFrame 的 StructType

    性能对比

    类型 udf pandas udf
    plus_one 2.54s 1.28s
    cdf 2min 2s 1.52s
    Subtract Mean 1min 8s 4.4s

    配置和测试方法

    环境

    • Spark 2.3
    • Anaconda 4.4.0 (python 2.7.13)
    • 运行模式 local[10]

    参考

    http://spark.apache.org/docs/latest/sql-programming-guide.html#grouped-map
    https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
    https://www.slideshare.net/PyData/improving-pandas-and-pyspark-performance-and-interoperability-with-apache-arrow

    相关文章

      网友评论

        本文标题:PySpark pandas udf

        本文链接:https://www.haomeiwen.com/subject/vavhkftx.html