DataFrame与RDD的互操作

Author: sparkle123 | Published 2018-03-05 19:43

    DataFrame Interoperating with RDDs

    Reference: the official documentation
    http://spark.apache.org/docs/2.2.0/sql-programming-guide.html#interoperating-with-rdds

    The two ways of DataFrame–RDD interoperation, compared:
    1) Reflection-based inference: a case class. Prerequisite: the field names and types are known in advance.
    2) Programmatic: Row objects plus an explicitly built schema, for when the first approach cannot be used (the schema is not known until runtime).

    Choice: prefer the first approach; it is simpler to use.
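    Both approaches assume a comma-separated input file of `id,name,age` lines (the `test.txt` path in the code below is the author's local one). The split-and-convert step the two approaches share can be sketched in plain Scala, using hypothetical sample rows, with no Spark required:

```scala
// Hypothetical sample lines mirroring the id,name,age layout test.txt is assumed to have
case class Info(id: Int, name: String, age: Int)

val lines = Seq("1,zhangsan,20", "2,lisi,33", "3,wangwu,45")

// The same split-and-convert logic the Spark code applies to each RDD record
val infos = lines.map(_.split(",")).map(a => Info(a(0).trim.toInt, a(1).trim, a(2).trim.toInt))

// Keep only records with age > 30 (here: the last two sample rows)
val over30 = infos.filter(_.age > 30)
```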

    The following code demonstrates both:

    • Inferring the Schema Using Reflection
    • Programmatically Specifying the Schema
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    object DataFrameRDDApp {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()
        // Read the text file as an RDD[String] of comma-separated id,name,age lines
        val testRDD = spark.sparkContext.textFile("C:\\Users\\Administrator\\IdeaProjects\\SparkSQLProject\\spark-warehouse\\test.txt")
        //inferReflection(spark,testRDD)
    
        program(spark,testRDD)
    
        spark.stop()
    
      }
    
      def inferReflection(spark: SparkSession, testRDD: RDD[String]): Unit = {
    
        // RDD ==> DataFrame
    
        // For implicit conversions from RDDs to DataFrames
        import spark.implicits._
        val infoDF = testRDD.map(_.split(",")).map(line => Info(line(0).trim.toInt, line(1).trim, line(2).trim.toInt)).toDF()

        infoDF.show()
    
        infoDF.filter(infoDF.col("age") > 30).show
    
        // Register the DataFrame as a temporary view
        infoDF.createOrReplaceTempView("infos")
    
        // SQL statements can be run by using the sql methods provided by Spark
        spark.sql("select * from infos where age > 30").show()
      }
    
      def program(spark: SparkSession, testRDD: RDD[String]): Unit = {

        // Option A: the schema encoded in a string -- every field ends up as StringType
        val schemaString = "id name age"
        // Generate a schema from the string (shown for illustration; not used below)
        val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
        val schemaFromString = StructType(fields)

        // Option B: declare each field with its actual type; this is the schema used below
        val structType = StructType(Array(
          StructField("id", IntegerType, nullable = true),
          StructField("name", StringType, nullable = true),
          StructField("age", IntegerType, nullable = true)))

        // Convert the RDD's records to Rows whose values match the schema's types
        val rowRDD = testRDD.map(_.split(","))
          .map(attributes => Row(attributes(0).trim.toInt, attributes(1).trim, attributes(2).trim.toInt))

        val infoDF = spark.createDataFrame(rowRDD, structType)
    
        infoDF.printSchema()
        infoDF.show()
    
        infoDF.filter(infoDF.col("age") > 30).show
        infoDF.createOrReplaceTempView("infos")
        spark.sql("select * from infos where age > 30").show()
    
      }
    
      case class Info(id: Int, name: String, age: Int)
    
    }
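    A side note not in the original post: with the same `import spark.implicits._` in scope, the reflection route can also produce a typed `Dataset[Info]` via `.toDS()` (standard Spark 2.x API) rather than an untyped `DataFrame`. A minimal sketch, assuming the `spark`, `testRDD`, and `Info` definitions from the code above:

```scala
// Sketch only -- requires the SparkSession, RDD, and case class defined above
import spark.implicits._

val infoDS = testRDD
  .map(_.split(","))
  .map(a => Info(a(0).trim.toInt, a(1).trim, a(2).trim.toInt))
  .toDS()                          // Dataset[Info], typed at compile time

// Filter with a plain Scala lambda instead of a column expression
infoDS.filter(_.age > 30).show()
```

    The Dataset keeps the case-class types end to end, so the filter predicate is checked at compile time.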
    
    

    Browsing the source, I found the inline comments there are quite well written.


    [Figure: SparkSession source code]
