美文网首页
如何基于RDD方式完成DataFrame的代码构建?

如何基于RDD方式完成DataFrame的代码构建?

作者: 鹤子青云上 | 来源:发表于2023-09-21 15:42 被阅读0次

    DataFrame对象可以从RDD转换而来,都是分布式数据集 其实就是转换一下内部存储的结构,转换为二维表结构。

    将RDD转换为DataFrame方式1:

    调用spark

    # 首先构建一个RDD rdd[(name, age), ()]
    rdd = sc.textFile("../data/sql/people.txt").\
      map(lambda x: x.split(',')).\
      map(lambda x: [x[0], int(x[1])])               # 需要做类型转换, 因为类型从RDD中探测
    # 构建DF方式1
    df = spark.createDataFrame(rdd, schema = ['name', 'age'])
    

    通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。

    # coding:utf8
    # 演示DataFrame创建的三种方式
    from pyspark.sql import SparkSession
    if __name__ == '__main__':
        spark = SparkSession.builder.\
           appName("create df").\
    master("local[*]").\
    getOrCreate()
    
    sc = spark.sparkContext
    # 首先构建一个RDD rdd[(name, age), ()]
    rdd = sc.textFile("../data/sql/people.txt").\
    map(lambda x: x.split(',')).\
    map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测
    # 构建DF方式1
    df = spark.createDataFrame(rdd, schema = ['name', 'age'])
    # 打印表结构
    df.printSchema()
    # 打印20行数据
    df.show()
    df.createTempView("ttt")
    spark.sql("select * from ttt where age< 30").show()
    

    将RDD转换为DataFrame方式2:
    通过StructType对象来定义DataFrame的“表结构”转换RDD

    # 创建DF , 首先创建RDD 将RDD转DF
    rdd = sc.textFile("../data/sql/stu_score.txt").\
      map(lambda x:x.split(',')).\
      map(lambda x:(int(x[0]), x[1], int(x[2])))
    
    # StructType 类
    # 这个类 可以定义整个DataFrame中的Schema
    schema = StructType().\
      add("id", IntegerType(), nullable=False).\
      add("name", StringType(), nullable=True).\
      add("score", IntegerType(), nullable=False)
    # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add, 每一个add代表一个StructField
    # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
    df = spark.createDataFrame(rdd, schema)
    # coding:utf8
    # 需求: 基于StructType的方式构建DataFrame 同样是RDD转DF
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    if __name__ == '__main__':
      spark = SparkSession.builder.\
        appName("create_df"). \
        config("spark.sql.shuffle.partitions", "4"). \
        getOrCreate()
      # SparkSession对象也可以获取 SparkContext
      sc = spark.sparkContext
      # 创建DF , 首先创建RDD 将RDD转DF
      rdd = sc.textFile("../data/sql/stu_score.txt").\
        map(lambda x:x.split(',')).\
        map(lambda x:(int(x[0]), x[1], int(x[2])))
      # StructType 类
      # 这个类 可以定义整个DataFrame中的Schema
      schema = StructType().\
        add("id", IntegerType(), nullable=False).\
        add("name", StringType(), nullable=True).\
        add("score", IntegerType(), nullable=False)
      # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
      # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
      df = spark.createDataFrame(rdd, schema)
      df.printSchema()
      df.show()
    

    将RDD转换为DataFrame方式3:

    使用RDD的toDF方法转换RDD

    # StructType 类
    # 这个类 可以定义整个DataFrame中的Schema
    schema = StructType().\
      add("id", IntegerType(), nullable=False).\
      add("name", StringType(), nullable=True).\
      add("score", IntegerType(), nullable=False)
    # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
    # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
    
    # 方式1: 只传列名, 类型靠推断, 是否允许为空是true
    df = rdd.toDF(['id', 'subject', 'score'])
    df.printSchema()
    df.show()
    
    # 方式2: 传入完整的Schema描述对象StructType
    df = rdd.toDF(schema)
    df.printSchema()
    df.show()
    # coding:utf8
    # 需求: 使用toDF方法将RDD转换为DF
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    if __name__ == '__main__':
        spark = SparkSession.builder.\
          appName("create_df"). \
          config("spark.sql.shuffle.partitions", "4"). \
          getOrCreate()
        # SparkSession对象也可以获取 SparkContext
        sc = spark.sparkContext
        # 创建DF , 首先创建RDD 将RDD转DF
        rdd = sc.textFile("../data/sql/stu_score.txt").\
          map(lambda x:x.split(',')).\
          map(lambda x:(int(x[0]), x[1], int(x[2])))
        # StructType 类
        # 这个类 可以定义整个DataFrame中的Schema
        schema = StructType().\
           add("id", IntegerType(), nullable=False).\
           add("name", StringType(), nullable=True).\
           add("score", IntegerType(), nullable=False)
        # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
        # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
        # 方式1: 只传列名, 类型靠推断, 是否允许为空是true
        df = rdd.toDF(['id', 'subject', 'score'])
        df.printSchema()
        df.show()
        # 方式2: 传入完整的Schema描述对象StructType
        df = rdd.toDF(schema)
        df.printSchema()
        df.show()
    

    相关文章

      网友评论

          本文标题:如何基于RDD方式完成DataFrame的代码构建?

          本文链接:https://www.haomeiwen.com/subject/fujzvdtx.html