美文网首页
Spark基础系列之八--Spark SQL是什么

Spark基础系列之八--Spark SQL是什么

作者: 微生活_小阿楠 | 来源:发表于2020-05-22 15:33 被阅读0次

传送门
Spark实战系列之一--Spark是什么
Spark实战系列之二--什么是RDD以及RDD的常用API
Spark实战系列之三--RDD编程基础上
Spark实战系列之四--RDD编程基础下
Spark实战系列之五--键值对RDD
Spark实战系列之六--数据读写
Spark实战系列之七--综合案例
Spark基础系列之八--Spark SQL是什么
Spark基础系列之九--使用Spark SQL读写数据库
传送门

一、Spark SQL是什么

1)DataFrame简述

  • Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据
  • Spark SQL目前支持Scala、Java、Python三种语言
  • DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能
  • Spark能够轻松实现从Mysql到DataFrame的转化,并且支持SQL查询

2)DataFrame的创建

  • 1)可以通过如下语句创建一个SparkSession对象:
scala> import org.apache.spark.sql.SparkSession
scala> val spark1=SparkSession.builder().getOrCreate()

  • 2)在创建DataFrame之前,为了支持RDD转换为DataFrame及后续的SQL操作,需要通过import语句(即import spark1.implicits._)导入相应的包,启动隐式转换。(注意:上面import后面的spark,是你创建的命名)
  • 3)在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,例如:
//读取people.json文件创建DataFrame;
//在读取本地文件或HDFS文件时,要注意给出正确的文件路径;
spark.read.json("people.json")

//读取people.csv文件创建DataFrame;
spark.read.csv("people.csv")

3)DataFrame的保存

  • 可以使用spark.write操作,把一个DataFrame保存不同格式的文件。例如,把一个名称为df的DataFrame保存到不同格式文件中,方法如下:
df.write.json("people.json")
df.write.parquet("people.parquet")
df.write.csv("people.csv")

  • 下面从示例文件people.json中创建一个DataFrame,然后保存成csv格式文件,代码如下:
scala> val peopleDF=spark.read.format("json").load("file:///usr/lcoal/people.json")

scala> peopleDF.select("name","age").write.format("csv").save("file:///usr/local/newpeople.csv")

  • 一些常用的DataFrame操作

3)从RDD转换得到DataFrame

  • 法一:利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换
  • 法二:使用编程接口,构造一个schema并将其应用在已知的RDD上

练习

利用反射机制和编程方式实现rdd转化dataframe
源文件格式如下:
1::ella::65
2::jack::100
3::bruce::80

  • 法一:利用反射来推断包含特定类型对象的RDD的
  • 在利用反射机制推断RDD时,需要首先定义一个case class,因为,只有case class 才能被Spark隐式地转换为DataFrame
import org.apache.log4j.{Level, Logger}
import org.apache.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.DatasetHolder
import org.apache.spark.sql.SparkSession

object spark3 {
//定义一个case class
  case class Person(key:Int,name:String,age:Int) 
  def main(args :Array[String]): Unit ={
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val conf = new SparkConf().setAppName("spark0").setMaster("local")
    val sc = new SparkContext(conf)

    val spark=SparkSession.builder().getOrCreate()
//导入包,支持把一个RDD隐式转换为一个DataFrame
    import spark.implicits._

    //val RDD = spark.sparkContext.textFile("src/spark/a.txt")
    val RDD = spark.sparkContext.textFile("file:///usr/local/spark/mycode/dataframe2.txt")
  val personDF=RDD.map(_.split("::")).map(x=>Person(x(0).trim.toInt,x(1),x(2).trim.toInt)).toDF()

    personDF.show()

//必须注册为临时表才能供下面的查询项目
    //personDF.createOrReplaceTempView("people")

//最终生成一个DataFrame,下面是系统执行返回的信息
    //val personRDD = spark.sql("select * from people")

//DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值
    //personRDD.map(t=>"key: "+t(0) +" name: "+ t(1) + " age: "+t(2)).show()
  }
}


  • 法二:使用编程接口,构造一个schema并将其应用在已知的RDD上
  • 当无法提前定义case class时,就需要采用编程方式定义RDD模式
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
object spark5 {
  def main(args :Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark0").setMaster("local")
    val sc = new SparkContext(conf)

    val spark=SparkSession.builder().config(conf).getOrCreate()

    import spark.implicits._
    val fields = Array(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("age",IntegerType,true))
    val schema = StructType(fields)

    val RDD = spark.sparkContext.textFile("file:///usr/local/spark/mycode/dataframe2.txt")
    val rowRDD = RDD.map(x=>x.split("::")).map(x=>Row(x(0).trim.toInt,x(1),x(2).trim.toInt))

    val personDF = {
      spark.createDataFrame(rowRDD, schema)
    }
    personDF.show()
  }
}


相关文章

网友评论

      本文标题:Spark基础系列之八--Spark SQL是什么

      本文链接:https://www.haomeiwen.com/subject/jsafahtx.html