1.在新版本中,SparkSession是Spark最新的SQL查询起点,实质上是SQLContext和HiveContext的组合。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
2.创建DataFrame有三种方式:
(1)通过Spark的数据源进行创建;
val df:DataFrame = spark.read.csv("./temp/aaa.csv")
(2)从一个存在的RDD进行转换;
注意:如果需要RDD与DF或者DS之间操作,那么都需要引入import spark.implicits._ (spark不是包名,而是sparkSession对象的名称)
(3)还可以从Hive Table进行查询返回。
val frame1:DataFrame = spark.table("tablename")
3.代码示例:
(1)添加依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
(2)object HelloWorld {
def main(args: Array[String]) {
//创建SparkConf()并设置App名称
val spark = SparkSession.builder().appName("HelloWorld").master("local[*]").getOrCreate()
//导入隐式转换
import spark.implicits._
//读取本地文件,创建DataFrame
val df =spark.read.json("examples/src/main/resources/people.json")
//打印
df.show()
//DSL风格:查询年龄在21岁以上的
df.filter($"age"> 21).show()
//创建临时表
df.createOrReplaceTempView("persons")
//SQL风格:查询年龄在21岁以上的
spark.sql("SELECT * FROM persons where age > 21").show()
//关闭连接
spark.stop()
}
}
4.Spark SQL数据的加载与保存
加载数据:spark.read.文件格式
保存数据:df.write.文件格式
5.Spark SQL通过JDBC连接MySQL
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
定义JDBC相关参数配置信息
val connectionProperties = new Properties()
connectionProperties.put("user","root")
connectionProperties.put("password","000000")
//使用read.jdbc加载数据
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/rdd","rddtable", connectionProperties)
//使用write.jdbc保存数据
jdbcDF2.write.jdbc("jdbc:mysql://hadoop102:3306/mysql","db", connectionProperties)
Spark SQL相关语法
1.filter(condition):根据字段进行筛选
和where使用条件相同
jdbcDF .filter("id = 1 or c1 = 'b'" ).show()
即:过滤出来满足condition条件的,注意,是满足条件的数据被过滤出来
2.selectExpr:对指定字段进行特殊处理
可以直接对指定字段调用UDF函数,或者指定别名等。即,对传入的字段进行特殊处理,可以转换形式,取别名等。
示例,查询id字段,c3字段取别名time,c4字段四舍五入:
jdbcDF .selectExpr("id" , "c3 as time" , "round(c4)" ).show()
ps:众所周知,取别名的时候,可以用as,也可以省略。即:"c3 as time","c3 time"都对
3.col:获取指定字段
只能获取一个字段,返回对象为Column类型。
val idCol = jdbcDF.col(“id”)
4.drop:去除指定字段保留其他字段
返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。
示例:jdbcDF.drop("id")
5.还有就是两个DS或DF进行join,其实最后的结果是两个DS或DF的所有字段
代码:
val spark = SparkSession.builder().appName("JoinDemo").master("local[*]").getOrCreate()
val context=spark.sqlContext
val data1=context.createDataFrame(List(("b","Bob",36))).toDF("id","name","age")
val data2 = context.createDataFrame(List(("staff","Bob",66666))).toDF("job","name","salary")
data1.join(data2,data1.col("name").equalTo(data2.col("name"))).show()
结果:
+---+----+---+-----+----+------+
| id|name|age| job|name|salary|
+---+----+---+-----+----+------+
| b| Bob| 36|staff| Bob| 66666|
+---+----+---+-----+----+------+
网友评论