美文网首页
SparkSql之数据的加载与保存

SparkSql之数据的加载与保存

作者: 万事万物 | 来源:发表于2021-08-10 09:26 被阅读0次

    加载数据

    1. 创建SaparkSession
      val sparkSession =SparkSession.builder().master("local[4]").appName("test").getOrCreate()
    
    1. 加载数据方式 * 表示加载的方式
    sparkSession.read.*
    

    format指定加载数据类型

    spark.read.format("…")[.option("…")].load("…")

    format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"
    load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据路径
    option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
    spark.read.format("json").load ("input/user.json").show

    示例一:

    sparkSession.read.format("text")
    

    示例二:

    sparkSession.read.format("json")
    

    直接读取数据

    spark.read直接读取数据:csv format jdbc json load option

    options orc parquet schema table text textFile
    注意:加载数据的相关参数需写到上述方法中,
    如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。
    spark.read.json("input/user.json").show()

    示例一:

    sparkSession.read.text()
    

    示例二:

    sparkSession.read.textFile()
    

    读取文本文件

    方式一:format("text")

    val df: DataFrame = sparkSession.read.format("text").load("path")
    

    文本文件无法进行切分,按行读取,若要对一行的数据进行切分,需要可以使用map或其他算子。

    // 导入隐式转换,这一步很重要
    import sparkSession.implicits._
    // 我这里按照`_`分隔,会得到一个`Dataset`
    val value: Dataset[List[String]] = df.map(_.mkString.split("_").toList)
    

    完整代码

      @Test
      def readTextFile(): Unit ={
        // 加载文件,sparkSession 上面有定义
        val df: DataFrame = sparkSession.read.format("text").load("./src/main/resources/data/user_visit_action.txt")
        // 导入隐式转换
        import sparkSession.implicits._
        // 对每行数据进行切分
        val value: Dataset[List[String]] = df.map(_.mkString.split("_").toList)
        // 输出
        value.foreach(println(_))
      }
    

    若还想把文本文件转换成,使用sql来操作;还需要将List 转换成'元组'
    value :上面的 Dataset[List[String]]
    value1 :转换出来的元组
    _1-_13:类名(偷懒,也可以根据该列的定义来命名,如 name, address 等符合规范的命名方式)

        // 转换成元组
        val value1: Dataset[(String, String, String, String, String, String, String, String, String, String, String, String, String)] = value.map(e => {
          e match {
            case List(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13) => (_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13)
          }
        })
    
        //注册成表
        value1.createOrReplaceTempView("log")
    
        // 编写sql
        sparkSession.sql("select * from log").show()
    

    方式二:read.text("path")

      @Test
      def readTextFile(): Unit ={
        // 加载文件,sparkSession 上面有定义
        val df: DataFrame = sparkSession.read.text("./src/main/resources/data/user_visit_action.txt")
        // 导入隐式转换
        import sparkSession.implicits._
        // 对每行数据进行切分
        val value: Dataset[List[String]] = df.map(_.mkString.split("_").toList)
    
        // 转换成元组
        val value1: Dataset[(String, String, String, String, String, String, String, String, String, String, String, String, String)] = value.map(e => {
          e match {
            case List(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13) => (_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13)
          }
        })
    
        //注册成表
        value1.createOrReplaceTempView("log")
    
        // 编写sql
        sparkSession.sql("select * from log").show()
    
      }
    

    结果就不展示了,(真是数据)

    text 与 textFile 的区别

    1. 都是用于读取文本文件
    2. 返回的类型不同
      text 返回的是DataFrame
    val df: DataFrame = sparkSession.read.text("./src/main/resources/data/user_visit_action.txt")
    

        textFile 返回的是Dataset

    val df: Dataset[String] = sparkSession.read.textFile("./src/main/resources/data/user_visit_action.txt")
    

    读取json文件

    注意:读取json文件读取其实json行文件。
    如像这种

    {"id":1,"name":"张三","age":18,"sex":"男","class_id":1}
    {"id":1,"name":"张三","age":18,"sex":"男","class_id":1}
    {"id":2,"name":"绣花","age":16,"sex":"女","class_id":1}
    {"id":3,"name":"李四","age":18,"sex":"男","class_id":1}
    {"id":4,"name":"王五","age":18,"sex":"男","class_id":1}
    {"id":5,"name":"翠花","age":19,"sex":"女","class_id":1}
    {"id":6,"name":"张鹏","age":17,"sex":"男","class_id":1}
    

    方式一:format("json")

      @Test
      def readJSONFile(): Unit ={
        val df: DataFrame = sparkSession.read.format("json").load("C:\\Users\\123456\\Desktop\\student.json")
    
        df.foreach(e=>{
          println(e)
        })
    
      }
    

    结果

    [18,1,1,张三,男]
    [18,1,1,张三,男]
    [16,1,2,绣花,女]
    [18,1,3,李四,男]
    [18,1,4,王五,男]
    [19,1,5,翠花,女]
    [17,1,6,张鹏,男]
    

    方式二:read.json("path")

      @Test
      def readJSONFile(): Unit ={
        val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
    
        df.foreach(e=>{
          println(e)
        })
    
      }
    

    若要使用sql方式来操作需要转换成

      @Test
      def readJSONFile(): Unit ={
        val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
    
        df.createOrReplaceTempView("json")
    
        sparkSession.sql("select * from json").show()
      }
    
    +---+--------+---+----+---+
    |age|class_id| id|name|sex|
    +---+--------+---+----+---+
    | 18|       1|  1|张三| 男|
    | 18|       1|  1|张三| 男|
    | 16|       1|  2|绣花| 女|
    | 18|       1|  3|李四| 男|
    | 18|       1|  4|王五| 男|
    | 19|       1|  5|翠花| 女|
    | 17|       1|  6|张鹏| 男|
    +---+--------+---+----+---+
    

    统计男女人数

      @Test
      def readJSONFile(): Unit ={
        val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
        // 注册成表
        df.createOrReplaceTempView("json")
        // 编写sql
        sparkSession.sql("select sex,count(1) from json group by sex").show()
      }
    
    +---+--------+
    |sex|count(1)|
    +---+--------+
    | 男|       5|
    | 女|       2|
    +---+--------+
    

    text相比,无需指定列字段,也不用导入隐式转换。


    读取JDBC数据

    导入mysql依赖包

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.25</version>
    </dependency>
    

    方式一:format("jdbc")
    连接数据库,不能像加载文件的方式使用load("path")的形式了,而是需要使用option()的方式。
    option() 采用k-v的形式进行配置

      @Test
      def readJDBC(): Unit ={
        val reader: DataFrameReader = sparkSession.read.format("jdbc")
        // url
        reader.option(JDBCOptions.JDBC_URL, "jdbc:mysql://hadoop102:3306/gmall")
        // 指定表
        reader.option(JDBCOptions.JDBC_TABLE_NAME,"user_info")
        // 账号
        reader.option("user","root")
        // 密码
        reader.option("password","123321")
    
        // 连接
        val df: DataFrame = reader.load()
    
        df.show()
      }
    

    位于org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions 项目中。

    object JDBCOptions {
      private val curId = new java.util.concurrent.atomic.AtomicLong(0L)
      private val jdbcOptionNames = collection.mutable.Set[String]()
    
      private def newOption(name: String): String = {
        jdbcOptionNames += name.toLowerCase(Locale.ROOT)
        name
      }
    
      val JDBC_URL = newOption("url")
      val JDBC_TABLE_NAME = newOption("dbtable")
      val JDBC_QUERY_STRING = newOption("query")
      val JDBC_DRIVER_CLASS = newOption("driver")
      val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
      val JDBC_LOWER_BOUND = newOption("lowerBound")
      val JDBC_UPPER_BOUND = newOption("upperBound")
      val JDBC_NUM_PARTITIONS = newOption("numPartitions")
      val JDBC_QUERY_TIMEOUT = newOption("queryTimeout")
      val JDBC_BATCH_FETCH_SIZE = newOption("fetchsize")
      val JDBC_TRUNCATE = newOption("truncate")
      val JDBC_CASCADE_TRUNCATE = newOption("cascadeTruncate")
      val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
      val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
      val JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES = newOption("customSchema")
      val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
      val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
      val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
      val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
    }
    

    此种方式一直有一个分区,不适用数据量大的表

    println(df.rdd.getNumPartitions) // 1
    

    方式二:read.jdbc()

    jdbc()有三个重载方法

    • def jdbc(url: String, table: String, properties: Properties)
    • def jdbc(url: String,table: String,predicates: Array[String],connectionProperties: Properties)
    • def jdbc(url: String,table: String, columnName: String,lowerBound: Long,upperBound: Long,numPartitions: Int,connectionProperties: Properties)

    url:jdbc连接地址
    table:指定表名
    predicates:每个分区的 where 子句中的条件。
    connectionPropertiesproperties:java.util.Properties;通常配置一些账号密码等连接信息

    方式一:此种方式读取mysql数据的时候只有一个分区 <TODO 只适用于数据量特别小的时候>

      @Test
      def readJDBC(): Unit ={
    
        // url
        val url="jdbc:mysql://hadoop102:3306/gmall"
        // 表名
        val table_name="user_info"
        // 配置
        val prop=new Properties()
        prop.put("user","root")
        prop.put("password","123321")
    
        val reader: DataFrame= sparkSession.read.jdbc(url,table_name,prop)
    
        reader.show()
        // 分区数
        println(reader.rdd.getNumPartitions) // 结果为 1
      }
    

    查询结果

    +---+------------+---------+------+--------+-----------+--------------------+--------+----------+----------+------+-------------------+------------+------+
    | id|  login_name|nick_name|passwd|    name|  phone_num|               email|head_img|user_level|  birthday|gender|        create_time|operate_time|status|
    +---+------------+---------+------+--------+-----------+--------------------+--------+----------+----------+------+-------------------+------------+------+
    |  1|     9i5x8jm|     阿河|  null|    孟河|13414217211|    9i5x8jm@0355.net|    null|         1|1967-06-14|     M|2020-06-14 16:38:32|        null|  null|
    |  2|613oemp403w5|     岚艺|  null|令狐岚艺|13617884815|613oemp403w5@263.net|    null|         1|1982-06-14|     F|2020-06-14 16:38:32|        null|  null|
    |  3|     vxipycm|     羽羽|  null|    邬羽|13348276946|    vxipycm@yeah.net|    null|         1|1975-06-14|     F|2020-06-14 16:38:32|        null|  null|
    |  4|    tvl37zh3|     昭昭|  null|  公孙昭|13872281627|    tvl37zh3@263.net|    null|         2|1992-06-14|     F|2020-06-14 16:38:32|        null|  null|
    |  5|  hlh52rbbot|     超浩|  null|  许超浩|13645559786| hlh52rbbot@yeah.net|    null|         1|1971-06-14|     M|2020-06-14 16:38:32|        null|  null|
    |  6|      k7ydt3|     馥馥|  null|    茅馥|13621194624|       k7ydt3@qq.com|    null|         1|2005-06-14|     F|2020-06-14 16:38:32|        null|  null|
    |  7|   w748bj1wb|     卿聪|  null|  汤卿聪|13132783352|  w748bj1wb@sohu.com|    null|         1|1976-06-14|     F|2020-06-14 16:38:32|        null|  null|
    |  8| sn5bybujp9h|     阿固|  null|    钱固|13842783258| sn5bybujp9h@263.net|    null|         1|2003-06-14|     M|2020-06-14 16:38:32|        null|  null|
    |  9|  s5iamgaydk|     云莲|  null|  汪云莲|13268453518|  s5iamgaydk@ask.com|    null|         3|1994-06-14|     F|2020-06-14 16:38:32|        null|  null|
    | 10|  ad5for2z27|     艺艺|  null|  司徒艺|13513942543|  ad5for2z27@163.net|    null|         1|1983-06-14|     F|2020-06-14 16:38:32|        null|  null|
    | 11|      zqvdkg|     璐娅|  null|宇文璐娅|13226187467|      zqvdkg@ask.com|    null|         1|2000-06-14|     F|2020-06-14 16:38:32|        null|  null|
    | 12|     8y4v4em|     聪聪|  null|    安聪|13882977875|     8y4v4em@163.net|    null|         1|1987-06-14|     F|2020-06-14 16:38:32|        null|  null|
    | 13|      kj47k5|     晓晓|  null|    祁晓|13984996852|     kj47k5@live.com|    null|         1|1990-06-14|     F|2020-06-14 16:38:32|        null|  null|
    | 14|  49ungc4vxw|     昌成|  null|  方昌成|13136526735|49ungc4vxw@hotmai...|    null|         3|1987-06-14|     M|2020-06-14 16:38:32|        null|  null|
    | 15| w71ygayab8h|     荣爱|  null|  孙荣爱|13161527545|w71ygayab8h@0355.net|    null|         2|1977-06-14|     F|2020-06-14 16:38:32|        null|  null|
    | 16|   ttyeksqza|     卿卿|  null|  公孙卿|13221949889|   ttyeksqza@163.net|    null|         2|1966-06-14|     F|2020-06-14 16:38:32|        null|  null|
    | 17|      558vfl|     华慧|  null|  陶华慧|13825216547|      558vfl@163.com|    null|         2|1992-06-14|     F|2020-06-14 16:38:32|        null|  null|
    | 18|      ws81it|     坚和|  null|  余坚和|13741734718| ws81it@yahoo.com.cn|    null|         1|2003-06-14|     M|2020-06-14 16:38:32|        null|  null|
    | 19|    hpsvztha|     希希|  null|    元希|13172412153|   hpsvztha@sohu.com|    null|         1|2005-06-14|     F|2020-06-14 16:38:32|        null|  null|
    | 20|i0jht7qvv1t9|     茗茗|  null|    安茗|13842784759|i0jht7qvv1t9@0355...|    null|         2|1977-06-14|     F|2020-06-14 16:38:32|        null|  null|
    +---+------------+---------+------+--------+-----------+--------------------+--------+----------+----------+------+-------------------+------------+------+
    only showing top 20 rows
    

    第一种方式和使用format("jdbc")一样,只有一个分区。


    方式二:此种方式,读取mysql数据的时候分区数 = predicates元素个数 <TODO 一般不用>

      @Test
      def readJDBC(): Unit ={
    
        // url
        val url="jdbc:mysql://hadoop102:3306/gmall"
        // 表名
        val table_name="user_info"
    
        // 配置
        val prop=new Properties()
        prop.put("user","root")
        prop.put("password","123321")
    
        // 指定每个分区的条件
        val predicates=Array(
          "LENGTH(name)=2", //将name长度为2的数据化为一个分区
          "email like '%.com'", //将email 以.com结尾的划分为一个分区
          "gender='M'" // 将gender为M的化为一个分区
        )
    
    
        val reader: DataFrame= sparkSession.read.jdbc(url,table_name,predicates,prop)
    
        reader.show()
    
        println(reader.rdd.getNumPartitions) // 结果为 3
    
      }
    

    这种方式虽然可以指定分区,但是需要通过where条件来指定分区,如数据量比较大,需要分100个分区,难道要写100个where条件吗?所以一般清空下不用。数据量小的清空下用方式一,数据量大的情况下用方式三

    方式三:此种方式,读取mysql数据的分区数 = (upperBound-lowerBound) > numPartitions? upperBound-lowerBound : numPartitions
    columnName:指定按照什么方式进行分区;字段类型必须为数字日期时间戳类型列的名称。
    lowerBound:用于决定分区步幅的 columnName 的最小值。
    upperBound:用于决定分区步幅的 columnName 的最大值。
    numPartitions: 分区数。这与“lowerBound”(包含)、“upperBound”(不包含)一起形成分区步幅,用于生成用于均匀拆分列“columnName”的 WHERE 子句表达式。当输入小于 1 时,数字设置为 1。

      @Test
      def readJDBC(): Unit ={
    
        // url
        val url="jdbc:mysql://hadoop102:3306/gmall"
        // 表名
        val table_name="user_info"
    
        // 分区字段
        val columnName="id"
        // 分区步幅的最小值
        val lowerBound=1
        // 分区步幅的最大值
        val upperBound=10
        // 分区数
        val numPartitions=99
    
        // 配置
        val prop=new Properties()
        prop.put("user","root")
        prop.put("password","123321")
    
        val reader: DataFrame= sparkSession.read.jdbc(url,table_name,columnName,lowerBound,upperBound,numPartitions,prop)
    
        reader.show()
    
        println(reader.rdd.getNumPartitions)
    
      }
    
    分区数为9
    

    为什么分区数是9呢?是怎么的来了,通过公式计算
    (upperBound-lowerBound) >= numPartitions? numPartitions : upperBound-lowerBound=(10-1)>=99?99:10-1
    计算步长
    val stride: Long = upperBound / numPartitions - lowerBound / numPartitions=10/99-1/99=0.09090909090909091 (取整为0)

    保存数据

    语法:

    df.write
    .mode(SaveMode.XX) // 数据写入模式
    .写入格式(csv,json,jdbc)

    SaveMode模式:

    • Append: 追加数据<TODO 常用,一般数据写入mysql会用>
    • Overwrite: 如果目录存在则覆盖数据<TODO 常用,一般用于数据写入HDFS>
    • ErrorIfExists: 如果目录已经存在则报错
    • Ignore: 如果目录存在则不写入

    示例:将数据写入成格式

    {"id":1,"name":"张三","age":18,"sex":"男","class_id":1}
    {"id":1,"name":"张三","age":18,"sex":"男","class_id":1}
    {"id":2,"name":"绣花","age":16,"sex":"女","class_id":1}
    {"id":3,"name":"李四","age":18,"sex":"男","class_id":1}
    {"id":4,"name":"王五","age":18,"sex":"男","class_id":1}
    {"id":5,"name":"翠花","age":19,"sex":"女","class_id":1}
    {"id":6,"name":"张鹏","age":17,"sex":"男","class_id":1}
    

    案例:读取json文件,将性别为男的信息写入到csv格式文件中

      @Test
      def jsonWriteCsv(): Unit ={
        val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
    
        //注册成表
        df.createOrReplaceTempView("json")
    
        //编写sql 查询所有性别为男的数据
        val frame: DataFrame = sparkSession.sql(
          """
            |select * from json where sex='男'
            |""".stripMargin)
    
        // 将结果写入到csv格式文件中
        frame.write.mode(SaveMode.Overwrite).csv("output/10001.csv")
        
      }
    

    文件结果

    18,1,1,张三,男
    18,1,1,张三,男
    18,1,3,李四,男
    18,1,4,王五,男
    17,1,6,张鹏,男
    

    默认分隔符号为,号,也可以指定其他分隔符号

    使用sep指定分隔形式

    frame.write.mode(SaveMode.Overwrite).option("sep","\t").csv("output/10001.csv")
    
    18  1   1   张三  男
    18  1   1   张三  男
    18  1   3   李四  男
    18  1   4   王五  男
    17  1   6   张鹏  男
    

    除了保存到文件中,也可以保存到数据库

    @Test
      def jsonWriteMysql(): Unit ={
        // 读取json文件
        val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
    
    
        // url
        val url="jdbc:mysql://hadoop102:3306/gmall"
        // 表名
        val table_name="json_to_student"
        // 配置
        val prop=new Properties()
        prop.put("user","root")
        prop.put("password","123321")
    
    
        //将数据写入到mysql数据库中
        df.write.mode(SaveMode.Overwrite).jdbc(url,table_name,prop)
    
      }
    

    使用mysql客户端,查询json_to_student

    select * from json_to_student;
    

    注意:

    1. 写入到jdbc中中文数据乱码
      解决方案,url后面加?useUnicode=true&characterEncoding=UTF-8
    val url="jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=UTF-8"
    
    1. 数据写入mysql采用追加的形式,有可能出现主键冲突,解决方式使用foreachPartitions
    df.rdd.foreachPartition()
    

    案例演示

     @Test
      def jsonWriteMysql2(): Unit ={
        // 读取json文件
        val df: DataFrame = sparkSession.read.json("C:\\Users\\123456\\Desktop\\student.json")
    
        //将数据写入到mysql数据库中
        //df.write.mode(SaveMode.Append).jdbc(url,table_name,prop)
    
        df.rdd.foreachPartition(it=>{
    
          // 注册驱动
          Class.forName("com.mysql.jdbc.Driver")
    
          // url
          val url="jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=UTF-8"
          // 账号
          val username="root"
          // m密码
          val password="123321"
    
          // 获取连接
          import java.sql.DriverManager
          val conn = DriverManager.getConnection(url, username, password)
    
          // 编写sql
          val sql=
            """
              |insert into json_to_student(age,class_id,id,name,sex) values(?,?,?,?,?) ON DUPLICATE KEY UPDATE
              |age=values(age),class_id=values(class_id),name=values(name),sex=values(sex), id=values(id)
              |""".stripMargin
    
          val statement: PreparedStatement = conn.prepareStatement(sql)
          it.foreach(e=>{
            val age=e(0).toString.toInt
            val class_id=e(1).toString.toInt
            val id=e(2).toString.toInt
            val name=e(3).toString
            val sex=e(4).toString
    
            statement.setInt(1,age)
            statement.setInt(2,class_id)
            statement.setInt(3,id)
            statement.setString(4,name)
            statement.setString(5,sex)
    
            //添加到批处理中
            statement.addBatch()
    
          })
          // 执行批处理
          statement.executeBatch()
    
          // 关闭资源
          statement.close()
          conn.close()
    
        })
    
      }
    

    注意:

    1. 必须指定主键
    “ALTER TABLE 数据表名 ADD PRIMARY KEY(字段名/列名);”
    
    1. 使用 ON DUPLICATE KEY UPDATE
      可以完成主键存在则修改,不存在则插入功能。

    与Hive 交互

    shell命令的方式

    在spark中默认有一个hive

    1. 启动Hadoop
      需要先进行Hadoop与spark配置
    2. 进入 spark 的bin/spark-shell终端中。
    spark-yarn]$ bin/spark-shell
    
    1. 直接执行sql
      创建表(ddl不用加show)
    scala> spark.sql("create table student2 (id int,name string,age int, sex string)")
    

    插入一条数据(必须加show)

    scala> spark.sql("insert into student2 values(1,'tom',18,'M')").show
    

    查询表

    scala> spark.sql("select * from student2").show
    +---+----+---+---+
    | id|name|age|sex|
    +---+----+---+---+
    |  1| tom| 18|  M|
    +---+----+---+---+
    

    此时 spark 根目录就有两个文件(自带的hive 数据会存在这两张表中)

    • metastore_db:存放元数据信息
    • spark-warehouse:存放 hive 表数据

    使用的自带的hive 默认使用的是derby数据库
    derby有个特点:不支持同时开启两个./hive的命令终端,普遍操作,将hive数据库更换成mysql数据库或其他。

    若要使用自己的hive 需要将hive-site.xml 复制 spark 的conf目录下

     spark-yarn]$ cp /opt/module/hive/conf/hive-site.xml ./conf/
    

    除此之外还需要 mysql的驱动包,拷贝到 spark的jars目录下。

     spark-yarn]$ cp /opt/software/mysql-connector-java-5.1.27-bin.jar ./jars/
    

    重新进入spark-shell 中,这样就可以使用我们安装的hive了。

    spark-yarn]$ bin/spark-shell
    

    idea方式
    实际开发中,我们肯定通过代码的方式去操作hive,所以我们需要将hive整合到项目中。

    1. 第一步拷贝 hive-site.xmlresources
    2. 创建SparkSession时开启Hive支持(.enableHiveSupport())
     // 创建 SparkSession 时 需要开启hive支持
      val sparkSession =SparkSession.builder().master("local[4]").appName("test").enableHiveSupport().getOrCreate()
    
    1. 导入hive 依赖
    <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    

    否则会出现如下错误

    java.lang.IllegalArgumentException: Unable to instantiate SparkSession with Hive support because Hive classes are not found.
    
    1. 编写sql
      @Test
      def demo01: Unit ={
        sparkSession.sql("select * from t_person").show()
      }
    
    1. 若遇到权限问题,解决方式
      在 VM options 中配置 -DHADOOP_USER_NAME=账号名称


      image.png

    相关文章

      网友评论

          本文标题:SparkSql之数据的加载与保存

          本文链接:https://www.haomeiwen.com/subject/iyuzultx.html