美文网首页大数据我爱编程python收藏
Introducing Pandas UDF for PySpa

Introducing Pandas UDF for PySpa

作者: 焉知非鱼 | 来源:发表于2018-04-11 22:01 被阅读1286次

    Introducing Pandas UDF for PySpark

    更新:此博客于 2018 年 2 月 22 日更新,以包含一些更改。

    这篇博文在即将发布的 Apache Spark 2.3 版本中引入了 Pandas UDFs(即 Vectorized UDFs) 特性,这大大提高了 Python 中用户定义函数(UDF)的性能和可用性。

    在过去的几年中,Python 已经成为数据科学家的默认语言。像 pandasnumpystatsmodelscikit-learn 这样的软件包已经获得了广泛的采用并成为主流工具包。同时,Apache Spark 已成为处理大数据的事实标准。为了使数据科学家能够利用大数据的价值,Spark 在 0.7 版中添加了 Python API,并支持user-defined functions。这些用户定义的函数一次只能操作一行,因此会遭遇高序列化和调用开销。因此,许多数据管道在 Java 和 Scala 中定义 UDF,然后从 Python 中调用它们。

    基于 Apache Arrow 构建的 Pandas UDF 为您提供了两全其美的功能 - 完全用 Python 定义低开销,高性能 UDF的能力。

    在 Spark 2.3 中,将会有两种类型的 Pandas UDF: 标量(scalar)和分组映射(grouped map)。接下来,我们使用四个示例程序来说明它们的用法:Plus One,累积概率,减去平均值,普通最小二乘线性回归。

    Scalar Pandas UDFs

    Scalar Pandas UDFs 用于向量化标量运算。要定义一个标量 Pandas UDF,只需使用 @pandas_udf 来注释一个 Python 函数,该函数接受 pandas.Series 作为参数并返回另一个相同大小的 pandas.Series。下面我们用两个例子来说明:Plus One 和 Cumulative Probability。

    Plus One

    计算 v + 1 是演示 row-at-a-time UDFs 和 scalar Pandas UDFs 之间差异的简单示例。请注意,在这种情况下内置的列运算符可能执行得更快。

    使用一次一行的 UDF:

    from pyspark.sql.functions import udf
    
    # 使用 udf 定义一个 row-at-a-time 的 udf
    @udf('double')
    # 输入/输出都是单个 double 类型的值
    def plus_one(v):
        return v + 1
    
    df.withColumn('v2', plus_one(df.v))
    

    使用 Pandas UDFs:

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    # 使用 pandas_udf 定义一个 Pandas UDF
    @pandas_udf('double', PandasUDFType.SCALAR)
    # 输入/输出都是 double 类型的 pandas.Series
    
    def pandas_plus_one(v):
        return v + 1
    
    df.withColumn('v2', pandas_plus_one(df.v))
    

    上面的例子定义了一次一行的 UDF “plus_one” 和一个执行相同的“加一”计算的 scala Pandas UDF “pandas_plus_one”。除了函数装饰器之外,UDF 的定义是相同的:“udf” vs “pandas_udf”。

    在一次一行的版本中,用户定义的函数接收一个 double 类型的参数 “v” 并将 “v + 1” 的结果作为 double 来返回。在 Pandas 版本中,用户定义函数接收 pandas.Series 类型的参数 “v”,并将 “v + 1” 的结果作为pandas.Series 返回。因为 “v + 1” 是在 pandas.Series 上进行矢量化的,所以 Pandas 版本比 row-at-a-time 的版本快得多。

    请注意,使用 scala pandas UDF 时有两个重要要求:

    • 输入和输出序列必须具有相同的大小。
    • 如何将一列分割为多个 pandas.Series 是Spark的内部的事,因此用户定义函数的结果必须独立于分割。

    累积概率

    这个例子展示了 scalar Pandas UDF 更实际的用法:使用 scipy 包计算正态分布 N(0,1) 中值的累积概率

    import pandas as pd
    from scipy import stats
    
    @pandas_udf('double')
    def cdf(v):
        return pd.Series(stats.norm.cdf(v))
    
    
    df.withColumn('cumulative_probability', cdf(df.v))
    

    stats.norm.cdf 在标量值和 pandas.Series 上都是可用的,并且此示例也可以使用一次一行的 UDF 编写。与前面的例子类似,Pandas 版本运行速度更快,如后面的“性能比较”部分所示。

    Grouped Map Pandas UDFs

    Python 用户对数据分析中的 split-apply-combine 模式非常熟悉。Grouped Map Pandas UDF 是针对这种情况设计的,它们针对某些组的所有数据进行操作,例如“针对每个日期应用此操作”。

    Grouped Map Pandas UDF 首先根据 groupby 运算符中指定的条件将 Spark DataFrame 分组,然后将用户定义的函数(pandas.DataFrame -> pandas.DataFrame)应用于每个组,并将结果组合并作为新的 Spark DataFrame 返回。

    Grouped map Pandas UDF 使用与 scalar Pandas UDF 使用相同的函数装饰器 pandas_udf,但它们有一些区别:

    • 用户定义函数的输入:

      • Scalar: pandas.Series
      • Grouped map: pandas.DataFrame
    • 用户定义函数的输出:

      • Scalar: pandas.Series
      • Grouped map: pandas.DataFrame
    • 分组语义:

      • Scalar: 无分组语义
      • Grouped map: 由 “groupby” 从句定义
    • 输出大小:

      • Scalar: 和输入大小一样
      • Grouped map: 任何尺寸
    • 函数装饰器中的返回类型:

      • Scalar: 一个 DataType,用于指定返回的 pandas.Series 的类型
      • Grouped map: 一个 StructType,用于指定返回的 pandas.DataFrame 中每列的名称和类型

    接下来,让我们通过两个示例来说明 grouped map Pandas UDF 的使用场景。

    Subtract Mean

    此示例显示了简单使用 grouped map Pandas UDFs:从组中的每个值中减去平均值。

    @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
    # Input/output are both a pandas.DataFrame
    def subtract_mean(pdf):
        return pdf.assign(v=pdf.v - pdf.v.mean())
    
    df.groupby('id').apply(subtract_mean)
    

    在这个例子中,我们从每个组的 v 值中减去 v 的均值。分组语义由 “groupby” 函数定义,即每个输入到用户定义函数的 pandas.DataFrame 具有相同的 “id” 值。这个用户定义函数的输入和输出模式是相同的,所以我们将“df.schema” 传递给装饰器 pandas_udf 来指定模式。

    Grouped map Pandas UDF 也可以作为驱动程序上的独立 Python 函数调用。这对于调试非常有用,例如:

    sample = df.filter(id == 1).toPandas()
    # Run as a standalone function on a pandas.DataFrame and verify result
    subtract_mean.func(sample)
    
    # Now run with Spark
    df.groupby('id').apply(substract_mean)
    

    在上面的示例中,我们首先将 Spark DataFrame 的一个小子集转换为 pandas.DataFrame,然后将 subtract_mean 作为独立的 Python 函数运行。验证函数逻辑后,我们可以在整个数据集上使用 Spark 调用 UDF。

    普通最小二乘线性回归

    最后一个示例显示了如何使用 statsmodels 为每个组运行 OLS 线性回归。对于每个组,我们根据统计模型 Y = bX + c 计算对于 X = (x1,x2) 的 beta b = (b1,b2)。

    import statsmodels.api as sm
    # df has four columns: id, y, x1, x2
    
    group_column = 'id'
    y_column = 'y'
    x_columns = ['x1', 'x2']
    schema = df.select(group_column, *x_columns).schema
    
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    # Input/output are both a pandas.DataFrame
    def ols(pdf):
        group_key = pdf[group_column].iloc[0]
        y = pdf[y_column]
        X = pdf[x_columns]
          X = sm.add_constant(X)
        model = sm.OLS(y, X).fit()
    
        return pd.DataFrame([[group_key] + [model.params[i] for i in   x_columns]], columns=[group_column] + x_columns)
    
    beta = df.groupby(group_column).apply(ols)
    

    此示例演示了 grouped map Pandas UDF 可以与任何任意的 python 函数一起使用:pandas.DataFrame -> pandas.DataFrame。返回的 pandas.DataFrame 可以具有与输入不同的行数和列数。

    性能比较

    最后,我们想要显示 row-at-a-time UDF 和 Pandas UDF 之间的性能比较。我们为以上三个示例运行微基准测试(plus one, cumulative probability 和 subtract mean)。

    配置和方法

    我们在 Databricks 社区版的单节点 Spark 群集上运行了基准测试。

    配置细节:
    数据:带有 Int 列和 Double 列的 10M 行 DataFrame
    集群:6.0 GB 内存,0.88 核心,1 个 DBU
    Databricks 运行时版本:Latest RC(4.0,Scala 2.11)

    有关基准的详细实现,请查看 Pandas UDF Notebook

    img

    如图表所示,Pandas UDF 的表现比 row-at-a-time UDF 好得多,范围从 3倍100倍 不等。

    结论和未来工作

    即将推出的 Spark 2.3 版本为基本改进Python中用户定义函数的功能和性能奠定了基础。今后,我们计划在聚合和窗口函数中引入对 Pandas UDF 的支持。相关工作可以在 SPARK-22216 中进行跟踪。

    Pandas UDFs 是 Spark 社区努力的一个很好的例子。我们要感谢 Bryan Cutler, Hyukjin Kwon, Jeff Reback, Liang-Chi Hsieh, Leif Walsh, Li Jin, Reynold Xin, Takuya Ueshin, Wenchen Fan, Wes McKinney, Xiao Li 以及其他人的贡献。最后,特别感谢 Apache Arrow 社区让这项工作成为可能。

    下一步是什么

    您可以尝试 Pandas UDF notebook ,并且此功能现在作为 Databricks Runtime 4.0 测试版的一部分提供.

    相关文章

      网友评论

        本文标题:Introducing Pandas UDF for PySpa

        本文链接:https://www.haomeiwen.com/subject/dgqtkftx.html