case class Person(name: String, age: Int)
val rddpeople = sc.textFile("/sparksql/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
rddpeople.registerTempTable("rddTable")
sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
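The reflection-based example above hinges on the per-line parsing inside the two map calls. A minimal plain-Scala sketch of just that step, runnable without a Spark cluster — the sample lines are an assumption modeled on the usual people.txt:

```scala
// Plain-Scala sketch of the parsing the RDD pipeline performs per line.
// The sample data below is an assumption, not read from /sparksql/people.txt.
case class Person(name: String, age: Int)

val lines = Seq("Michael, 29", "Andy, 30", "Justin, 19")
// split on the comma, trim the age field, build a typed Person
val people = lines.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
// the same predicate the SQL query expresses: 13 <= age <= 19
val teenagers = people.filter(p => p.age >= 13 && p.age <= 19).map(p => "Name: " + p.name)
teenagers.foreach(println)
```

Note the `.trim` on the age field: the sample file has a space after each comma, so `" 29".toInt` would throw without it.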
// RDD example 2: import the Spark SQL data types and Row
import org.apache.spark.sql._ // build a schema that matches the data structure
val schemaString = "name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// build the rowRDD
val rowRDD = sc.textFile("/sparksql/people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim))
// apply the schema to the rowRDD with applySchema
val rddpeople2 = sqlContext.applySchema(rowRDD, schema)
rddpeople2.registerTempTable("rddTable2")
sqlContext.sql("SELECT name FROM rddTable2 WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
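The point of the programmatic route is that field names come from a schema string at runtime instead of a compiled case class. A hedged plain-Scala sketch of that decoupling, with column lookup resolved through the schema — the sample rows are assumptions:

```scala
// Sketch: field names live in a schema string, rows stay untyped,
// and columns are resolved by name at query time (sample data is assumed).
val schemaString = "name age"
val fields = schemaString.split(" ").toSeq

val rows = Seq(Seq("Michael", "29"), Seq("Andy", "30"), Seq("Justin", "19"))
// resolve a column by name through the schema, much as Spark SQL binds
// a StructField name to a position in each Row
def column(row: Seq[String], field: String): String = row(fields.indexOf(field))

val teens = rows
  .filter(r => { val a = column(r, "age").toInt; a >= 13 && a <= 19 })
  .map(r => "Name: " + column(r, "name"))
teens.foreach(println)
```

This is why the Scala 22-field case class limit mentioned below does not bite here: the schema is data, not a type.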
// Parquet example
val parquetpeople = sqlContext.parquetFile("/sparksql/people.parquet")
parquetpeople.registerTempTable("parquetTable")
sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
// JSON example
val jsonpeople = sqlContext.jsonFile("/sparksql/people.json")
jsonpeople.registerTempTable("jsonTable")
sqlContext.sql("SELECT name FROM jsonTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
// jsonRDD
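Besides jsonFile, Spark 1.x's sqlContext also offers jsonRDD, which infers a schema from an RDD[String] of JSON records. The per-record work can be sketched in plain Scala; the regex field extractor below is a simplified stand-in for a real JSON parser, and the sample records are assumptions:

```scala
// Sketch of what jsonRDD does per record: pull fields out of JSON strings
// and filter on a typed value. The regex extractor is a simplification
// standing in for a real JSON parser; the records are assumed sample data.
val records = Seq("""{"name":"Andy","age":30}""", """{"name":"Justin","age":19}""")

// extract the value of a top-level key (quoted or bare), if present
def field(json: String, key: String): Option[String] =
  ("\"" + key + "\"\\s*:\\s*\"?([^\",}]+)").r.findFirstMatchIn(json).map(_.group(1))

// the same predicate as the JSON query above: age >= 25
val adults = records
  .filter(r => field(r, "age").exists(_.toInt >= 25))
  .flatMap(r => field(r, "name"))
  .map("Name: " + _)
adults.foreach(println)
```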
// Never cache the SchemaRDD first and then registerAsTable; cache the registered table with sqlContext.cacheTable instead
// By default, compression for the in-memory columnar storage is off; to enable it, set the COMPRESS_CACHED configuration variable
// using sqlContext's cacheTable
sqlContext.cacheTable("rddTable")
sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
Scala 2.10.4 itself limits case classes to 22 fields, which is inconvenient when using an RDD data source;
sqlContext cannot join three tables in one statement; join two first, then join the result with the third;
sqlContext cannot insert data directly with VALUES;
when writing a sqlContext application, the case class must be defined outside the object.