Spark Programming Fundamentals (Scala Edition): Spark SQL

Author: kaiker | Published 2021-08-09 19:16

    1. Introduction to Spark SQL

    (Figure: how Hive executes SQL)
    • Spark runs tasks with thread-level parallelism, whereas MapReduce uses process-level parallelism
    • Spark SQL relies on Hive only for HiveQL parsing and for Hive metadata; once a query has been parsed into an abstract syntax tree, execution is handed over to Spark SQL


      (Figure: Spark SQL execution flow)

    2. DataFrame

    An RDD is a distributed collection of Java objects, but the internal structure of those objects is opaque to the RDD

    A DataFrame is a distributed dataset built on top of RDDs that carries detailed schema information

    (Figure: RDD vs. DataFrame)
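
    To make the contrast concrete, here is a minimal sketch (assuming the spark-shell's built-in spark session and the people.txt / people.json sample files that ship with Spark):

        //an RDD of strings: Spark only sees opaque lines, not fields
        scala> val rdd = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
        //a DataFrame: Spark knows the column names and their types
        scala> val df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
        scala> df.printSchema()  //shows the name and age columns with their types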

    3. Creating a DataFrame

    SparkSession can load data from different data sources and convert the data into a DataFrame

        scala> import org.apache.spark.sql.SparkSession
        scala> val spark = SparkSession.builder().getOrCreate()

        //load a JSON file into a DataFrame
        scala> val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
        //select two columns and save them in CSV format
        scala> peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/sql/newpeople.csv")
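
    The same read/write pattern works for other built-in data sources as well; a small sketch (Parquet is Spark's default format, and the output path below is just an example):

        //users.parquet ships with the Spark example resources
        scala> val usersDF = spark.read.parquet("file:///usr/local/spark/examples/src/main/resources/users.parquet")
        //write peopleDF back out in Parquet format
        scala> peopleDF.write.format("parquet").save("file:///usr/local/spark/mycode/sql/newpeople.parquet")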
    

    4. Common DataFrame Operations

    printSchema()

    df.printSchema()  //display the schema (column names, types, nullability)

    select()

    df.select(df("name").as("xxxname"), df("age")).show()

    filter()

    df.filter(df("age") > 20).show()

    groupBy()

    df.groupBy("age").count().show()

    sort()

    df.sort(df("age").desc).show()
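
    These operations can be chained on the same DataFrame, for example (a brief sketch using the df loaded above):

    df.select(df("name"), df("age") + 1).show()  //show each name with age increased by 1

    df.filter(df("age") > 20).groupBy("age").count().show()  //per-age counts, restricted to people older than 20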

    5. Converting an RDD to a DataFrame

    5.1 Inferring the Schema with Reflection

    • A case class defines the schema, and each input line is mapped to an instance of it: map(attributes => Person(attributes(0), attributes(1).trim.toInt))
    scala> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder 
    scala> import org.apache.spark.sql.Encoder
    import org.apache.spark.sql.Encoder 
    scala> import spark.implicits._  //import implicits to support implicitly converting an RDD to a DataFrame
    import spark.implicits._
    scala> case class Person(name: String, age: Long)  //define a case class
    defined class Person
    scala> val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(_.split(",")).
    map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
    peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint] 
    scala> peopleDF.createOrReplaceTempView("people") //must be registered as a temporary view before it can be queried below
    scala> val personsRDD = spark.sql("select name,age from people where age > 20")
    //this produces a DataFrame; the next line is the information returned by the system
    personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
    scala> personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show()
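
    Because Person is a case class with an encoder in scope, the same data can also be handled as a strongly typed Dataset; a brief sketch building on the peopleDF created above:

    scala> val peopleDS = peopleDF.as[Person]  //convert the DataFrame into a Dataset[Person]
    scala> peopleDS.filter(p => p.age > 20).map(p => "Name: " + p.name).show()  //typed filter and map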
    

    5.2 Specifying the Schema Programmatically

    • Build the "table header" (the schema)
    • Build the table records
    • Combine the header and the records
    • Spark SQL provides the StructType(fields: Seq[StructField]) class to represent a table's schema
    • StructField(name, dataType, nullable): name is the field name, dataType is the field type, and nullable indicates whether the field may be null
    • When building the records, each record is wrapped in a Row object, and all the Row objects are stored together in one RDD
    • The header and the records are combined with spark.createDataFrame(), which yields a DataFrame
    scala> import org.apache.spark.sql.types._
    import org.apache.spark.sql.types._
    scala> import org.apache.spark.sql.Row
    import org.apache.spark.sql.Row
    //create the fields
    scala> val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
    fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,IntegerType,true))
    scala> val schema = StructType(fields)
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
    //as the output shows, schema describes the table's structure, with two fields: name and age
    //schema is the "table header"
    //next, load the file to create an RDD
    scala> val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
    peopleRDD: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:26
    //parse every line of the peopleRDD RDD
    scala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim.toInt))
    rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:29
    //rowRDD holds the "table records"
    //now combine the "table header" with the "table records"
    scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
    //must be registered as a temporary view before it can be queried below
    scala> peopleDF.createOrReplaceTempView("people")
    scala> val results = spark.sql("SELECT name,age FROM people")
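
    To complete the example, the query result can be displayed (a brief addition, assuming the same spark-shell session as above):

    scala> results.show()  //print the name and age columns for every row
    scala> results.map(attributes => "name: " + attributes(0) + ", age: " + attributes(1)).show()  //format each Row as a string (requires import spark.implicits._)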
    

    6. Reading and Writing Databases

    6.1 JDBC

    //read the student table from MySQL over JDBC into a DataFrame
    scala> val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()
    scala> jdbcDF.show()

    //append a DataFrame to the MySQL table; studentDF and prop are defined in the sketch below
    studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)
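
    For the write above to run, studentDF and prop must be defined first. A minimal sketch, assuming a MySQL table spark.student(id, name, gender, age) and the same root/hadoop credentials used for reading:

    scala> import java.util.Properties
    scala> import org.apache.spark.sql.types._
    scala> import org.apache.spark.sql.Row
    //build a small DataFrame holding two sample student records
    scala> val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
    scala> val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("gender", StringType, true), StructField("age", IntegerType, true)))
    scala> val studentDF = spark.createDataFrame(studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt)), schema)
    //connection properties used by the JDBC writer
    scala> val prop = new Properties()
    scala> prop.put("user", "root")
    scala> prop.put("password", "hadoop")
    scala> prop.put("driver", "com.mysql.jdbc.Driver")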
    

    6.2 Hive

    scala> import org.apache.spark.sql.Row
    scala> import org.apache.spark.sql.SparkSession
    scala> case class Record(key: Int, value: String)
    scala> val warehouseLocation = "spark-warehouse" // warehouse directory, set via configuration
    scala> val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
    scala> import spark.implicits._
    scala> import spark.sql
    //query an existing Hive table
    scala> sql("SELECT * FROM sparktest.student").show()
    
        scala> import java.util.Properties
        scala> import org.apache.spark.sql.types._
        scala> import org.apache.spark.sql.Row 
        //create two records representing two students
        scala> val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
        //define the schema
        scala> val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
        //create a Row object for each record; every Row is one row of rowRDD
        scala> val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
        //bind the Row objects to the schema, i.e. associate the data with the schema
        scala> val studentDF = spark.createDataFrame(rowRDD, schema)
        //inspect studentDF
        scala> studentDF.show()
        +---+---------+------+---+
        | id|     name|gender|age|
        +---+---------+------+---+
        |  3|Rongcheng|     M| 26|
        |  4|  Guanhua|     M| 27|
        +---+---------+------+---+
        //register a temporary view
        scala> studentDF.createOrReplaceTempView("tempTable")
        //insert the records into the Hive table
        scala> sql("insert into sparktest.student select * from tempTable")
    
