Developing Spark SQL Programs in IDEA
1. Specifying the Schema Explicitly
Data file: E:/RmDownloads/student.txt
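All three demos below assume student.txt holds one student per line, with id, name, and age separated by single spaces. Hypothetical sample rows (the actual file contents are not shown in the source):
1 Tom 20
2 Mary 18
3 Mike 25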
SparkSQLDemo1.scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Create a DataFrame by specifying the schema explicitly
object SparkSQLDemo1 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("SparkSQLDemo1").getOrCreate()
    // Read the data file and generate the corresponding RDD
    val studentRDD = spark.sparkContext.textFile("E:\\RmDownloads\\student.txt").map(_.split(" "))
    // Create the schema with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the RDD onto an RDD of Row objects
    val rowRDD = studentRDD.map(student => Row(student(0).toInt, student(1), student(2).toInt))
    // Generate the DataFrame
    val studentDF = spark.createDataFrame(rowRDD, schema)
    // Register the DataFrame as a temporary view
    studentDF.createOrReplaceTempView("student")
    // Run the SQL query and display the result
    spark.sql("select * from student").show()
    // Release resources
    spark.stop()
  }
}
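As an aside, the same explicit schema can be passed straight to the DataFrameReader instead of building a Row RDD by hand. A minimal sketch, assuming the same file and schema as above (the "sep" option tells the CSV reader to split on spaces):
// Sketch: read the space-separated file directly into a DataFrame with the explicit schema
val studentDF2 = spark.read
  .schema(schema)
  .option("sep", " ")
  .csv("E:\\RmDownloads\\student.txt")
studentDF2.show()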
2. Using a case class
SparkSQLDemo2.scala
import org.apache.spark.sql.SparkSession

// Infer the schema from a case class
object SparkSQLDemo2 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("SparkSQLDemo2").getOrCreate()
    // Read the data file and generate the corresponding RDD
    val studentRDD = spark.sparkContext.textFile("E:\\RmDownloads\\student.txt").map(_.split(" "))
    // Map each record of the RDD onto the case class
    val dataRDD = studentRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))
    // Generate the DataFrame from the RDD; import the implicit conversions first
    import spark.implicits._
    val studentDF = dataRDD.toDF()
    // Register the DataFrame as a temporary view
    studentDF.createOrReplaceTempView("student")
    // Run the SQL query
    spark.sql("select * from student").show()
    // Release resources
    spark.stop()
  }
}

// Define a case class that describes the schema
case class Student(stuID: Int, stuName: String, stuAge: Int)
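The same case class also unlocks the typed Dataset API. A minimal sketch, assuming the spark session and dataRDD from SparkSQLDemo2 are in scope:
// Sketch: build a typed Dataset[Student] instead of an untyped DataFrame
import spark.implicits._
val studentDS = dataRDD.toDS()
// Fields are accessed in a compile-checked way instead of via SQL strings
studentDS.filter(_.stuAge > 18).show()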
3. Saving the Data to a Database
SparkSQLDemo3.scala
Add mysql-connector-java-5.1.40-bin.jar to the project's lib directory.
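If the project is built with sbt or Maven rather than by copying jars into lib, the driver can be declared as a managed dependency instead. A sketch of the sbt form, with the coordinates inferred from the jar name above:
// build.sbt: MySQL JDBC driver as a managed dependency
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.40"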
import java.util.Properties

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkSQLDemo3 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local").appName("SparkSQLDemo3").getOrCreate()
    // Read the data file and generate the corresponding RDD
    val studentRDD = spark.sparkContext.textFile("E:\\RmDownloads\\student.txt").map(_.split(" "))
    // Create the schema with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the RDD onto an RDD of Row objects
    val rowRDD = studentRDD.map(student => Row(student(0).toInt, student(1), student(2).toInt))
    // Generate the DataFrame
    val studentDF = spark.createDataFrame(rowRDD, schema)
    // Register the DataFrame as a temporary view
    studentDF.createOrReplaceTempView("students")
    // Run the SQL query
    val result = spark.sql("select * from students")
    // Display the result
    result.show()
    // Save the result to MySQL
    val mysqlprops = new Properties()
    // Class.forName("com.mysql.jdbc.Driver")
    mysqlprops.setProperty("user", "bigdata")
    mysqlprops.setProperty("password", "123456")
    result.write.jdbc("jdbc:mysql://bigdata02:3306/sqoopdb", "students", mysqlprops)
    // If the table already exists, write with append mode instead:
    // result.write.mode("append").jdbc("jdbc:mysql://bigdata02:3306/sqoopdb", "students", mysqlprops)
    // Stop the SparkSession
    spark.stop()
  }
}
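To verify the write, the table can be read back through the matching read-side API. A minimal sketch, reusing the URL and mysqlprops from SparkSQLDemo3 (run it before spark.stop()):
// Sketch: read the students table back from MySQL into a DataFrame
val readBack = spark.read.jdbc("jdbc:mysql://bigdata02:3306/sqoopdb", "students", mysqlprops)
readBack.show()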