PySpark之select、collect操作

作者: HaloZhang | 来源:发表于2020-11-18 20:30 被阅读0次

    Select操作

    在PySpark中,select()函数是用来从DataFrame结构中选择一个或多个列,同样可以选择嵌套的列。select()在PySpark中是一个transformation函数,它返回一个包含指定列的新的DataFrame。

    首先,我们先创建一个DataFrame。

    import pyspark
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
    data = [("James","Smith","USA","CA"),
        ("Michael","Rose","USA","NY"),
        ("Robert","Williams","USA","CA"),
        ("Maria","Jones","USA","FL")
      ]
    columns = ["firstname","lastname","country","state"]
    df = spark.createDataFrame(data = data, schema = columns)
    df.show(truncate=False)
    
    输出如下:

    选取单列

    我们可以通过给select()函数传入列的名称来筛选指定的列。由于DataFrame是不可变的,因此此函数会返回一个包含选定列的新DataFrame。

    df.select("lastname").show()
    
    结果:

    选取多行

    如果要选取多列的话,只需要把每个列的名称传入select()函数即可。

    df.select("firstname", "lastname").show()
    
    结果:

    除此之外,还可以使用DataFrame对象名称来筛选:

    df.select(df.firstname, df.lastname).show()
    

    也可以使用col函数来指定列:

    from pyspark.sql.functions import col
    df.select(col("firstname"), col("lastname")).show()
    

    结果与上面是一样的。

    选取嵌套的列

    我们先创建一个带有嵌套列结构的DataFrame,如果你不清楚怎么创建,可以先看下这篇文章

    data = [
            (("James",None,"Smith"),"OH","M"),
            (("Anna","Rose",""),"NY","F"),
            (("Julia","","Williams"),"OH","F"),
            (("Maria","Anne","Jones"),"NY","M"),
            (("Jen","Mary","Brown"),"NY","M"),
            (("Mike","Mary","Williams"),"OH","M")
            ]
    
    from pyspark.sql.types import StructType,StructField, StringType        
    schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('state', StringType(), True),
         StructField('gender', StringType(), True)
         ])
    df2 = spark.createDataFrame(data = data, schema = schema)
    df2.printSchema()
    df2.show(truncate=False) # shows all columns
    
    输出:

    在第一列“name”中,又包含了3个子列“firstname”、“middlename”、“lastname”。我们先选择"name"列,看一下输出:

    df2.select("name").show(truncate=False)
    
    结果:

    可以看到,它直接显示出了第一列,那么如果我们想选取嵌套在第一列中的某一列,可以使用以下方式:

    df2.select("name.firstname","name.lastname").show(truncate=False)
    
    输出:

    当然也可以获取嵌套在第一列中的所有列:

    df2.select("name.*").show(truncate=False)
    
    输出:

    Collect操作

    PySpark的collect()操作是用来将所有结点中的数据收集到驱动结点上(PySpark基于分布式架构)。因此collect()操作一般用于小型数据及上,在大型数据及上使用可能会导致内存不足。
    还是使用前一节定义的数据:

    df.show()
    
    结果:

    使用collect()函数来收集数据:

    datacollect = df.collect()
    print(type(datacollect))
    print(datacollect)
    

    结果:


    可以看到collect()返回的是一个列表,列表的元素是Row类型,一个Row对象代表的就是DataFrame中的一行。
    对于Row对象,我们可以使用列名称来访问其中的数据,如下:
    for row in datacollect:
        print(row['firstname'] + ': ' + row['state'])
    
    结果:

    Select() VS Collect()

    1. select()返回一个包含指定列的新的DataFrame,而collect()以列表形式返回整个数据集。
    2. select()是一个transformation操作,而collect()是一个action操作。

    参考

    相关文章

      网友评论

        本文标题:PySpark之select、collect操作

        本文链接:https://www.haomeiwen.com/subject/ywzciktx.html