美文网首页Spark
PySpark之DataFrame的创建与转换

PySpark之DataFrame的创建与转换

作者: HaloZhang | 来源:发表于2020-11-17 22:10 被阅读0次

    简介

    DataFrame 结构代表的是数据的一个不可变分布式集合,其数据都被组织到有名字的列中,就像关系型数据库中的表一样。DataFrame 的目的就是要让对大型数据集的处理变得更简单,它让开发者可以为分布式的数据集指定一个模式,进行更高层次的抽象。
    本文将着重介绍PySpark中DataFrame的各种创建方式,以及与RDD、Pandas之间的转换。


    DataFrame的创建

    1. 从RDD中创建

    为了从存在的RDD结构中创建出DataFrame,我们先定义一些测试数据,如下:

    data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")] #list中的每一个元素都是元祖
    

    接着先创建一个SparkSession,并通过调用SparkContext的parallelize()函数构造出一个RDD对象,代码如下:

    import pyspark
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.types import StructType,StructField, StringType, IntegerType
    
    spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
    rdd = spark.sparkContext.parallelize(data)
    

    1.1 使用toDF()函数

    RDD的toDF()方法是用来从一个存在的RDD结构中创建一个DataFrame对象,因为RDD是一个分布式的 Java对象的集合,故它没有包含列的信息,因此DataFrame采用的是默认的列。上面列举的测试数据一共有2列,分别用"_1"和“_2”来表示。

    dfFromRDD1 = rdd.toDF()
    dfFromRDD1.printSchema()
    
    printSchema函数用于输出DataFrame的结构,即包含了哪些列,以及每一列的名称和类型等等,输出如下:

    如果想给DataFrame中的每一列指定名称的话,我们可以在toDF函数中传入列的名称,如下:

    columns = ["language","users_count"]
    dfFromRDD1 = rdd.toDF(columns)
    dfFromRDD1.printSchema()
    
    这样输出DataFrame的结构信息的时候,就会包含列名称以及类型了,如下:

    1.2 使用SparkSession中的createDataFrame()函数

    我们可以直接使用createDataFrame函数来在一个原始list数据上创建一个DataFrame,并且叠加上toDF()操作,为每一列指定名称,代码如下:

    dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
    dfFromRDD2.printSchema()
    

    输出与上图是一样的。

    2. 从list对象中创建

    2.1 使用createDataFrame函数并且指定行类型来创建

    先将list中的每个元素都转换成一个PySpark中的row对象,接着使用createDataFrame函数来创建DataFram,代码如下:

    rowData = map(lambda x: Row(*x), data)
    dfFromData3 = spark.createDataFrame(rowData, columns)
    dfFromData3.printSchema()
    dfFromData3.show()
    

    2.2 创建DataFrame时指定格式

    如果在创建DataFrame的时候,同时想指定每一列的名称以及对应的类型,我们可以先创建一个StructType结构,然后再调用createDataFrame传入。

    StructType会在下面的内容中讲述,这里就简单理解它指定了这两列的名称和类型,以及每个字段是否能为空。

    schema = StructType([ \
        StructField("language",StringType(),True), \
        StructField("user_count",StringType(),True),
      ])
    
    df = spark.createDataFrame(data, schema=schema)
    df.printSchema()
    df.show(truncate=False)
    

    3. 从数据源文件中创建

    大部分情况下,我们都是从CSV,文本,JSON,XML等数据源文件中实时创建DataFrame。PySpark默认就支持许多数据格式,因此并不需要再单独导入其他库,我们可以从DataFrameReader类中选择合适的方法来创建DataFrame。

    3.1 从CSV文件中创建DataFrame

    使用csv()方法从CSV文件中读取并创建一个DataFrame对象。(这里采用的是MovieLens数据集中的用户评分文件)。

    df2 = spark.read.csv("/Downloads/ml-latest-small/ratings.csv")
    df2.printSchema()
    df2.show(truncate=False)
    
    输出如下:

    同理,也可以使用text(),json()等方法来读取TXT、Json等文件。

    4. 创建带格式的空的DataFrame

    有的时候我们并不是直接打开文件来进行处理,而是从网络或者其他地方获取到数据流,那此时创建一个空的DataFrame就很有必要。
    一般有两种方式来创建空的DataFrame:

    • 通过空的RDD结构来创建
    schema = StructType([
      StructField('firstname', StringType(), True),
      StructField('middlename', StringType(), True),
      StructField('lastname', StringType(), True)
      ])
    
    df = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema)
    df.printSchema()
    
    • 种是通过空的list来创建
    df1 = spark.sparkContext.parallelize([]).toDF(schema)
    df1.printSchema()
    
    df2 = spark.createDataFrame([], schema)
    df2.printSchema()
    
    输出均为:

    DataFrame与Pandas、RDD的转换

    RDD转DataFrame

    这个上文已经提及了,使用toDF()函数便可以完成。

    dept = [("Finance",10), 
            ("Marketing",20), 
            ("Sales",30), 
            ("IT",40) 
          ]
    rdd = spark.sparkContext.parallelize(dept)
    deptColumns = ["dept_name","dept_id"]
    df = rdd.toDF(deptColumns)
    df.printSchema()
    df.show(truncate=False)
    

    DataFrame转RDD

    最简单的可以直接使用rdd函数:

    rdd1 = df.rdd
    

    或者使用:

    rdd2 = df.rdd.map(tuple)
    

    DataFrame转Pandas

    PySpark中的DataFrame可以通过toPandas()函数转换成Python的Pandas DataFrame结构。这两者的主要区别是,pandas的操作都是在单个结点上执行的,而PySpark运行在多台机器上,因此在处理大量数据时,PySpark会比Pandas快数倍以上。

    df.show()
    pandas = df.toPandas()
    pandas
    
    结果如下:

    注意,Pandas给数据添加了序号。


    使用StructType和StructField来指定DataFrame的结构

    在上面的例子中,其实我们已经使用过StructType和StructField了,这里再详细介绍一下。PySpark中的StructType和StructField是用来指定DataFrame的结构,并且可以用来创建一些复杂的列项,比如嵌套的结构体、数组等。 StructType是一系列StructField’s的集合,而StructField定义了列的名称,数据类型,以及通过布尔值来指定字段是否可以为空以及元数据等。
    下面用一个例子来演示一下如何使用StructType和StructField来创建一个DataFrame。

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType,StructField, StringType, IntegerType
    
    spark = SparkSession.builder.master("local[1]") \
                        .appName('SparkByExamples.com') \
                        .getOrCreate()
    
    data = [("James","","Smith","36636","M",3000),
        ("Michael","Rose","","40288","M",4000),
        ("Robert","","Williams","42114","M",4000),
        ("Maria","Anne","Jones","39192","F",4000),
        ("Jen","Mary","Brown","","F",-1)
      ]
    
    schema = StructType([ \
        StructField("firstname",StringType(),True), \
        StructField("middlename",StringType(),True), \
        StructField("lastname",StringType(),True), \
        StructField("id", StringType(), True), \
        StructField("gender", StringType(), True), \
        StructField("salary", IntegerType(), True) \
      ])
     
    df = spark.createDataFrame(data=data,schema=schema)
    df.printSchema()
    df.show(truncate=False)
    
    输出:

    定义嵌套结构

    对于上面的DataFrame,其实我们很容易发现它有一些不合理的地方。比如前三列都是在表示名称,它们同属与一个名叫“name”的列才算是比较合理的。因此,我们可以重新定义一下结构,将"firstname"、“middlename”、“lastname”这三个字段合并为一个"name"字段,代码如下:

    structureData = [
        (("James","","Smith"),"36636","M",3100),
        (("Michael","Rose",""),"40288","M",4300),
        (("Robert","","Williams"),"42114","M",1400),
        (("Maria","Anne","Jones"),"39192","F",5500),
        (("Jen","Mary","Brown"),"","F",-1)
      ]
    structureSchema = StructType([
            StructField('name', StructType([
                 StructField('firstname', StringType(), True),
                 StructField('middlename', StringType(), True),
                 StructField('lastname', StringType(), True)
                 ])),
             StructField('id', StringType(), True),
             StructField('gender', StringType(), True),
             StructField('salary', IntegerType(), True)
             ])
    
    df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
    df2.printSchema()
    df2.show(truncate=False)
    
    输出:

    这里要注意一下,如果修改了StructType的结构,那么原始的list中也需要做相应的修改。

    参考

    相关文章

      网友评论

        本文标题:PySpark之DataFrame的创建与转换

        本文链接:https://www.haomeiwen.com/subject/bnuxiktx.html