Getting Started with PySpark DataFrames


Author: 友仁恒敬 | Published 2020-03-15 15:39

    1 Creating the Data

    from pyspark.sql import Row, SparkSession
    from pyspark.sql import functions as F
    # `spark` is predefined in the pyspark shell; otherwise create one
    spark = SparkSession.builder.getOrCreate()
    Employee = Row("firstName", "lastName", "email", "salary","depid")
    employee1 = Employee('Basher', 'armbrust', 'bash@edureka.co', 100000,1)
    employee2 = Employee('Daniel', 'meng', 'daniel@stanford.edu', 120000,1 )
    employee3 = Employee('Muriel', None, 'muriel@waterloo.edu', 140000 ,2)
    employee4 = Employee('Rachel', 'wendell', 'rach_1@edureka.co', 160000,3)
    employee5 = Employee('Rachel', 'galifianakis', 'rach_2@edureka.co', 160000,4 )
    df_employee = spark.createDataFrame((employee1,employee2,employee3,employee4,employee5))
    # cache the DataFrame so repeated actions don't recompute it;
    # count() is an action that materializes the cache
    df_employee.cache()
    df_employee.count()
    
    >>> df_employee.show()
    +---------+------------+-------------------+------+-----+
    |firstName|    lastName|              email|salary|depid|
    +---------+------------+-------------------+------+-----+
    |   Basher|    armbrust|    bash@edureka.co|100000|    1|
    |   Daniel|        meng|daniel@stanford.edu|120000|    1|
    |   Muriel|        null|muriel@waterloo.edu|140000|    2|
    |   Rachel|     wendell|  rach_1@edureka.co|160000|    3|
    |   Rachel|galifianakis|  rach_2@edureka.co|160000|    4|
    +---------+------------+-------------------+------+-----+
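
    As a side note, the same DataFrame can also be built from plain tuples plus an
    explicit DDL schema string, without defining a Row class first. A minimal
    sketch (one row shown for brevity):

    df_employee2 = spark.createDataFrame(
        [('Basher', 'armbrust', 'bash@edureka.co', 100000, 1)],
        schema='firstName string, lastName string, email string, salary long, depid long')
    df_employee2.show()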
    
    

    2 Distinct (De-duplication)

    df_employee.select('firstName').distinct().show()
    +---------+
    |firstName|
    +---------+
    |   Muriel|
    |   Basher|
    |   Rachel|
    |   Daniel|
    +---------+
    >>> df_employee.select('firstName','salary').distinct().show()
    +---------+------+
    |firstName|salary|
    +---------+------+
    |   Rachel|160000|
    |   Muriel|140000|
    |   Daniel|120000|
    |   Basher|100000|
    +---------+------+
    # select count(distinct firstName) from employee
    df_employee.select('firstName').distinct().count()
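
    Two related tools worth knowing about: dropDuplicates de-duplicates on a subset
    of columns while keeping whole rows, and F.countDistinct does the distinct
    count as a single aggregate. A quick sketch:

    # keep one full row per firstName (which row survives is arbitrary)
    df_employee.dropDuplicates(['firstName']).show()
    # select count(distinct firstName) from employee, in one pass
    df_employee.select(F.countDistinct('firstName').alias('n_names')).show()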
    

    3 Aggregation with Group By

    # select depid,count(1) as count from employee group by depid
    df_employee.groupby('depid').count().show()
    # group by multiple columns
    df_employee.groupby('depid','firstName').count().show()
    +-----+---------+-----+
    |depid|firstName|count|
    +-----+---------+-----+
    |    1|   Basher|    1|
    |    3|   Rachel|    1|
    |    4|   Rachel|    1|
    |    2|   Muriel|    1|
    |    1|   Daniel|    1|
    +-----+---------+-----+
    # group by with a sum aggregation
    # select depid,firstName,sum(salary) from employee group by depid,firstName
    >>> df_employee.groupby('depid','firstName').sum('salary').show()
    +-----+---------+-----------+
    |depid|firstName|sum(salary)|
    +-----+---------+-----------+
    |    1|   Basher|     100000|
    |    3|   Rachel|     160000|
    |    4|   Rachel|     160000|
    |    2|   Muriel|     140000|
    |    1|   Daniel|     120000|
    +-----+---------+-----------+
    # the auto-generated name sum(salary) is ugly; is there a way to alias it?
    df_employee.groupby('depid','firstName').agg(F.sum(F.col('salary')).alias('total_salary')).show()
    
    # the agg interface is more general: it also takes multiple aggregations at once
    
    agg_stat = [
        F.sum(F.col('salary')).alias('total_salary'),
        F.max(F.col('salary')).alias('max_salary'),
        F.count(F.col('salary')).alias('n'),
    ]
    
    df_employee.groupby('depid').agg(*agg_stat).show()
    +-----+------------+----------+---+
    |depid|total_salary|max_salary|  n|
    +-----+------------+----------+---+
    |    1|      220000|    120000|  2|
    |    3|      160000|    160000|  1|
    |    2|      140000|    140000|  1|
    |    4|      160000|    160000|  1|
    +-----+------------+----------+---+
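
    There is no separate HAVING clause on DataFrames: filtering on an aggregated
    value is just a filter applied after agg. A sketch, reusing the total_salary
    alias from above:

    # select depid, sum(salary) as total_salary from employee
    # group by depid having sum(salary) > 150000
    df_employee.groupby('depid') \
        .agg(F.sum('salary').alias('total_salary')) \
        .filter(F.col('total_salary') > 150000) \
        .show()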
    

    4 Filter / Where: Selecting Rows by Condition

    >>> df_employee.filter(df_employee.firstName=='Rachel').show()
    +---------+------------+-----------------+------+-----+
    |firstName|    lastName|            email|salary|depid|
    +---------+------------+-----------------+------+-----+
    |   Rachel|     wendell|rach_1@edureka.co|160000|    3|
    |   Rachel|galifianakis|rach_2@edureka.co|160000|    4|
    +---------+------------+-----------------+------+-----+
    ## the following form also works
    from pyspark.sql import functions as F
    df_employee.filter(F.col('firstName')=='Rachel').show()
    
    # multiple conditions: & for and, | for or
    >>> df_employee.filter((F.col('firstName')=='Rachel') | (F.col('firstName')=='Muriel')).show()
    +---------+------------+-------------------+------+-----+
    |firstName|    lastName|              email|salary|depid|
    +---------+------------+-------------------+------+-----+
    |   Muriel|        null|muriel@waterloo.edu|140000|    2|
    |   Rachel|     wendell|  rach_1@edureka.co|160000|    3|
    |   Rachel|galifianakis|  rach_2@edureka.co|160000|    4|
    +---------+------------+-------------------+------+-----+
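
    For membership tests like the OR chain above, Column.isin is a more compact
    equivalent. A sketch:

    # same rows as firstName=='Rachel' | firstName=='Muriel'
    df_employee.filter(F.col('firstName').isin('Rachel', 'Muriel')).show()
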
    # and
    df_employee.filter((F.col('firstName')=='Rachel') & (F.col('lastName')=='wendell')).show()
    
    # when the filter expression gets long, assign it to a variable first
    filters = (F.col('firstName')=='Rachel') & (F.col('lastName')=='wendell')
    df_employee.filter(filters).show()
    
    # if it grows longer still, split it across lines
    filters = (
        ((F.col('firstName') == 'Rachel')
         | (F.col('lastName') == 'wendell'))
        & (F.col('salary') > 140000)
    )
    df_employee.filter(filters).show()
    # a SQL expression string can also be passed directly
    
    df_employee.filter('(firstName=="Rachel" or lastName is null) or firstName=="Daniel"').show()
    +---------+------------+-------------------+------+-----+
    |firstName|    lastName|              email|salary|depid|
    +---------+------------+-------------------+------+-----+
    |   Daniel|        meng|daniel@stanford.edu|120000|    1|
    |   Muriel|        null|muriel@waterloo.edu|140000|    2|
    |   Rachel|     wendell|  rach_1@edureka.co|160000|    3|
    |   Rachel|galifianakis|  rach_2@edureka.co|160000|    4|
    +---------+------------+-------------------+------+-----+
    # where is an alias for filter
    df_employee.where('salary>140000').show()
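
    Since the comments above keep pointing at the SQL equivalents, note that you
    can also register the DataFrame as a temporary view and run the SQL itself
    (the view name employee below is arbitrary):

    df_employee.createOrReplaceTempView('employee')
    spark.sql('select * from employee where salary > 140000').show()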
    

    5 Sorting with Order By

    # single column, descending
    df_employee.orderBy('salary', ascending=False).show()
    
    ## multiple columns
    df_employee.orderBy(['firstName','salary'],ascending=True).show()
    
    ## different sort directions per column
    df_employee.orderBy([F.col('firstName').desc(),F.col('salary').asc()]).show()
    ## a more concise way
    df_employee.orderBy([F.desc('firstName'),F.asc('salary')]).show()
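
    One subtlety: nulls sort first under ascending order by default, and lastName
    has a null here. The null position can be set explicitly; a sketch using
    asc_nulls_last, available since Spark 2.4:

    # ascending sort, but nulls placed last instead of first
    df_employee.orderBy(F.col('lastName').asc_nulls_last()).show()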
    

    6 Join

    # create the departments table
    department1 = Row(id=1, name='HR',bonus=0.2)
    department2 = Row(id=2, name='OPS',bonus=0.3)
    department3 = Row(id=3, name='FN',bonus=0.3)
    department4 = Row(id=4, name='DEV',bonus=0.35)
    department5 = Row(id=5, name='AD',bonus=0.21)
    
    df_dep = spark.createDataFrame((department1,department2,department3,department4,department5))
    df_dep.cache()
    df_dep.count()
    >>> df_dep.show()
    +-----+---+----+
    |bonus| id|name|
    +-----+---+----+
    |  0.2|  1|  HR|
    |  0.3|  2| OPS|
    |  0.3|  3|  FN|
    | 0.35|  4| DEV|
    | 0.21|  5|  AD|
    +-----+---+----+
    # inner join by default
    >>> df_employee.join(df_dep,df_dep.id==df_employee.depid).show()
    +---------+------------+-------------------+------+-----+-----+---+----+
    |firstName|    lastName|              email|salary|depid|bonus| id|name|
    +---------+------------+-------------------+------+-----+-----+---+----+
    |   Basher|    armbrust|    bash@edureka.co|100000|    1|  0.2|  1|  HR|
    |   Daniel|        meng|daniel@stanford.edu|120000|    1|  0.2|  1|  HR|
    |   Muriel|        null|muriel@waterloo.edu|140000|    2|  0.3|  2| OPS|
    |   Rachel|     wendell|  rach_1@edureka.co|160000|    3|  0.3|  3|  FN|
    |   Rachel|galifianakis|  rach_2@edureka.co|160000|    4| 0.35|  4| DEV|
    +---------+------------+-------------------+------+-----+-----+---+----+
    # outer join
    >>> df_employee.join(df_dep,df_dep.id==df_employee.depid,how='outer').show()
    +---------+------------+-------------------+------+-----+-----+---+----+
    |firstName|    lastName|              email|salary|depid|bonus| id|name|
    +---------+------------+-------------------+------+-----+-----+---+----+
    |     null|        null|               null|  null| null| 0.21|  5|  AD|
    |   Basher|    armbrust|    bash@edureka.co|100000|    1|  0.2|  1|  HR|
    |   Daniel|        meng|daniel@stanford.edu|120000|    1|  0.2|  1|  HR|
    |   Rachel|     wendell|  rach_1@edureka.co|160000|    3|  0.3|  3|  FN|
    |   Muriel|        null|muriel@waterloo.edu|140000|    2|  0.3|  2| OPS|
    |   Rachel|galifianakis|  rach_2@edureka.co|160000|    4| 0.35|  4| DEV|
    +---------+------------+-------------------+------+-----+-----+---+----+
    
    
    # other join types: left_outer, right_outer, leftsemi, ...
    # also, when the join keys share the same name in both DataFrames,
    # the column names can be passed directly (df1/df2 are placeholders here)
    df1.join(df2, ['id1','id2']).show()
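
    As a follow-up, columns from both sides can be combined after the join with
    withColumn. A sketch computing a total compensation figure (total_comp is a
    made-up name, not from the original data):

    df_joined = df_employee.join(df_dep, df_dep.id == df_employee.depid)
    df_joined \
        .withColumn('total_comp', F.col('salary') * (1 + F.col('bonus'))) \
        .select('firstName', 'salary', 'bonus', 'total_comp') \
        .show()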
    
