美文网首页眼君的大数据之路
spark开发笔记(三、Spark SQL笔记)

spark开发笔记(三、Spark SQL笔记)

作者: 眼君 | 来源:发表于2020-09-02 11:59 被阅读0次

    基本概念

    Shark、Spark SQL和Hive之间的关系:

    1. Shark借用了Hive大部分的组件,包括词法分析、语法分析和逻辑分析阶段,只是在最后将逻辑执行计划转化为物理执行计划这一步,将底层的实现从MapReduce替换成了Spark。
    2. Spark SQL在Hive兼容层面仅依赖HiveQL解析和Hive元数据,也就是说从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。

    Spark SQL增加了DataFrame(即带有Scheme信息的RDD)。使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是Json格式的数据。
    Spark SQL目前支持Scala、Java和Python三种语言。
    DataFrame是以RDD为基础的分布式数据集,提供了详细的结构信息。

    DataFrame的创建

    Spark2.0开始使用全新的SparkSession接口替代Spark1.6中的SQLContext、HiveContext等来实现其对数据加载、转换、处理等功能。
    SparkSession支持从不同的数据源(Hive、HDFS、Cassandra)把数据转换成DataFrame,并支持把DataFrame转换成SQLContext中的表,使用SQL进行查询。
    /spark/examples/src/main/resources目录下自带一些样例数据,我们来进行加载:

    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().getOrCreate()
    //支持RDDs转换为DataFrame及后续SQL
    import spark.implicits._
    val df1 = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
    df1.show()
    val df2 = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
    df2.show()
    

    以下是常用的DataFrame操作:

    //打印模式信息
    scala>df1.printSchema()
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)
    //选择多列
    scala>df1.select(df("name"),df("age")+1).show()
    +-------+---------+
    |   name|(age + 1)|
    +-------+---------+
    |Michael|     null|
    |   Andy|       31|
    | Justin|       20|
    +-------+---------+
    //条件过滤
    df1.filter(df("age") > 20 ).show()
    +---+----+
    |age|name|
    +---+----+
    | 30|Andy|
    +---+----+
    //分组聚合
    scala>df.groupBy("age").count().show()
    +----+-----+
    | age|count|
    +----+-----+
    |  19|    1|
    |null|    1|
    |  30|    1|
    +----+-----+
    //排序
    scala>df.sort(df("age").desc).show()
    +----+-------+
    | age|   name|
    +----+-------+
    |  30|   Andy|
    |  19| Justin|
    |null|Michael|
    +----+-------+
    //多列排序
    scala>df.sort(df("age").desc,df("name").desc).show()
    +----+-------+
    | age|   name|
    +----+-------+
    |  30|   Andy|
    |  19| Justin|
    |null|Michael|
    +----+-------+
    //对列进行重命名
    scala>df.select(df("name").as("username"),df("age")).show()
    +--------+----+
    |username| age|
    +--------+----+
    | Michael|null|
    |    Andy|  30|
    |  Justin|  19|
    +--------+----+
    

    从RDD转换得到DataFrame

    1. 利用反射机制推断RDD模式
      利用反射机制推断RDD模式时,需要首先定义一个case class,因为只有case class才能被Spark隐式转换为DataFrame:
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.Encoder
    import spark.implicits._     //导入包,支持把RDD隐式转换为DataFrame
    case class Person(name:String,age:Long)     //定义一个case class
    val peopleDF = spark.sparkContext.textFile("file:///usr/local/people.txt").map(_.split(",")).map(attributes => Person(attributes(0),attributes(1).trim.toInt)).toDF()
    peopleDF.createOrReplaceTempView("people")      //必须注册为临时表才能提供下面的查询使用
    val personsRDD = spark.sql("select name,age from people where age > 20")    //最终生成一个Dataframe
    personsRDD.map(t => "Name:" + t(0)+","+"Age:"+t(1)).show() //df中每一个元素都是一行记录
    
    1. 使用编程方式定义RDD模式
      当无法提取定义case class时,就需要采用编程方式定义RDD模式:
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    val peopleRDD = spark.sparkContext.textFile("file:///usr/local/people.txt")
    val schemaString = "name age" //定义一个模式字符串
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = true))
    val schema = StructType(fields) 
    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).trim))
    val peopleDF = spark.createDataFrame(rowRDD,schema)
    
    

    DataFrame保存成文件

    1. write.format().save()
    val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
    peopleDF.select("name","age").write.format("csv").save("file:///usr/local/spark/newpeople.csv")
    

    write.format()支持json,parquet,jdbc,orc,libsvm,csv,text等格式文件。

    1. saveAsTextFile()
    val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
    peopleDF.saveAsTextFile("file:///usr/local/spark/newpeople.csv")
    

    通过JDBC连接数据库

    假设MySQL中创建了一个表spark.student(id int(4),name char(20),gender char(4),age int(4));
    要让spark能连接mysql,需要将mysql的驱动包(mysql-connector-java-xx.jar)拷贝到spark/jars/目录下,启动spark-shell时需要指定该驱动包:

    spark-shell --jars /usr/local/spark/jars/mysql-connector-java-xx.jar --driver-class-path /usr/local/spark/jars/mysql-connector-java-xx.jar
    

    接下来开始编写程序:

    val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://ocalhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable","student").option("user","root").option("password","hadoop").load()
    jdbcDF.show()
    
    向MySQL数据库写入数据

    现在开始在spark-shell中编写程序,往spark.student表中插入两条记录:

    import java.util.Properties
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    //下面设置两条数据表示两个学生信息
    val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
    
    //下面要设置模式信息
    val schema = StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("gender",StringType,true),StructField("age",IntegerType,true)))
    
    //下面创建Row对象,每个Row对象都是 rowRDD中一行
    val rowRDD = studentRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
    
    //建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
    val studentDF = spark.createDataFrame(rowRDD,schema)
    
    //下面创建一个prop变量用来保存JDBC连接参数
    val prop = new Properties()
    prop.put("user","root")
    prop.put("password","hadoop")
    prop.put("driver","com.mysql.jdbc.Driver")
    
    //采用append模式,追加数据
    studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)
    

    相关文章

      网友评论

        本文标题:spark开发笔记(三、Spark SQL笔记)

        本文链接:https://www.haomeiwen.com/subject/whgvsktx.html