1. 进化史
Spark SQL用于结构化数据处理。
Hive:SQL简化了MR操作(on HDFS) 。
Shark:基于Hive开发,更改了MR引擎,提升了SQL on HDFS的性能,即Hive on Spark。
Spark:移除Hive,独立为Spark SQL,封装RDD为DF,DS。
2. 优势:
整合SQL和spark编程,简化RDD编程;
统一方式连接数据源;
兼容Hive HQL;
支持JDBC/ODBC;
3. DataFrame/DataSet < --RDD
DataFrame:以RDD为基础的分布式数据集,带有schema元信息。懒执行,性能优于RDD,底层会进行优化操作。
DataSet:相比DataFrame,提供强类型,方便操作数据。
Spark1.0 => RDD
Spark1.3 => DataFrame
Spark1.6 => DataSet(后续主推)
DataFrame=DataSet[Row]
4. DataFrame构造
SparkSession创建和执行DF和SQL,创建DF方式:
- 读数据源
- 从一个RDD转换
- 从Hive Table查询
读文件中数字类型数据默认为bigint≈Long,内存中获取数字类型数据默认为Int
将DF创建为视图用于查询:
df.createTempView():适用于于当前session且view名不可替换。
df.createOrReplaceTempView():适用于于当前session且可替换。
df.createGlobalTempView():适用于全局session,查询时view前加global_temp.。
5. DataFrame的DSL
使用特定语法DSL,不需要创建view进行查询。
val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
//隐式转换依赖
import spark.implicits._
val df1 = spark.read.json("file/1.txt")
//输出df结果
df1.printSchema()
//调用RDD函数函数
df1.map(e => (e.getString(1),e.getLong(0)+ 1)).show()
df1.select("user","age").show()
//需要计算的话:每个列都需要使用引用,两种方式:
df1.select($"user",$"age"+1).show()
df1.select('user,'age+1).show()
//查询年龄大于10的
df1.filter('age>10).show()
//分组聚合
df1.groupBy("age").count().show()
spark.stop()
6. RDD和DataFrame转换
∵ RDD只关心数据,DF同时关心结构。
∴ RDD+schema=DF
val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
//需要引入转换用包
import spark.implicits._
//rdd转df
val rdd = spark.sparkContext.makeRDD(List(1, 2, 3, 4, 5))
//val df = rdd.toDF()//默认为value
val df = rdd.toDF("id")
df.show()
//df转rdd
val rdd1 = df.rdd
rdd1.foreach(println)
spark.stop()
7. DataSet
强类型数据集,需要提供类型信息(样例类...)。
object Test{
case class Person(name:String,age:Long)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
import spark.implicits._
val list=List(Person("a",10),Person("b",11))
val ds = list.toDS()
ds.show
ds.printSchema()
spark.stop()
}
}
8. DataFrame和DataSet转换
case class Person(name:String,age:Long)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
import spark.implicits._
val df = spark.read.json("file/1.txt")
val ds = df.as[Person]
ds.show()
val df1 = ds.toDF()
}
9. RDD和DataSet转换
case class Person(name:String,age:Long)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
import spark.implicits._
val rdd1 = spark.sparkContext.makeRDD(List(Person("a", 10), Person("b", 11)))
val ds = rdd1.toDS
ds.rdd
val rdd2 = spark.sparkContext.makeRDD(List(1,2,3))
rdd2.toDS.show()//默认为value
}
网友评论