美文网首页
Spark系列3 - Spark SQL

Spark系列3 - Spark SQL

作者: georgeguo | 来源:发表于2018-09-23 14:53 被阅读125次

    1 从Shark到Spark SQL

    Spark SQL的前生是Shark,即Hive on Spark。Shark本质是通过Hive的HQL进行解析,将HiveQL翻译成Spark上对应的RDD操作,然后通过Hive的Metadata获取数据数据库里的元数据,并根据元数据从HDFS上读取文件,最后由Shark将获取的数据放到Spark上运算。

    Shark提供了类似Hive的功能,区别是Hive将输入的HiveQL转换成MapReduce作业,而Shark将HiveQL转换成Spark作业。Shark复用了Hive中的HiveQL解析、逻辑执行计划翻译、执行计划优化等,可以近似的认为,Shark仅将物理计划从MapReduce作业替换成Spark作业。

    Shark继承了大量的Hive代码,给优化和维护带来了很大的不变,特别是基于MapReduce设计的部分,成为整个项目的瓶颈,因此2014年7月,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。Spark SQL允许开发人员可以直接操作RDD,同时也可查询在Hive上存放的外部数据,因此Spark SQL在使用SQL进行外部查询的同时,也能进行更复杂的数据分析。Hive、Shark和Spark SQL的架构对比下图,

    Hive、Shark和Spark SQL架构对比

    Spark SQL在Shark原有架构的基础上,重写了逻辑执行计划的优化部分,解决了Shark存在的问题。Spark SQL引入了DataFrame,用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以来自Hive、HDFS、Cassandra等。

    2 RDD和DataFrame

    RDD是分布式的Java对象的集合,对象的内部结构对于RDD而言却是不可知的。DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息,DataFrame是带有schema的RDD。二者的对比如下图,

    RDD和DataFrame

    Spark2.0之后,Spark使用了全新的SparkSession,替换了Spark1.6中的SQLContext和HiveContext。

    3 DataFrame编程

    DataFrame的创建

    import org.apache.spark.sql.SparkSession // spark-shell中默认已导入
    

    读取json格式的数据,创建DataFrame

    import spark.implicits._
    val df = spark.read.json("hdfs://master:8020/data/people.json")
    df.show()
    val df1 = spark.read.format("json").load("hdfs://master:8020/data/people.json")
    df1.show()
    

    DataFrame的保存

    val df = spark.read.json("hdfs://master:8020/data/people.json")
    df.write.json("hdfs://master:8020/data/people_new.json")
    df.write.parquet("hdfs://master:8020/data/people_new.parquet")
    df.write.csv("hdfs://master:8020/data/people_new.csv")
    
    df.write.format("json").save("hdfs://master:8020/data/people_new1.json")
    df.write.format("parquet").save("hdfs://master:8020/data/people_new1.parquet")
    df.write.format("csv").save("hdfs://master:8020/data/people_new1.csv")
    

    注意:save保存文件时对应的参数是一个路径而不是文件名。

    DataFrame的常用操作

    df.printSchema() 打印DataFrame的模式
    df.select() 从DataFrame中选择部分字段

    df.select("name","Age").show()
    df.select(df("name"), df("Age")+1).show() // 修改字段值
    df.select(df("name").as("username"), df("Age").as("userage")).show() // 给字段重命名
    

    df.filter() 实现条件查询,获取满足条件的记录

    df.filter(df("age") > 20).show()
    

    df.groupBy() 对记录进行分组

    df.groupBy("age").count().show()
    

    df.sort() 对记录进行排序

    df.sort(df("name").asc).show
    df.sort(df("age").desc).show
    df.sort(df("age").desc, df("name")).show
    

    df.map(func) 对记录安装func进行转换

    从RDD转换到DataFrame

    注意:这里的转换主要指无格式的文本文件、MySQL、Hive等无法直接生成DataFrame的数据进行转换,对于json、csv、parquet格式的文件,会自动的进行隐式转换,不需要进行这样的转换。从RDD转换到DataFrame有两种转换方法:
    方法1:利用反射机制推断RDD模式,适用于对已知数据结构的转换;
    方法2:使用编程方式定义RDD模式,使用编程结构构造一个schema,并将其应用的已知的RDD上;

    利用反射机制推断RDD

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.Encoder
    import spark.implicits._
    
    case class Person(name: String, age: Long)
    val people = spark.sparkContext.textFile("hdfs://master:8020/data/people.txt")
    val peopleDf = people.map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
    peopleDf.createOrReplaceTempView("people") //必须转成临时表才能供查询使用
    val peopleRDD = spark.sql("select name,age from people where age > 20")
    peopleRDD.map(t => {"Name: "+t(0) +", age: "+t(1)}).show()
    
    • 上述代码中定义了一个case class,只有case class才能被Spark隐式转换成DataFrame,因此定义了一个cass class;
    • 生成DataFrame之后,Spark要求必须把DataFrame注册为临时表,才能供后面的查询使用,上述临时表的名称为people;

    采用编程方式定义RDD模式

    当无法定义case class时,就需要采用编程方式定义RDD模式,具体需要3步:

    • 第一步:制作表头,表头也就是表的模式,包括字段名,类型和是否允许为空等信息
    • 第二步:制作表中记录;
    • 第三步:把表头和表中记录拼装在一起;
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    // generate Field
    val fields = Array(StructField("name", StringType, true), StructField("age", IntegerType, true)) 
    val schema = StructType(fields)
    
    // generate Record
    val peopleRDD = spark.sparkContext.textFile("hdfs://master:8020/data/people.txt")
    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim.toInt))
    
    // combine
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    
    peopleDf.createOrReplaceTempView("people") //必须转成临时表才能供查询使用
    val peopleRecord = spark.sql("select name,age from people")
    peopleRecord.show()
    

    4 Spark SQL读写MySQL

    通过JDBC连接数据库

    ① 在mysql库中创建表,并插入数据
    ② 拷贝mysql-connector-java-5.1.40-bin.jar 到各个spark节点($SPAK_HOME/jars)
    ③ 启动spark-shell
    spark-shell --jars /root/software/spark-2.2.0-bin-hadoop2.6/jars/mysql-connector-java-5.1.40-bin.jar
    --driver-class-path /root/software/spark-2.2.0-bin-hadoop2.6/jars/mysql-connector-java-5.1.40-bin.jar

    说明:若环境变量配置完整,启动spark-shell不带上述参数,也是可以直接使用JDBC的。本文在实验的过程中,启动spark-shell未带任何参数。

    从MySQL中读取数据

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object ReadMySql {
        def main(args: Array[String]) = {
            val spark = SparkSession.builder().getOrCreate()
            val jdbcDF = spark.read.format("jdbc").
                option("url", "jdbc:mysql://192.168.2.180:3306/spark").
                option("driver", "com.mysql.jdbc.Driver").
                option("dbtable","student").
                option("user","root").
                option("password", "hunter").
                load()
            jdbcDF.show()
        }
    }
    

    向MySQL中写数据(在spark-shell中交互操作)

    import java.util.Properties
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    object InsertMySql {
        def main(args: Array[String]) = {
            val spark = SparkSession.builder().getOrCreate()
            val studentRDD = spark.sparkContext.parallelize(Array("7,TianQiQi,M,30","8,GouDan,M,18")).map(_.split(","))
            val fields = Array(
                StructField("id", IntegerType, true),
                StructField("name", StringType, true), 
                StructField("gender", StringType, true), 
                StructField("age", IntegerType, true)) 
            val schema = StructType(fields)
            
            val rowRDD = studentRDD.map(p => {Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt)})
            val studentDF = spark.createDataFrame(rowRDD, schema)
            
            val prop = new Properties()
            prop.put("user","root")
            prop.put("password", "hunter")
            prop.put("driver", "com.mysql.jdbc.Driver")
            
            studentDF.write.mode("append").jdbc("jdbc:mysql://192.168.2.180:3306/spark", "spark.student", prop)
        }
    }
    

    5 SparkSQL 读写 Hive

    Hive的部署和安装参考:apache-hive-1.2.2安装

    初始配置

    检查当前的Spark版本是否包含Hive支持,在spark-shell交互界面输入:

    import org.apache.spark.sql.hive.HiveContext
    

    若引入不报错,则说明当前的spark已经支持Hive。
    在Hive中创建库和表并插入记录

    create database if not exists sparkhive;
    show databases;
    
    use sparkhive;
    
    create table if not exists sparkhive.student(
        id int, name string, gender string, age int
    );
    
    insert into student values(1, "ZhangSan", "F", 23);
    insert into student values(2, "LiSi", "M", 24);
    
    select * from student;
    

    设置Spark的环境变量。为了让Spark能够顺利的访问Hive,需要在spark-env.sh中添加如下变量:

    export HIVE_HOME=/root/software/apache-hive-1.2.1-bin
    export HIVE_CONF_DIR=$HIVE_HOME/conf
    export SPARK_CLASSPATH=$HBASE_HOME/lib:$HIVE_HOME/lib/mysql-connector-java-5.1.40-bin.jar
    

    其他节点分发:

    cd $HBASE_HOME/conf 
    scp hbase-env.sh slave1:/root/software/hbase-1.2.1/conf
    scp hbase-env.sh slave2:/root/software/hbase-1.2.1/conf
    scp hbase-env.sh slave3:/root/software/hbase-1.2.1/conf
    
    cd $HIVE_HOME
    scp -r lib/mysql-connector-java-5.1.40-bin.jar conf/ slave1:$HIVE_HOME
    scp -r lib/mysql-connector-java-5.1.40-bin.jar conf/ slave2:$HIVE_HOME
    scp -r lib/mysql-connector-java-5.1.40-bin.jar conf/ slave3:$HIVE_HOME
    

    拷贝hive-site.xml到$SPARK_HOME/conf目录下

    cp $HIVE_HOME/conf/hive-site.xml   $SPARK_HOME/conf
    

    注意:如果不将hive-site.xml拷贝到spark的conf目录下,会出现找不到表的异常。

    从hive中读数据

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object ReadHive {
        def main(args: Array[String]) = {
            val warehouseLocation = "spark-warehouse"
            val spark = SparkSession.builder().
                appName("Spark Hive Example").
                config("spark.sql.warehouse.dir", warehouseLocation).
                enableHiveSupport().getOrCreate()
                
            import spark.implicits._
            import spark.sql
            sql("select * from sparkhive.student").show()
        }
    }
    

    向hive中写数据

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    object InsertHive {
        def main(args: Array[String]) = {
            val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
            import spark.sql
            import spark.implicits._
            
            val studentRDD = spark.sparkContext.parallelize(Array("30,TianQi,M,30","40,ErGouZi,F,18")).map(_.split(","))
            val fields = Array(
                StructField("id", IntegerType, true),
                StructField("name", StringType, true), 
                StructField("gender", StringType, true), 
                StructField("age", IntegerType, true)) 
            val schema = StructType(fields)
            
            val rowRDD = studentRDD.map(p => {Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt)})
            
            val studentDF = spark.createDataFrame(rowRDD, schema)
            
            studentDF.registerTempTable("tempTable")
            
            sql("insert into sparkhive.student select * from tempTable")
        }
    }
    

    错误和异常的解决方法

    yarn cluster模式使用SQL找不到表 报错:
    org.apache.spark.sql.AnalysisException: Table or view not found:
    at org.apache.spark.sql.catalyst.analysis.packageAnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.AnalyzerResolveRelations$.getTable(Analyzer.scala:306)

    解决方法:
    1 Using SparkSession.enableHiveSupport() instead of deprecated SQLContext or HiveContext.
    2 copy hive-site.xml into SPARK CONF (/usr/lib/spark/conf) directory
    3 Adding the same directory to the classpath while executing the jar

    相关文章

      网友评论

          本文标题:Spark系列3 - Spark SQL

          本文链接:https://www.haomeiwen.com/subject/rnrxoftx.html