Hive平滑过渡到Spark Sql

作者: 董二弯 | 来源:发表于2019-05-13 19:43 被阅读1次

    Hive概述

    Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的 SQL 查询功能,将类 SQL 语句转换为 MapReduce 任务执行。


    image.png

    Hive产生背景

    • MapReduce编程带来的不便性
      MapReduce编程十分繁琐,在大多情况下,每个MapReduce程序需要包含Mapper、Reduceer和一个Driver,之后需要打成jar包扔到集群上运 行。如果mr写完之后,且该项目已经上线,一旦业务逻辑发生了改变,可能就会带来大规模的改动代码,然后重新打包,发布,非常麻烦(这种方式,也是最古老的方式)

    • 当大量数据都存放在HDFS上,如何快速的对HDFS上的文件进行统计分析操作?
      一般来说,想要做会有两种方式:
      学Java、学MapReduce(十分麻烦)
      做DBA的:写SQL(希望能通过写SQL这样的方式来实现,这种方式较好)
      然而,HDFS中最关键的一点就是,数据存储HDFS上是没有schema的概念的
      (schema:相当于表里面有列、字段、字段名称、字段与字段之间的分隔符等,这些就是schema信息) ,然而HDFS上的仅仅只是一个纯的文本文件而已 。那么,没有schema,就没办法使用sql进行查询 。因此,在这种背景下,就有问题产生: 如何为HDFS上的文件添加Schema信息?如果加上去,是否就可以通过SQL的方式进行处理了呢?在这种背景下,Hive产生了。

    Hive整体架构

    image.png

    Hive架构包括如下组件:CLI(command line interface)、JDBC/ODBC、Thrift Server、Hive WEB Interface(HWI)、metastore和Driver(Complier、Optimizer和Executor)

    • Driver组件:核心组件,整个Hive的核心,该组件包括Complier、Optimizer和Executor,它的作用是将我们写的HQL语句进行解析、编译优化,生成执行计划,然后调用底层的MapReduce计算框架。
    • Metastore组件:元数据服务组件,这个组件存储hive的元数据,hive的元数据存储在关系数据库里,hive支持的关系数据库有derby、mysql。
    • CLI:command line interface,命令行接口。
    • ThriftServers:提供JDBC和ODBC接入的能力,它用来进行可扩展且跨语言的服务的开发,hive集成了该服务,能让不同的编程语言调用hive的接口。
    • Hive WEB Interface(HWI):hive客户端提供了一种通过网页的方式访问hive所提供的服务。这个接口对应hive的hwi组件(hive web interface)

    执行流程示意图

    image.png

    Hive 将通过CLI接入,JDBC/ODBC接入,或者HWI接入的相关查询,通过Driver(Complier、Optimizer和Executor),进行编译,分析优化,最后变成可执行的MapReduce。
    Hive 功能有点类似传统的数据库引擎(如mysql),解析器,预处理器,优化器,查询执行计划这些功能的汇总。只不过Hive是将HQL转换成MapReduce,而传统的数据库引擎将SQL转换成执行引擎可以识别的语言

    Hive环境搭建

    Hadoop环境在大数据入门章节https://www.jianshu.com/p/10700514e3e0
    中已经讲述,这里直接使用该环境。

    • 把Hive的安装压缩包hive-1.1.0-cdh5.7.0.tar.gz上传到服务器。
    • 解压并配置环境变量
    tar -zxvf hive-1.1.0-cdh5.7.0.tar.gz -C ~/apps/
    cd ~/app
    vi  ~/.bash_profile
         //在文件中添加变量
        export HIVE_HOME=/root/apps/hive-1.1.0-cdh5.7.0
        export PATH=$HIVE_HOME/bin:$PATH
    // 使环境变量生效
    source ~/.bash_profile
    
    • 为hive配置关系型数据库地址信息,用于存储hive的元数据。本人用的是MySQL,此处省略了MySQL的安装。在配置之前一定要安装,安装在本地和虚拟机都行,只要网络通,能访问即可。
    cd /root/apps/hive-1.1.0-cdh5.7.0/conf
    vi hive-site.xml
     //在文件中配置以下信息
    
    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    
    <configuration>
    <!--mysql数据库地址-->
        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://192.168.30.130:3306/sparksql?createDatabaseIfNotExist=true</value>
        </property>
    <!-- mysql的driver类 -->
        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>com.mysql.jdbc.Driver</value>
        </property>
    <!-- 用户名 -->
      <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>root</value>
        </property>
    <!-- 密码 -->
       <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            <value>191016</value>
        </property>
    </configuration>
    

    配置完成后需要拷贝mysql驱动到$HIVE_HOME/lib/ ,下载地址https://dev.mysql.com/downloads/connector/j/

    • 在hive-env.sh中配置hadoop的安装路径
    cd /root/apps/hive-1.1.0-cdh5.7.0/conf
    //默认只提供hive-env的模板,需要复制一份
    cp hive-env.sh.template hive-env.sh
    vi hive-env.sh
       在文件中找到HADOOP_HOME
       配置为本地安装的hadoop安装路径
    
    • 启动hive,在启动之前需要启动hadoop环境
    cd /root/apps/hive-1.1.0-cdh5.7.0/bin
    ./hive
    

    启动之后可以登录到mysql查看hive是否创建了元数据信息表。


    image.png

    Hive的简单使用

    用hive来对wordcount案例的实现

    • 创建表
    //表名称为hive_wordcount,有一个string类型的字段context
    create table hive_wordcount(context string);
    

    创建后可在mysql的TBLS元数据表中看到新建的表信息


    image.png
    • 加载需要统计的文件到hive_wordcount表
    //load data local inpath 'filepath ' into table 'tablename'
    load data local inpath '/root/data/hello.txt' into table hive_wordcount
    
    • 查询统计词频出现的次数
    //lateral view explode():是把每行记录按照指定分隔符进行拆解,我的文本字符之间是以空格做分隔
    select word,count(1) from hive_wordcount lateral view explode(split(context,' ')) as word group by word
    

    执行后会生成MapReduce作业在yarn上执行。运行结束后,可统计出每个单词出现的次数。

    Hive on Spark

    Hive默认使用MapReduce作为执行引擎,即Hive on mr。实际上,Hive还可以使用Tez和Spark作为其执行引擎,分别为Hive on Tez和Hive on Spark。由于MapReduce中间计算均需要写入磁盘,而Spark是放在内存中,所以总体来讲Spark比MapReduce快很多。hive on Spark是由Cloudera发起,由Intel、MapR等公司共同参与的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算。通过该项目,可以提高Hive查询的性能,同时为已经部署了Hive或者Spark的用户提供了更加灵活的选择,从而进一步提高Hive和Spark的普及率。

    SparkSQL概述

    什么是SparkSQL

    Spark SQL是Spark用来处理结构化数据的一个模块,它提供了两个编程抽象分别叫做DataFrame和DataSet,它们用于作为分布式SQL查询引擎。从下图可以查看RDD、DataFrames与DataSet的关系。

    image

    为什么引入SparkSQL

    在Hadoop发展过程中,为了给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应用而生,它是当时唯一运行在Hadoop上的SQL-on-Hadoop工具。但是,MapReduce在计算过程中大量的中间磁盘落地过程消耗了大量的磁盘I/O,降低了运行效率。为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现突出的有一个叫做Shark的工具。Shark运行在Spark引擎上,从而使得SQL的查询速度得到了10-100倍的提升。但是,随着Spark的发展,Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等),制约了Spark的既定方针,和各个组件的相互集成,所以才有了SparkSQL。

    SparkSQL与Hive on Spark

    SparkSQL和Hive On Spark都是在Spark上实现SQL的解决方案。Spark早先有Shark项目用来实现SQL层,不过后来推翻重做了,就变成了SparkSQL。这是Spark官方Databricks的项目,Spark项目本身主推的SQL实现。Hive On Spark比SparkSQL稍晚。
    根据发展历程和和spark的集成程度考虑,用SparkSQL略好于Hive on Spark。所以重点介绍SparkSQL的知识点。

    Hive平滑过渡到Spark Sql

    SQLContext/HiveContext/SparkSession的使用

    在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
      SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
    这里我们简单介绍SparkSession的使用。

    • 创建SparkSession
    val sparkSession = SparkSession.builder.master("local").appName("spark session example").getOrCreate()
    

    上面代码类似于创建一个SparkContext,master设置为local,然后创建了一个SQLContext封装它。如果你想创建hiveContext,可以使用下面的方法来创建SparkSession,以使得它支持Hive:

    val sparkSession = SparkSession.builder.master("local").appName("spark session example").enableHiveSupport().getOrCreate()
    

    enableHiveSupport 函数的调用使得SparkSession支持hive,类似于HiveContext。

    • 使用SparkSession读取数据
      创建完SparkSession之后,我们就可以使用它来读取数据,下面代码片段是使用SparkSession来从csv文件中读取数据。读取数据后为一个DataFrame对象,可通过DataFrame的函数对数据进行过滤等操作(在后续介绍DataFrame时介绍)。
    val df = sparkSession.read.option("header","true").csv("src/main/resources/sales.csv")
    

    spark-shell/spark-sql的使用

    • 进入spark/bin下启动spark-shell,成功后运行查询表sql。
    spark-shell --master local[2]
    spark.sql("show tables").show()
    

    会发现无法访问hive表的数据。此时需要为spark配置hive-site的信息。

    • 将hive/conf目录下的hive-site.xml文件拷贝到spark/conf目录下(且添加参数“hive.metastore.schema.verification”的值为“true”,这样做的目的是使得进入spark客户端时不报版本不匹配错误;但是不添加也是可以正常运行的)。由于需要从mysql中访问hive的元数据信息,所以启动时需要指定mysql的连接jar包。启动成功后运行查询表sql
    //此处jars后面的参数是mysql的jar包所在的路径
    spark-shell --master local[2] --jars jar/mysql-connector-java-5.1.27-bin.jar
    spark.sql("show tables").show()
    

    此时可以发现可以访问到测试hive时所建的hive_wordcount表。

    鉴于在spark-shell中每一次使用都需要调用spark.sql方法,故可以使用以下命令打开spark客户端:

    spark-sql --master local[2] --jars jar/mysql-connector-java-5.1.27-bin.jar
    

    这样即可在客户端直接使用sql代码。

    thriftserver/beeline的使用

    基于Spark的thirftserver来访问hive中的数据,可以让多个客户端连接到同一个服务器端,跑的是同一个application。Thirftserver作为服务端,beeline作为客户端来访问服务端,支持多个客户端同时访问,有助于多个客户端之间数据的共享。而spark-shell、spark-sql启动都是一个spark application,不能共享数据。

    • 首先是启动thriftserver服务端:
      服务器端是在spark目录下的sbin目录下,但是启动的时候不能直接使用./start-thriftserver.sh进行启动,会报没有设置master, 另外就是Spark SQL是需要和mysql一样操作表的,所以需要连接mysql的驱动jar,因此命令如下:
    ./start-thriftserver.sh --master local[2] --jars ~/lib/mysql-connector-java-5.1.38.jar
    

    启动完成之后可以在浏览器中进行查看,是否启动成功;在浏览器中输入以下地址即可.(阿里云服务器需要开放安全组端口)
    ip:4040

    • 启动beeline客户端进行数据的操作:
      启动程序在bin目录下,只需要输入以下命令就会连接到数据库:
    ./beeline -u jdbc:hive2://localhost:10000 -n hadoop
    

    如图,表示连接成功


    image.png
    • 出现上面成功界面后,下面就是操作数据库的操作,和mysql中类似;


      image.png
    • 启动thriftserver: 默认端口是10000 ,可在启动时通过配置修改端口号。
    ./start-thriftserver.sh  --master local[2] --jars ~/lib/mysql-connector-java-5.1.27-bin.jar  --hiveconf hive.server2.thrift.port=14000
    

    jdbc方式编程访问

    在使用jdbc开发时,一定要先启动thriftserver。

    • 在maven中添加hive-jdbc依赖
    <properties>
            <maven.compiler.source>1.5</maven.compiler.source>
            <maven.compiler.target>1.5</maven.compiler.target>
            <encoding>UTF-8</encoding>
            <scala.version>2.11.8</scala.version>
            <spark.version>2.1.0</spark.version>
     </properties>
    
    <!--scala-->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
     <!--SparkSQL-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>    
           </dependency>
    
     <!--Hive-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
    <!--HiveJdbc-->
            <dependency>
                <groupId>org.spark-project.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <version>1.2.1.spark2</version>
           </dependency>
    
    • 开发代码,和标准的JDBC编程一样。
    /**
     *  通过JDBC的方式访问
     */
    object SparkSQLThriftServerApp {
    
      def main(args: Array[String]) {
        //创建数据库的连接
        Class.forName("org.apache.hive.jdbc.HiveDriver")
        //获取JDBC连接
        val conn = DriverManager.getConnection("jdbc:hive2://192.168.30.130:10000","root","")
        // 创建一个Statement
        val pstmt = conn.prepareStatement("select empno, ename, sal from emp")
        //  执行sql语句。得到结果集
        val rs = pstmt.executeQuery()
        //操作结果集
        while (rs.next()) {
          println("empno:" + rs.getInt("empno") +
            " , ename:" + rs.getString("ename") +
            " , sal:" + rs.getDouble("sal"))
        }
        //关闭JDBC对象
        rs.close()
        pstmt.close()
        conn.close()
      }
    }
    

    DataFrame&Dataset

    DataFrame概述

    DataFrame是一个分布式数据集,可以理解为关系型数据库一张表,由字段和字段类型、字段值按列组织

    DataFrame对比RDD

    • 两个都是分布式数据容器,DF理解是一个表格除了RDD数据以外还有Schema(数据结构信息),也支持复杂数据类型(map..)。
    • DataFrame提供的API比RDD丰富 支持map filter flatMap .....。
    • DF提供Schema信息 有利于优化,性能上好。
    • 底层 :Java/Scala 操作RDD的底层是跑在JVM上的,而python操作RDD的底层不跑在JVM上,它有Python Execution,这就导致了所运行的效率完全是不一样的。而DF不是直接到运行环境的,中间还有一层是logicplan,先转换成逻辑执行计划之后,再去进行运行的;所以不管采用什么语言,它的执行效率都是一样的。

    DataFrame基本API常用操作

    /**
     * DataFrame API基本操作
     */
    object DataFrameApp {
    
      def main(args: Array[String]) {
    
        val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()
    
        // 将json文件加载成一个dataframe
        val peopleDF = spark.read.format("json").load("file:///Users/rocky/data/people.json")
    
        // 输出dataframe对应的schema信息
        peopleDF.printSchema()
    
        // 输出数据集的前20条记录
        peopleDF.show()
    
        //查询某列所有的数据: select name from table
        peopleDF.select("name").show()
    
        // 查询某几列所有的数据,并对列进行计算: select name, age+10 as age2 from table
        peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()
    
        //根据某一列的值进行过滤: select * from table where age>19
        peopleDF.filter(peopleDF.col("age") > 19).show()
    
        //根据某一列进行分组,然后再进行聚合操作: select age,count(1) from table group by age
        peopleDF.groupBy("age").count().show()
    
        spark.stop()
      }
    
    }
    

    DataFrame与RDD互操作

    • 反射方式
    //添加隐式转换
    import spark.implicits._
    val spark = SparkSession.builder().appName("xxx").master("local[2]").getOrCreate()
    
    //创建一个RDD
    val rdd = spark.sparkContext.textFile("xxx")
    //转换为DF
    val peopleDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt,line(1),line(2).toInt)).toDF()
    
    case class Info(id: Int, name: String, age: Int)
    
    • 编程方式
      使用编程接口,构造一个schema并将其应用在已知的RDD上。
    val spark = SparkSession.builder().appName("xxx").master("local[2]").getOrCreate()
    //创建一个RDD
    val rdd = spark.sparkContext.textFile("xxx")
    
    //转换RDD的record为Row
    val infoRDD = rdd.map(_.split(",").map(line => Row(line(0).toInt, line(1),line(2).toInt))
    
    //创建一个schema
    val structType = StructType(Array(StructField("id",IntergerType,true),StructField("name",StringType,true),StructField("age",IntergerType,true)))
    
    //将schema应用于RDD,转换为DF
    val infoDF = spark.createDataFrame(infoRDD,structType)
    

    DataFrame和RDD互操作的两种方式

    • 反射:case class 前提:事先知道你的字段、字段类型
    • 编程:Row 事先不知道列
    • 选型:优先第一种

    Dataset概述

    从Spark2.0开始,Spark整合了Dataset和DataFrame,前者是有明确类型的数据集,后者是无明确类型的数据集。
    DataFrame也可以叫Dataset[Row],dataframe每一行的类型是Row(不解析的话无法得知每一行的字段名和对应的字段类型)
    拿出dataframe行中特定字段的方法有两个:
    getAS方法

    testDF.foreach{
      line =>
        val col1=line.getAs[String]("col1")
        val col2=line.getAs[String]("col2")
    }
    

    模式匹配

    testDF.map{
          case Row(col1:String,col2:Int)=>
            println(col1);println(col2)
            col1
          case _=>
            ""
        }
    

    Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息(可以定义字段名和类型)

    case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
    /**
          rdd
          ("a", 1)
          ("b", 1)
          ("a", 1)
          * */
    val test: Dataset[Coltest]=rdd.map{line=>
          Coltest(line._1,line._2)
        }.toDS
    test.map{
          line=>
            println(line.col1)
            println(line.col2)
        }
    

    可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题。

    Dataset和RDD/DataFrame的转换

    • RDD转Dataset
    import spark.implicits._
    case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
    val testDS = rdd.map {line=>
          Coltest(line._1,line._2)
        }.toDS
    

    可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可

    • Dataset转DataFrame
    import spark.implicits._
    val testDF = testDS.toDF
    
    • DataFrame转Dataset
    import spark.implicits._
    case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
    val testDS = testDF.as[Coltest]
    

    这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便

    Spark Sql读取/写入外部数据源

    • 操作Parquet文件数据
      parquet是默认的格式。从一个parquet文件读取数据的代码如下:
    val usersDF = spark.read.load("src/main/resources/users.parquet")
    

    spark.read返回DataFrameReader对象,其load方法加载文件中的数据并返回DataFrame对象

    将一个DataFrame写到parquet文件

    usersDF.write.save("output/parquet/")
    

    DataFrame#write()方法返回DataFrameWriter对象实例,save方法将数据持久化为parquet格式的文件。save的参数是一个目录,而且要求最底层的目录是不存在的。

    另外一种写的方式是:

    peopleDF.write.parquet("output/parquet/")
    
    • 操作CSV文件
    spark.read.option("header", true).format("csv").load("output/csv/")
    另外一种简化的读法:
    spark.read.option("header", true).csv("output/csv/")
    

    其中的option("header", true)就是告诉读入器这个文件是有表头的。

    将DataFrame写入到csv文件时也需要注意表头,将表头也写入文件的方式:

    peopleDF.write.option("header", true).format("csv").save("output/csv/")
    

    不写表头,只写数据的方式:

    peopleDF.write.format("csv").save("output/csv/")
    

    另外一种简化的写法是:

    peopleDF.write.csv("output/csv/")
    
    • 操作JSON
    val peopleDF = spark.read.format("json").load(path)
    还有一种简化的方式,其本质还是上述的代码:
    val peopleDF = spark.read.json(path)
    

    将一个DataFrame写到json文件的方式:

    peopleDF.write.format("json").save("output/json/")
    另外一种简略的写法:
    peopleDF.write.json("output/json/")
    
    • 操作Hive表数据
    // 加载Hive表数据
        val hiveDF = spark.table("emp")
    

    将DataFrame的数据写入表

    tableDF.write.saveAsTable("src_bak")
    如果要写入一张已经存在的表,需要按照下面的方式:
    tableDF.write.mode(SaveMode.Append).saveAsTable("src_bak")
    
    • 操作MySQL表数据
      spark可以直接通过jdbc读取关系型数据库中指定的表。有两种读取的方式,一种是将所有的参数都作为option一条条设置:
    val url = "jdbc:mysql://localhost:3306/sparksql?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true"
    
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "vulcanus_ljl.data_dict")
      .option("user", "vulcanus_ljl")
      .option("password", "mypassword")
      .load()
    

    另一种是预先将参数封装到Properties对象里:

    val url = "jdbc:mysql://localhost:3306/vulcanus_ljl?autoReconnect=true&createDatabaseIfNotExist=true&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true"
    
    val connectionProperties = new Properties()
    connectionProperties.put("user", "vulcanus_ljl")
    connectionProperties.put("password", "mypassword")
    
    val jdbcDF2 = spark.read
      .jdbc(url, "vulcanus_ljl.data_dict", connectionProperties)
    

    spark还可以通过jdbc将DataFrame写入到一张新表(表必须不存在),写入的方式同样分为两种:

    jdbcDF.write
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "vulcanus_ljl.data_dict_temp1")
      .option("user", "vulcanus_ljl")
      .option("password", "mypassword")
      .option("createTableColumnTypes", "dict_name varchar(60), dict_type varchar(60)") // 没有指定的字段使用默认的类型
      .save()
    和
    jdbcDF2.write
      .jdbc(url, "vulcanus_ljl.data_dict_temp2", connectionProperties)
    其中,url和connectionProperties的内容同上文读取时的设置。
    写入时可以通过createTableColumnTypes设置指定多个字段的类型,其他没有指定的字段会使用默认的类型。
    
    • 综合案例,从hive中读出员工表,在MySQL中读出部门表,在查询员工表中部门编号在部门表中的员工。
    /**
     * 使用外部数据源综合查询Hive和MySQL的表数据
     */
    object HiveMySQLApp {
    
      def main(args: Array[String]) {
        val spark = SparkSession.builder().appName("HiveMySQLApp")
          .master("local[2]").getOrCreate()
    
        // 加载Hive表数据
        val hiveDF = spark.table("emp")
    
        // 加载MySQL表数据
        val mysqlDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "spark.DEPT").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()
    
        // JOIN
        val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
        resultDF.show
    
        resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),
          mysqlDF.col("deptno"), mysqlDF.col("dname")).show
    
        spark.stop()
      }
    }
    

    Spark Sql愿景

    • 写更少的代码
      -- 从wordcount角度看:
      MapReduce(代码量最多)--->hive(代码量少)---->Spark core(代码量更少,但可读性差)----->Spark SQL(代码量少,可读性好,性能更好)
      -- 从外部数据源角度看:
      为文件输入输出提供了访问的接口
      -- 从schema推导的角度来看:
      可以自动推导数据类型,对于数据类型不对的数据,很方便转换,即数据兼容性更好

    • 读更少的数据
      分区、压缩、pushdown、谓词下压、过滤

    • 将优化交给底层
      底层优化器自动优化程序,即使是小白也能写出高效的代码。

    Spark Sql程序优化项

    • 存储格式的选择
      采取行式还是列式存储?
      列存储写入时次数多,损耗时间多
      反过来查询的时候较快

    • 压缩格式的选择
      考虑压缩速度和压缩文件的分割性
      压缩能够较少存储空间、提高数据传输速度
      Spark中默认的压缩格式是“snappy”

    • 代码的优化:
      选择的高性能的算子:
      foreachPartition => partitionOfRecords.foreach 获得每一条数据
      分区的好处是把partition所有的数据先保存到list当中去,然后我们在插入MySQL的时候就可以结合pstmt的批处理,一次过把整个分区数据写进去

    • 复用已有的数据:
      在项目中,如果同时实现多个功能,在计算时观察每个功能间是否有重叠产生的数据,若有的话把相应的数据提取出来生成,所有的功能实现都能共用(相当于做一个缓存,把中间数据cache )

    • 参数的优化:
      并行度:spark.sql.shuffle.partitions
      默认的是200,配置的是partitions的数量,对应了task的数量
      若觉得运行得太慢,则需要吧这个值调大
      在conf里面改(YARN启动时)

    • 关闭分区字段类型推测
      默认为开启,若开启之后系统就会自动推测分区字段的类型
      关闭后能提升性能

    相关文章

      网友评论

        本文标题:Hive平滑过渡到Spark Sql

        本文链接:https://www.haomeiwen.com/subject/ioeenqtx.html