美文网首页
pyspark pandas udf

pyspark pandas udf

作者: AsdilFibrizo | 来源:发表于2019-07-10 14:43 被阅读0次
    • 下面介绍pandas_udf的三种函数使用
    # 引入spark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('learnspark').getOrCreate()
    sc = spark.sparkContext
    

    Scalar UDFs函数

    通常做一些简单计算使用该函数,输出长度与输入长度不变

    # pandans udf 使用说明
    # Scalar UDFs
    # 使用PandasUDFType.SCALAR,当使用select和withColumn是使用该函数
    # 输入pandas.Series输出pandas.Series,输入和输出长度相同
    # 在函数内部,spark通过将列拆分来批量执行pandas udf,将每个批的函数作为数据的子集调用,然后合并
    import pandas as pd
    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.functions import PandasUDFType
    from pyspark.sql.types import LongType,FloatType
    from pyspark.sql.types import StructField, StructType
    # 形式1
    def multiply_func(a, b):
        return a * b
    multiply = pandas_udf(multiply_func, returnType=FloatType(),functionType=PandasUDFType.SCALAR) # 这里使用PandasUDFType.SCALAR作为functiontype
    # 形式2
    @pandas_udf(returnType=FloatType(), functionType=PandasUDFType.SCALAR)
    def multiply2(a,b):
      return a+b
    
    # 现造一个数据集
    df = spark.createDataFrame(
        [(1,1.0, 1.0, 1), (1,1.2, 2.0, 1), (1,5.6, 3.0, 2), (1,24.4, 5.0, 2), (1,22.3, 10.0, 2)],
        ('id',"X", "Y","Z"))
    df.show()
    # 运行函数
    df = (df
           .withColumn('M', multiply(df.X,df.Y))
           .withColumn('N', multiply2(df.X,df.Y))
          )
    df.show()
    [out]:
    +---+----+----+---+
    | id|   X|   Y|  Z|
    +---+----+----+---+
    |  1| 1.0| 1.0|  1|
    |  1| 1.2| 2.0|  1|
    |  1| 5.6| 3.0|  2|
    |  1|24.4| 5.0|  2|
    |  1|22.3|10.0|  2|
    +---+----+----+---+
    
    +---+----+----+---+-----+----+
    | id|   X|   Y|  Z|    M|   N|
    +---+----+----+---+-----+----+
    |  1| 1.0| 1.0|  1|  1.0| 2.0|
    |  1| 1.2| 2.0|  1|  2.4| 3.2|
    |  1| 5.6| 3.0|  2| 16.8| 8.6|
    |  1|24.4| 5.0|  2|122.0|29.4|
    |  1|22.3|10.0|  2|223.0|32.3|
    +---+----+----+---+-----+----+
    

    Grouped map UDFs函数

    当需要对每行做统计计算但不改变其长度,比如减均值等操作

    # Grouped map UDFs
    # 这个函数适用于groupBy().apply() 实现“拆分应用组合”模式,它分为三个步骤
    # 1.使用DataFrame.groupBy分割数据
    # 2.对每个group使用函数,输入输出都是pandas.DataFrame,输入数据包含每个groupy的所有行和列
    # 3.合并结果称为一个新的DataFrame
    # 使用groupBy().apply()你必须做以下定义
    # 1.定义每个group的计算的python函数
    # 2.一个StructType对象或者string定义输出DataFrame的schema
    ### 在应用函数之前,组的所有数据都将加载到内存中。这可能导致内存不足异常,特别是不同group是倾斜的
    
    # 这里将每行减去goup的均值
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    
    # 形式1
    schema = StructType([
      StructField("id", LongType(),True),
      StructField("v", FloatType(),True)
    ])
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=v - v.mean())
    df.groupby("id").apply(subtract_mean).show()
    
    # 形式2
    def _subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=v - v.mean())
    subtract_mean2 = pandas_udf(_subtract_mean, returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP) # 这里使用PandasUDFType.GROUPED_MAP作为functiontype
    df.groupby("id").apply(subtract_mean2).show()
    

    Grouped aggregate UDFs函数

    一些统计计算,返回与输入长度不同

    # Grouped aggregate UDFs
    # 使用groupBy().agg() 时使用,作为统计函数使用
    from sklearn.metrics import silhouette_score # 计算轮廓系数
    import pandas as pd
    df = spark.createDataFrame(
        [(1,1.0, 1.0, 1), (1,1.2, 2.0, 1), (1,5.6, 3.0, 2), (1,24.4, 5.0, 2), (1,22.3, 10.0, 2)],
        ('id',"X", "Y","Z"))
    @pandas_udf(returnType='float',functionType=PandasUDFType.GROUPED_AGG)
    def sc(call, Y, X):
        merge_df = pd.concat([X, Y], axis=1)
        try:
            return silhouette_score(merge_df.values, call.values)
        except:
            return None
    df.groupBy('id').agg(sc(df.Z,df.X,df.Y)).show()
    [out]:
    +---+-----------+
    | id|sc(Z, X, Y)|
    +---+-----------+
    |  1|  0.4213251|
    +---+-----------+
    

    相关文章

      网友评论

          本文标题:pyspark pandas udf

          本文链接:https://www.haomeiwen.com/subject/aglhkctx.html