dataframe

Author: hehehehe | Published 2021-09-26 15:16

    https://github.com/spark-examples/pyspark-examples
    pivot

    df = spark.createDataFrame([('Joe', '70000'), ('Joe', '70000'),( 'Henry', '80000')],
                               ['Name', 'Sallary'])
    +-----+-------+
    | Name|Sallary|
    +-----+-------+
    |  Joe|  70000|
    |  Joe|  70000|
    |Henry|  80000|
    +-----+-------+
    df.selectExpr("row_number() over (order by Name) as a","Name","Sallary").show()
    +---+-----+-------+
    |  a| Name|Sallary|
    +---+-----+-------+
    |  1|Henry|  80000|
    |  2|  Joe|  70000|
    |  3|  Joe|  70000|
    +---+-----+-------+
    df.selectExpr("concat(cast(a as string),'aaa')").show()
    
    from pyspark.sql.functions import first
    df.groupBy().pivot("Name").agg(first("Sallary")).show()
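    With the three rows above, groupBy() with no keys collapses everything to a single row, and pivot("Name") turns each distinct Name into a column holding the first Sallary; the expected result (a sketch) is:
    +-----+-----+
    |Henry|  Joe|
    +-----+-----+
    |80000|70000|
    +-----+-----+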
    
        joined_group_df = spark.sql("""
            select id,collect_set(address)[0] as address,collect_set(address_normal)[0] as address_normal,
            concat_ws(",",collect_set(id2)) as id2,
            collect_set(address2)[0] as address2,
            collect_set(address_normal2)[0] as address_normal2,
            concat_ws(",",collect_set(longitude2)) as longitude2,
            concat_ws(",",collect_set(latitude2)) as latitude2
            from joined_df group by id
        """).cache()
    
    dataframe udf
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StructType, StringType, StructField, IntegerType, ArrayType
    from shapely.geometry import Point, MultiPoint  # get_centroid builds shapely points
    
    @udf(returnType=ArrayType(StringType()))
    def lonlat_split(address: str):
        return address[:-1].split(",")
    
    @udf(returnType=StringType())
    def get_centroid(lons: list, lats: list):
        points = []
        for lon, lat in zip(lons, lats):
            points.append(Point(float(lon), float(lat)))
        return MultiPoint(points).centroid.wkt
    
    df2 = df.withColumn("lon",lonlat_split(df.district_pos)[0])
    df2 = df2.withColumn("lat",lonlat_split(df.district_pos)[1])
    df2.select(['lon','lat']).show(2)
    +----------+---------+
    |       lon|      lat|
    +----------+---------+
    |109.268983|22.687409|
    |108.810373|23.217889|
    +----------+---------+
    only showing top 2 rows
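    The get_centroid UDF above expects lists of longitudes and latitudes; a minimal sketch of wiring it up with collect_list, where the grouping column district is an assumption (not part of the original snippet):
    from pyspark.sql import functions as fn
    # Hypothetical grouping column "district"; lon/lat are collected per group and fed to the UDF
    centroids = (df2.groupBy("district")
                    .agg(fn.collect_list("lon").alias("lons"),
                         fn.collect_list("lat").alias("lats"))
                    .withColumn("centroid", get_centroid("lons", "lats")))
    centroids.show(2, truncate=False)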
    from pyspark.sql import functions as fn
    df.groupBy(['_1']).agg(fn.collect_list('_2').alias('list')).show(5)
    +---+------+
    | _1|  list|
    +---+------+
    |  1|[2, 3]|
    |  2|   [3]|
    +---+------+
    
    zipWithIndex
    df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
    
    In [50]: df1 = df0.rdd.zipWithIndex()
    In [51]: df1.take(3)
    Out[51]: [(Row(col1=1), 0), (Row(col1=2), 1), (Row(col1=3), 2)]
    
    In [56]: df1.map(lambda x:Row(a=x[0]['col1'],id=x[1])).toDF().show()
    +---+---+
    |  a| id|
    +---+---+
    |  1|  0|
    |  2|  1|
    |  3|  2|
    |  1|  3|
    |  2|  4|
    |  3|  5|
    +---+---+
    rdd_zip = rdd.zipWithIndex().map(lambda x: (x[1], *x[0]))
    rdd_zip = rdd.zipWithIndex().map(lambda x: {"id": x[1], **x[0].asDict()})
    
    
    join
    e1.alias('ee1').join(e2.alias('ee2'), [col('ee1.emp_id') == col('ee2.emp_id')], 'left') \
      .select(col('ee1.emp_id'), col('ee2.emp_id').alias('id2')).show()
    +------+----+
    |emp_id| id2|
    +------+----+
    |     6|null|
    |     5|null|
    |     1|   1|
    |     3|null|
    |     2|   2|
    |     4|null|
    +------+----+
    
    r1 = sc.parallelize([(1,100),(2,200),(3,300),(4,400)])
    r2 = sc.parallelize([(2,(22,0)),(4,(44,1)),(5,(55,0)),(6,(66,1))])
    r3 = sc.parallelize([(2,22,0),(4,44,1),(5,55,0),(6,66,1)])
    
    >>> r1.leftOuterJoin(r2).collect()
    [(1, (100, None)), (2, (200, (22, 0))), (3, (300, None)), (4, (400, (44, 1)))]
    
    >>> df1 = spark.createDataFrame(r1,['id','amount'])
    >>> df1.show()
    +---+------+
    | id|amount|
    +---+------+
    |  1|   100|
    |  2|   200|
    |  3|   300|
    |  4|   400|
    +---+------+
    
    >>> df3 = spark.createDataFrame(r3,['id','weight','exist'])
    >>> df3.show()
    +---+------+-----+
    | id|weight|exist|
    +---+------+-----+
    |  2|    22|    0|
    |  4|    44|    1|
    |  5|    55|    0|
    |  6|    66|    1|
    +---+------+-----+
    
    SELECT a.* FROM product a LEFT JOIN product_details b
    ON a.id=b.id AND b.weight!=44 AND b.exist=0
    WHERE b.id IS NULL;
    
    >>> df4 = df1.join(df3,[df1.id == df3.id , df3.weight!=44 , df3.exist==0],'left')
    >>> df4.filter('weight is null' ).show()
    +---+------+----+------+-----+
    | id|amount|  id|weight|exist|
    +---+------+----+------+-----+
    |  1|   100|null|  null| null|
    |  3|   300|null|  null| null|
    |  4|   400|null|  null| null|
    +---+------+----+------+-----+
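    The same rows (without the null columns from df3) can also be selected directly with a left_anti join, which keeps only the df1 rows that have no match under the given conditions; it yields ids 1, 3 and 4, matching the filtered result above:
    >>> df1.join(df3, [df1.id == df3.id, df3.weight != 44, df3.exist == 0], 'left_anti').show()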
    

    agg
    https://www.cnblogs.com/seekerjunyu/p/14016240.html

    1. groupby
    from pyspark.sql.functions import first, collect_list, mean
    In:
    df.groupBy("ID").agg(mean("P"), first("index"),
                         first("xinf"), first("xup"), 
                         first("yinf"), first("ysup"), 
                         collect_list("M"))
    2. dataframe
    df.agg(mean('label')).show()
    +------------------+
    |        avg(label)|
    +------------------+
    |0.7411402157164869|
    +------------------+
    agglist = [mean(x) for x in tips_.columns]
    agglist
    Out[109]: [Column<b'avg(total_bill)'>, Column<b'avg(tip)'>, Column<b'avg(size)'>]
    tips_.agg(*agglist).show()
    +------------------+----------------+-----------------+
    |   avg(total_bill)|        avg(tip)|        avg(size)|
    +------------------+----------------+-----------------+
    |19.785942643392282|2.99827868821191|2.569672131147541|
    +------------------+----------------+-----------------+
    
    
    column

    from pyspark.sql import functions as sf
    from pyspark.sql.window import Window
    from pyspark.sql.functions import concat_ws, current_date, when, col, lit, udf
    from pyspark.sql.types import StringType

    @udf(returnType=StringType())
    def strQ2B(ustring):
        if ustring:
            ss = []
            for s in ustring:
                rstring = ""
                for uchar in s:
                    inside_code = ord(uchar)
                    if inside_code == 12288:  # full-width space: map directly to ASCII space
                        inside_code = 32
                    elif 65281 <= inside_code <= 65374:  # other full-width chars: shift by the fixed offset
                        inside_code -= 65248
                    rstring += chr(inside_code)
                ss.append(rstring)
            return "".join(ss)
        return ustring
    df2 = df2.withColumn("address", strQ2B(col("address")))
    
    df2 = df.withColumn("salary",col("salary").cast("Integer"))
    df3 = df.withColumn("salary",col("salary")*100)
    df4 = df.withColumn("CopiedColumn",col("salary")* -1)
    df5 = df.withColumn("Country", lit("USA"))
    df.withColumnRenamed("gender","sex") 
    df.sort(df.department.asc(),df.state.desc()).show(truncate=False)
    df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy(df.level).orderBy(df.age)))
    df.withColumn("name", concat_ws(",","firstname",'lastname')) .show()
    df.withColumn("current_date", current_date()) .show()
    df.withColumn("grade", \
       when((df.salary < 4000), lit("A")) \
         .when((df.salary >= 4000) & (df.salary <= 5000), lit("B")) \
         .otherwise(lit("C")) \
      ).show()
    
    df4 = spark.sql("SELECT STRING(age),BOOLEAN(isGraduated),DATE(jobStartDate) from CastExample")
    
    df.withColumn("salary",df.salary.cast('double')).printSchema()    
    df.withColumn("salary",df.salary.cast(DoublerType())).printSchema()    
    df.withColumn("salary",col("salary").cast('double')).printSchema()
    
    groupby
    df_group = df.groupBy('building') \
        .agg(fn.collect_list('hn_id').alias('hn_id'), fn.collect_list('building_num').alias('building_nums'))
    df_group.foreach(gen_cp)
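    gen_cp is not defined in these notes; a hypothetical stub (it must be defined before the foreach call) just to show the row shape it receives:
    def gen_cp(row):
        # Hypothetical stub: row carries building plus the collected hn_id / building_num lists
        print(row['building'], len(row['hn_id']), len(row['building_nums']))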
    
    
    def generate_parent_children(row):
        result = []
        building, hnid_nums = row[0], row[1]
        parent = []
        children = []
        for hnid_num in hnid_nums:
            hnid, num = hnid_num.split('|')
            if num == '-1':
                parent.append(hnid)
            else:
                children.append(hnid)
        if parent and children:
            if len(parent) > 1:
                print("+" * 10)
            child_str = ",".join(children)
            parent_str = ",".join(parent)
            for p in parent:
                result.append((p, None, child_str))
            for c in children:
                result.append((c, parent_str, None))
        return result
    
    rdd = df.rdd.map(lambda x: (x['building'], x['hn_id'] + '|' + (x['building_num'] if x['building_num'] else "-1")))
    parent_children_rdd = rdd.groupByKey().flatMap(generate_parent_children)
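    The flattened (id, parent_str, child_str) tuples can then go back to a DataFrame; a sketch with assumed column names and an explicit schema (the None values make schema inference unreliable):
    from pyspark.sql.types import StructType, StructField, StringType
    pc_schema = StructType([StructField("hn_id", StringType(), True),
                            StructField("parent_ids", StringType(), True),
                            StructField("child_ids", StringType(), True)])
    parent_children_df = spark.createDataFrame(parent_children_rdd, pc_schema)
    parent_children_df.show(truncate=False)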
    
    
    select

    from pyspark.sql.functions import approx_count_distinct,collect_list
    from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
    from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness
    from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
    from pyspark.sql.functions import variance,var_samp, var_pop
    from pyspark.sql.functions import col,expr

    df.select(df.colRegex("`^.*name*`")).show()
    # Using split() function of Column class
    split_col = pyspark.sql.functions.split(df['dob'], '-')
    df3 = df.select("firstname", "middlename", "lastname", "dob", split_col.getItem(0).alias('year'),
                    split_col.getItem(1).alias('month'), split_col.getItem(2).alias('day'))
    df3.show(truncate=False)
    
    from pyspark.sql.functions import col,expr
    data=[("2019-01-23",1),("2019-06-24",2),("2019-09-20",3)]
    spark.createDataFrame(data).toDF("date","increment") \
        .select(col("date"),col("increment"), \
          expr("add_months(to_date(date,'yyyy-MM-dd'),cast(increment as int))").alias("inc_date")) \
        .show()
    
    print("approx_count_distinct: " + \
          str(df.select(approx_count_distinct("salary")).collect()[0][0]))
    
    print("avg: " + str(df.select(avg("salary")).collect()[0][0]))
    
    df.select(collect_list("salary")).show(truncate=False)
    
    df.select(collect_set("salary")).show(truncate=False)
    
    df2 = df.select(countDistinct("department", "salary"))
    df2.show(truncate=False)
    print("Distinct Count of Department &amp; Salary: "+str(df2.collect()[0][0]))
    
    print("count: "+str(df.select(count("salary")).collect()[0]))
    df.select(first("salary")).show(truncate=False)
    df.select(last("salary")).show(truncate=False)
    df.select(kurtosis("salary")).show(truncate=False)
    df.select(max("salary")).show(truncate=False)
    df.select(min("salary")).show(truncate=False)
    df.select(mean("salary")).show(truncate=False)
    df.select(skewness("salary")).show(truncate=False)
    df.select(stddev("salary"), stddev_samp("salary"), \
        stddev_pop("salary")).show(truncate=False)
    df.select(sum("salary")).show(truncate=False)
    df.select(sumDistinct("salary")).show(truncate=False)
    df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
      .show(truncate=False)
    
    
    array_string

    from pyspark.sql.functions import col, concat_ws

    +----------------+------------------+------------+
    |name            |languagesAtSchool |currentState|
    +----------------+------------------+------------+
    |James,,Smith    |[Java, Scala, C++]|CA          |
    |Michael,Rose,   |[Spark, Java, C++]|NJ          |
    |Robert,,Williams|[CSharp, VB]      |NV          |
    +----------------+------------------+------------+
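    The input DataFrame for this section can be built like this (reconstructed from the table above):
    data = [("James,,Smith", ["Java", "Scala", "C++"], "CA"),
            ("Michael,Rose,", ["Spark", "Java", "C++"], "NJ"),
            ("Robert,,Williams", ["CSharp", "VB"], "NV")]
    df = spark.createDataFrame(data, ["name", "languagesAtSchool", "currentState"])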
    
    df2 = df.withColumn("languagesAtSchool",
       concat_ws(",",col("languagesAtSchool")))
    df2.printSchema()
    df2.show(truncate=False)
    
    
    df.createOrReplaceTempView("ARRAY_STRING")
    spark.sql("select name, concat_ws(',',languagesAtSchool) as languagesAtSchool," + \
        " currentState from ARRAY_STRING") \
        .show(truncate=False)
    
    ArrayType
    from pyspark.sql.types import StringType, ArrayType,StructType,StructField
    arrayCol = ArrayType(StringType(),False)
    
    data = [
     ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
     ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
     ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
    ]
    
    schema = StructType([ 
        StructField("name",StringType(),True), 
        StructField("languagesAtSchool",ArrayType(StringType()),True), 
        StructField("languagesAtWork",ArrayType(StringType()),True), 
        StructField("currentState", StringType(), True), 
        StructField("previousState", StringType(), True) 
      ])
    
    df = spark.createDataFrame(data=data,schema=schema)
    df.printSchema()
    df.show()
    
    from pyspark.sql.functions import explode
    df.select(df.name,explode(df.languagesAtSchool)).show()
    +----------------+------+
    |            name|   col|
    +----------------+------+
    |    James,,Smith|  Java|
    |    James,,Smith| Scala|
    |    James,,Smith|   C++|
    |   Michael,Rose,| Spark|
    |   Michael,Rose,|  Java|
    |   Michael,Rose,|   C++|
    |Robert,,Williams|CSharp|
    |Robert,,Williams|    VB|
    +----------------+------+
    
    from pyspark.sql.functions import split
    df.select(split(df.name,",").alias("nameAsArray")).show()
    
    from pyspark.sql.functions import array
    df.select(df.name,array(df.currentState,df.previousState).alias("States")).show()
    
    from pyspark.sql.functions import array_contains
    df.select(df.name,array_contains(df.languagesAtSchool,"Java")
        .alias("array_contains")).show()
    
    broadcast
    states = {"NY":"New York", "CA":"California", "FL":"Florida"}
    broadcastStates = spark.sparkContext.broadcast(states)
    
    data = [("James","Smith","USA","CA"),
        ("Michael","Rose","USA","NY"),
        ("Robert","Williams","USA","CA"),
        ("Maria","Jones","USA","FL")
      ]
    
    columns = ["firstname","lastname","country","state"]
    df = spark.createDataFrame(data = data, schema = columns)
    df.printSchema()
    df.show(truncate=False)
    +---------+--------+-------+-----+
    |firstname|lastname|country|state|
    +---------+--------+-------+-----+
    |James    |Smith   |USA    |CA   |
    |Michael  |Rose    |USA    |NY   |
    |Robert   |Williams|USA    |CA   |
    |Maria    |Jones   |USA    |FL   |
    +---------+--------+-------+-----+
    
    def state_convert(code):
        return broadcastStates.value[code]
    
    result = df.rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)
    result.show(truncate=False)
    +---------+--------+-------+----------+
    |firstname|lastname|country|state     |
    +---------+--------+-------+----------+
    |James    |Smith   |USA    |California|
    |Michael  |Rose    |USA    |New York  |
    |Robert   |Williams|USA    |California|
    |Maria    |Jones   |USA    |Florida   |
    +---------+--------+-------+----------+
    # Broadcast variable on filter
    
    filteredDf = df.where(df['state'].isin(list(broadcastStates.value.keys())))
    
    column
    df.sort(df.fname.asc()).show()
    #between
    df.filter(df.id.between(100,300)).show()
    
    #contains
    df.filter(df.fname.contains("Cruise")).show()
    
    #startswith, endswith()
    df.filter(df.fname.startswith("T")).show()
    df.filter(df.fname.endswith("Cruise")).show()
    #isNull & isNotNull
    df.filter(df.lname.isNull()).show()
    df.filter(df.lname.isNotNull()).show()
    
    #like , rlike
    df.select(df.fname,df.lname,df.id) \
      .filter(df.fname.like("%om")) 
    
    #substr
    df.select(df.fname.substr(1,2).alias("substr")).show()
    
    #isin
    li=["100","200"]
    df.select(df.fname,df.lname,df.id) \
      .filter(df.id.isin(li)) \
      .show()
    
    from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
    schema = StructType([
            StructField('name', StructType([
                StructField('fname', StringType(), True),
                StructField('lname', StringType(), True)])
           ),
            StructField('languages', ArrayType(StringType()),True),
            StructField('properties', MapType(StringType(),StringType()),True)
         ])
    +--------------+---------------+-----------------------------+
    |          name|      languages|                   properties|
    +--------------+---------------+-----------------------------+
    | [James, Bond]|     [Java, C#]|[eye -> brown, hair -> black]|
    |  [Ann, Varsa]| [.NET, Python]|[eye -> black, hair -> brown]|
    |[Tom Cruise, ]|[Python, Scala]|   [eye -> grey, hair -> red]|
    |  [Tom Brand,]|   [Perl, Ruby]| [eye -> blue, hair -> black]|
    +--------------+---------------+-----------------------------+
    #getItem()
    df.select(df.languages.getItem(1)).show()
    
    df.select(df.properties.getItem("hair")).show()
    
    #getField from Struct or Map
    df.select(df.properties.getField("hair")).show()
    
    df.select(df.name.getField("fname")).show()
    #dropFields
    #from pyspark.sql.functions import col
    #df.withColumn("name1",col("name").dropFields(["fname"])).show()
    
    #withField
    #from pyspark.sql.functions import lit
    #df.withColumn("name",df.name.withField("fname",lit("AA"))).show()
    
    #from pyspark.sql import Row
    #from pyspark.sql.functions import lit
    #df = spark.createDataFrame([Row(a=Row(b=1, c=2))])
    #df.withColumn('a', df['a'].withField('b', lit(3))).select('a.b').show()
            
    #from pyspark.sql import Row
    #from pyspark.sql.functions import col, lit
    #df = spark.createDataFrame([
    #Row(a=Row(b=1, c=2, d=3, e=Row(f=4, g=5, h=6)))])
    #df.withColumn('a', df['a'].dropFields('b')).show()
    
    from pyspark.sql.functions import col,lit,create_map
    df = df.withColumn("propertiesMap",create_map(
            lit("salary"),col("salary"),
            lit("location"),col("location")
            )).drop("salary","location")
    
    create dataframe
    import pyspark
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.types import StructType,StructField, StringType, IntegerType
    from pyspark.sql.functions import *
    
    columns = ["language","users_count"]
    data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
    
    spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
    rdd = spark.sparkContext.parallelize(data)
    
    dfFromRDD1 = rdd.toDF()
    dfFromRDD1.printSchema()
    
    dfFromRDD1 = rdd.toDF(columns)
    dfFromRDD1.printSchema()
    
    dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
    dfFromRDD2.printSchema()
    
    dfFromData2 = spark.createDataFrame(data).toDF(*columns)
    dfFromData2.printSchema()     
    
    rowData = map(lambda x: Row(*x), data) 
    dfFromData3 = spark.createDataFrame(rowData,columns)
    dfFromData3.printSchema()
    

    date current_timestamp

    spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
    data=[["1"]]
    df=spark.createDataFrame(data,["id"])
    
    from pyspark.sql.functions import *
    
    #current_date() & current_timestamp()
    df.withColumn("current_date",current_date()) \
      .withColumn("current_timestamp",current_timestamp()) \
      .show(truncate=False)
    
    #SQL
    spark.sql("select current_date(), current_timestamp()") \
         .show(truncate=False)
    
    # Date & Timestamp into custom format
    df.withColumn("date_format",date_format(current_date(),"MM-dd-yyyy")) \
      .withColumn("to_timestamp",to_timestamp(current_timestamp(),"MM-dd-yyyy HH mm ss SSS")) \
      .show(truncate=False)
    
    #SQL
    spark.sql("select date_format(current_date(),'MM-dd-yyyy') as date_format ," + \
              "to_timestamp(current_timestamp(),'MM-dd-yyyy HH mm ss SSS') as to_timestamp") \
         .show(truncate=False)
    
    
    from pyspark.sql.functions import *
    
    df=spark.createDataFrame([["1"]],["id"])
    df.select(current_date().alias("current_date"), \
          date_format(current_date(),"yyyy MM dd").alias("yyyy MM dd"), \
          date_format(current_timestamp(),"MM/dd/yyyy hh:mm").alias("MM/dd/yyyy"), \
          date_format(current_timestamp(),"yyyy MMM dd").alias("yyyy MMMM dd"), \
          date_format(current_timestamp(),"yyyy MMMM dd E").alias("yyyy MMMM dd E") \
       ).show()
    
    #SQL
    
    spark.sql("select current_date() as current_date, "+
          "date_format(current_timestamp(),'yyyy MM dd') as yyyy_MM_dd, "+
          "date_format(current_timestamp(),'MM/dd/yyyy hh:mm') as MM_dd_yyyy, "+
          "date_format(current_timestamp(),'yyyy MMM dd') as yyyy_MMMM_dd, "+
          "date_format(current_timestamp(),'yyyy MMMM dd E') as yyyy_MMMM_dd_E").show()
    
    date functions
    data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
    df=spark.createDataFrame(data,["id","input"])
    df.show()
    
    from pyspark.sql.functions import *
    
    #current_date()
    df.select(current_date().alias("current_date")
      ).show(1)
    
    #date_format()
    df.select(col("input"), 
        date_format(col("input"), "MM-dd-yyyy").alias("date_format") 
      ).show()
    
    #to_date()
    df.select(col("input"), 
        to_date(col("input"), "yyy-MM-dd").alias("to_date") 
      ).show()
    
    #datediff()
    df.select(col("input"), 
        datediff(current_date(),col("input")).alias("datediff")  
      ).show()
    
    #months_between()
    df.select(col("input"), 
        months_between(current_date(),col("input")).alias("months_between")  
      ).show()
    #trunc()
    df.select(col("input"), 
        trunc(col("input"),"Month").alias("Month_Trunc"), 
        trunc(col("input"),"Year").alias("Month_Year"), 
        trunc(col("input"),"Month").alias("Month_Trunc")
       ).show()
    
    #add_months() , date_add(), date_sub()
    
    df.select(col("input"), 
        add_months(col("input"),3).alias("add_months"), 
        add_months(col("input"),-3).alias("sub_months"), 
        date_add(col("input"),4).alias("date_add"), 
        date_sub(col("input"),4).alias("date_sub") 
      ).show()
    
    #
    
    df.select(col("input"), 
         year(col("input")).alias("year"), 
         month(col("input")).alias("month"), 
         next_day(col("input"),"Sunday").alias("next_day"), 
         weekofyear(col("input")).alias("weekofyear") 
      ).show()
    
    df.select(col("input"),  
         dayofweek(col("input")).alias("dayofweek"), 
         dayofmonth(col("input")).alias("dayofmonth"), 
         dayofyear(col("input")).alias("dayofyear"), 
      ).show()
    data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
    df2=spark.createDataFrame(data,["id","input"])
    df2.show(truncate=False)
    
    #current_timestamp()
    df2.select(current_timestamp().alias("current_timestamp")
      ).show(1,truncate=False)
    
    #to_timestamp()
    df2.select(col("input"), 
        to_timestamp(col("input"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp") 
      ).show(truncate=False)
    
    
    #hour, minute,second
    data=[["1","2020-02-01 11:01:19.06"],["2","2019-03-01 12:01:19.406"],["3","2021-03-01 12:01:19.406"]]
    df3=spark.createDataFrame(data,["id","input"])
    
    df3.select(col("input"), 
        hour(col("input")).alias("hour"), 
        minute(col("input")).alias("minute"),
        second(col("input")).alias("second") 
      ).show(truncate=False)
    
    

    window func

    simpleData = (("James", "Sales", 3000), \
                  ("Michael", "Sales", 4600), \
                  ("Robert", "Sales", 4100), \
                  ("Maria", "Finance", 3000), \
                  ("James", "Sales", 3000), \
                  ("Scott", "Finance", 3300), \
                  ("Jen", "Finance", 3900), \
                  ("Jeff", "Marketing", 3000), \
                  ("Kumar", "Marketing", 2000), \
                  ("Saif", "Sales", 4100) \
                  )
    
    columns = ["employee_name", "department", "salary"]
    
    df = spark.createDataFrame(data=simpleData, schema=columns)
    
    df.printSchema()
    df.show(truncate=False)
    
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number
    
    windowSpec = Window.partitionBy("department").orderBy("salary")
    
    df.withColumn("row_number", row_number().over(windowSpec)) \
        .show(truncate=False)
    
    from pyspark.sql.functions import rank
    
    df.withColumn("rank", rank().over(windowSpec)) \
        .show()
    
    from pyspark.sql.functions import dense_rank
    
    df.withColumn("dense_rank", dense_rank().over(windowSpec)) \
        .show()
    
    from pyspark.sql.functions import percent_rank
    
    df.withColumn("percent_rank", percent_rank().over(windowSpec)) \
        .show()
    
    from pyspark.sql.functions import ntile
    
    df.withColumn("ntile", ntile(2).over(windowSpec)) \
        .show()
    
    from pyspark.sql.functions import cume_dist
    
    df.withColumn("cume_dist", cume_dist().over(windowSpec)) \
        .show()
    
    from pyspark.sql.functions import lag
    
    df.withColumn("lag", lag("salary", 2).over(windowSpec)) \
        .show()
    
    from pyspark.sql.functions import lead
    
    df.withColumn("lead", lead("salary", 2).over(windowSpec)) \
        .show()
    
    windowSpecAgg = Window.partitionBy("department")
    from pyspark.sql.functions import col, avg, sum, min, max, row_number
    
    df.withColumn("row", row_number().over(windowSpec)) \
        .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
        .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
        .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
        .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
        .where(col("row") == 1).select("department", "avg", "sum", "min", "max") \
        .show()
    
    
    
