历经版本迭代更新,spark sql中原本带有模式信息的RDD即SchemaRDD,在spark1.3之后变成了新的数据结构 DataFrame
RDD是风不是的java对象的集合,RDD无法知道RDD内部存储的数据结构的详细模式信息。
DataFrame是以RDD为基础的分布式数据集,也就是分布式的Row对象的集合(每个Row对象代表一行记录),提供详细的结构信息,也就是我们常说的数据库模式信息。Spark SQL可以清除的知道该数据集中包含哪些列,每列的名称和类型。
从RDD转换得到DataFrame
1.利用反射机制,即toDF()方法
正常的创建了一个RDD对象后,对其调用toDF()方法
peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
#接着必须将其注册为临时表才可以被查询
peopleDF.createOrReplaceTempView("people")
personsDF = spark.sql("select * from people")
personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print)
2.利用编程方式
>>> from pyspark.sql.types import Row
>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
//生成 RDD
>>> peopleRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//定义一个模式字符串
>>> schemaString = "name age"
//根据模式字符串生成模式
>>> fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
>>> schema = StructType(fields)
//从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
>>> rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1]))
>>> peopleDF = spark.createDataFrame(rowRDD, schema)
//必须注册为临时表才能供下面查询使用
scala> peopleDF.createOrReplaceTempView("people")
>>> results = spark.sql("SELECT * FROM people")
>>> results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print)
name: Michael,age: 29
name: Andy,age: 30
name: Justin,age: 19
网友评论