美文网首页
PySpark之列操作

PySpark之列操作

作者: HaloZhang | 来源:发表于2020-11-19 15:15 被阅读0次

    简介

    PySpark中的withColumn()函数可以用于修改或者更新值,以及转换DataFrame中已存在的列的类型,添加或者创建一个新的列等等。它是对DataFrame的transformation操作。withColumnRenamed()函数用来重命名一个或多个DataFrame中已存在的列。
    sort()函数可以用来对单个列或者多个列进行排序。
    groupby()函数用来将同一种类型的数据收集到一个组里,然后可以对这个组使用聚合函数

    1. 列基本操作

    老规矩,还是先创建一个DataFrame,以下全部例子都是以这个测试数据为例。

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit
    from pyspark.sql.types import StructType, StructField, StringType,IntegerType
    
    spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
    
    data = [('James','','Smith','1991-04-01','M',3000),
      ('Michael','Rose','','2000-05-19','M',4000),
      ('Robert','','Williams','1978-09-05','M',4000),
      ('Maria','Anne','Jones','1967-12-01','F',4000),
      ('Jen','Mary','Brown','1980-02-17','F',-1)
    ]
    
    columns = ["firstname","middlename","lastname","dob","gender","salary"]
    df = spark.createDataFrame(data=data, schema = columns)
    df.printSchema()
    df.show(truncate=False)
    
    输出:

    1.1 改变列的数据类型

    通过对DataFrame使用withColumn()函数,我们可以强制转换或者改变一个列的类型,下面的例子把"salary"列的数据类型由"long"转为"Integer":

    df2 = df.withColumn("salary",col("salary").cast("Integer"))
    df2.printSchema()
    
    输出:

    1.2 更新已存在列中的值

    为了改变值,我们需要传入已存在的列名称作为第一个参数,要修改的值作为第二个参数。下面的例子将"salary"这一列的值全部乘以100:

    df3 = df.withColumn("salary",col("salary")*100)
    df3.show()
    
    输出:

    1.3 根据已存在列创建一个新的列

    要创建一个新的列,我们需要将新列的名称作为第一个参数传入,对已有列叠加一个操作之后作为第二个参数传入,如下例子是对"salary"列的元素乘以-1,然后创建了一个名为"CopiedColumn"的列:

    df4 = df.withColumn("CopiedColumn",col("salary")* -1)
    df4.show()
    
    输出:

    1.4 创建一个新的列

    将要创建的新列的名称作为第一个参数传入,第二个参数这里使用的是lit(),lit函数是用来给DataFrame添加一个包含常数值的列。下面的例子给DataFrame添加了一个新的"Country"列:

    df5 = df.withColumn("Country", lit("USA"))
    df5.show()
    
    输出:

    1.5 重命名一个列

    直接使用withColumnRenamed函数来重命名,第一个参数为原先的列名称,第二个为新的名称。下例将"gender"列改民为"sex":

    df.withColumnRenamed("gender","sex") \
      .show(truncate=False) 
    
    输出:

    1.6 对列进行排序

    我们在这里重新创建一个DataFrame,方便演示:

    simpleData = [("James","Sales","NY",90000,34,10000), \
        ("Michael","Sales","NY",86000,56,20000), \
        ("Robert","Sales","CA",81000,30,23000), \
        ("Maria","Finance","CA",90000,24,23000), \
        ("Raman","Finance","CA",99000,40,24000), \
        ("Scott","Finance","NY",83000,36,19000), \
        ("Jen","Finance","NY",79000,53,15000), \
        ("Jeff","Marketing","CA",80000,25,18000), \
        ("Kumar","Marketing","NY",91000,50,21000) \
      ]
    columns= ["employee_name","department","state","salary","age","bonus"]
    df = spark.createDataFrame(data = simpleData, schema = columns)
    df.printSchema()
    df.show(truncate=False)
    

    下面按照"department"和"state"这两列来进行排序,默认是升序排序,下面这两种写法输出是一样的:

    df.sort("department","state").show(truncate=False)
    # df.sort(col("department"),col("state")).show(truncate=False)
    
    输出:

    如果想降序排序,则可以使用desc()函数,默认是升序,也可以通过asc()函数显式指定。下面的例子对"department"列进行升序,对"state"列进行降序,如下:

    df.sort(df.department.asc(),df.state.desc()).show(truncate=False)
    
    输出:

    2. 列进阶操作

    当我们对DataFrame使用groupby()函数时,它返回一个GroupedData对象,这个对象包含了以下聚合函数:

    • count() - 用来计算每个组中的行数
    • mean() - 用来计算每个组中的平均值
    • max() - 用来计算每个组中的最大值
    • min() - 用来计算每个组中的最小值
    • sum() - 用来计算每个组中的总和
    • avg() - 用来计算每个组中的平均值
    • agg() - agg()函数可以一次执行多个聚合函数

    还是先创建一个DataFrame,它包含了如下几个列,“employee_name”, “department”, “state“, “salary”, “age” 以及 “bonus” columns。

    simpleData = [("James","Sales","NY",90000,34,10000),
        ("Michael","Sales","NY",86000,56,20000),
        ("Robert","Sales","CA",81000,30,23000),
        ("Maria","Finance","CA",90000,24,23000),
        ("Raman","Finance","CA",99000,40,24000),
        ("Scott","Finance","NY",83000,36,19000),
        ("Jen","Finance","NY",79000,53,15000),
        ("Jeff","Marketing","CA",80000,25,18000),
        ("Kumar","Marketing","NY",91000,50,21000)
      ]
    
    schema = ["employee_name","department","state","salary","age","bonus"]
    df = spark.createDataFrame(data=simpleData, schema = schema)
    df.printSchema()
    df.show(truncate=False)
    
    输出:

    2.1 对DataFrame的单列使用groupBy以及聚合函数

    将测试数据"department"列中所有相同的元素聚集在一起,形成若干个组,然后对每个组中的"salary"列求和。

    df.groupBy("department").sum("salary").show(truncate=False)
    
    结果:

    上述操作相当于求每个不同部门中员工的薪资总和。同理也可以求薪资的最小值,最大值、平均值等等。如下:

    df.groupBy("department").min("salary")
    df.groupBy("department").max("salary")
    df.groupBy("department").avg("salary")
    

    2.2 对DataFrame的多列使用groupBy以及聚合函数

    groupBy函数也可以同时对多列进行操作,同理聚合函数也可以对多列进行操作。下面的例子演示了根据"department"和"state"列将原始数据分为若干个组,然后分别求"salary"和"bonus"的总和:

    df.groupBy("department","state").sum("salary","bonus").show()
    
    输出:

    2.3 同时执行多个聚合函数

    我们需要借助agg()函数,在一次groupBy操作中执行多个聚合操作。

    df.groupBy("department") \
        .agg(sum("salary").alias("sum_salary"), \
             avg("salary").alias("avg_salary"), \
             sum("bonus").alias("sum_bonus"), \
             max("bonus").alias("max_bonus") \
         ) \
        .show(truncate=False)
    
    输出:

    这个代码的意思是,对原始数据按照"department"列中的不同元素,分为3个组,然后对每个组的"salary"和"bonus"列分别求总和和平均值,并且分别起了别名。
    当然,还可以叠加filter操作,比如我们想筛选上述结果中,"sum_bonus"大于5000的,那么可以叠加一个"where"操作,代码如下:

    df.groupBy("department") \
        .agg(sum("salary").alias("sum_salary"), \
          avg("salary").alias("avg_salary"), \
          sum("bonus").alias("sum_bonus"), \
          max("bonus").alias("max_bonus")) \
        .where(col("sum_bonus") >= 50000) \
        .show(truncate=False)
    
    输出:

    可以看到,"sum_salary"那一列小于50000的数据被筛选掉了。

    参考

    相关文章

      网友评论

          本文标题:PySpark之列操作

          本文链接:https://www.haomeiwen.com/subject/ainkiktx.html