美文网首页
4.Spark基础学习四(IDEA创建Spark_SQL)

4.Spark基础学习四(IDEA创建Spark_SQL)

作者: 做个合格的大厂程序员 | 来源:发表于2020-09-03 19:28 被阅读0次

    IDEA创建SparkSQL程序

    IDEA中程序的打包和运行方式都和SparkCore类似,Maven依赖中需要添加新的依赖项:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    
    package com.atguigu.sparksql
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    import org.slf4j.LoggerFactory
    
    object HelloWorld {
    
      def main(args: Array[String]) {
        //创建SparkConf()并设置App名称
        val spark = SparkSession
          .builder()
          .appName("Spark SQL basic example")
          .config("spark.some.config.option", "some-value")
          .getOrCreate()
    
        // For implicit conversions like converting RDDs to DataFrames
        //隐士转换
        import spark.implicits._
    
        val df = spark.read.json("data/people.json")
    
        // Displays the content of the DataFrame to stdout
        df.show()
    
        df.filter($"age" > 21).show()
    
        df.createOrReplaceTempView("persons")
    
        spark.sql("SELECT * FROM persons where age > 21").show()
    
        spark.stop()
      }
    
    }
    

    用户自定义函数

    用户自定义UDF函数

    首先先加载一个表

    scala> val df = spark.read.json("examples/src/main/resources/people.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.show()
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    

    自定义添加UDF函数,就是在名字前面加上Name

    scala> spark.udf.register("addName", (x:String)=> "Name:"+x)
    res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
    
    scala> df.createOrReplaceTempView("people")
    
    scala> spark.sql("Select addName(name), age from people").show()
    +-----------------+----+
    |UDF:addName(name)| age|
    +-----------------+----+
    |     Name:Michael|null|
    |        Name:Andy|  30|
    |      Name:Justin|  19|
    +-----------------+----+
    

    用户自定义聚合函数

    ​ 强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。

    ​ 弱类型用户自定义聚合函数:通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。下面展示一个求平均工资的自定义聚合函数。

    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    
    object MyAverage extends UserDefinedAggregateFunction {
        // 聚合函数输入参数的数据类型 
        def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
        // 聚合缓冲区中值得数据类型
        def bufferSchema: StructType = {
            StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
         }
        
        // 返回值的数据类型 
        def dataType: DataType = DoubleType
        // 对于相同的输入是否一直返回相同的输出。
        def deterministic: Boolean = true
        // 初始化
        def initialize(buffer: MutableAggregationBuffer): Unit = {
            // 存工资的总额
            buffer(0) = 0L
            // 存工资的个数
            buffer(1) = 0L
         }
        
    // 相同Execute间的数据合并。 
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
            if (!input.isNullAt(0)) {
            buffer(0) = buffer.getLong(0) + input.getLong(0)
            buffer(1) = buffer.getLong(1) + 1
       }
     }
    // 不同Execute间的数据合并 
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
       }
        
    // 计算最终结果
    def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
    }
    

    在spark中调用

    // 注册函数
    spark.udf.register("myAverage", MyAverage)
    
    val df = spark.read.json("examples/src/main/resources/employees.json")
    df.createOrReplaceTempView("employees")
    df.show()
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+
    
    val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
    result.show()
    // +--------------+
    // |average_salary|
    // +--------------+
    // |        3750.0|
    // +--------------+
    

    加载保存的方法

    JSON文件

    如果要让Spark加载Json文件,那么Json文件必须符合每一行都是一个json而不是像平常Json那样多行为一个Json,这点必须要注意。例如

    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}
    

    读取Json时,需要隐式导入

    // Primitive types (Int, String, etc) and Product types (case classes) encoders are
    // supported by importing this when creating a Dataset.
    import spark.implicits._
    
    // A JSON dataset is pointed to by path.
    // The path can be either a single text file or a directory storing text files
    val path = "examples/src/main/resources/people.json"
    val peopleDF = spark.read.json(path)
    
    // The inferred schema can be visualized using the printSchema() method
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)
    
    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")
    
    // SQL statements can be run by using the sql methods provided by spark
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()
    // +------+
    // |  name|
    // +------+
    // |Justin|
    // +------+
    
    // Alternatively, a DataFrame can be created for a JSON dataset represented by
    // a Dataset[String] storing one JSON object per string
    val otherPeopleDataset = spark.createDataset(
    """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val otherPeople = spark.read.json(otherPeopleDataset)
    otherPeople.show()
    // +---------------+----+
    // |        address|name|
    // +---------------+----+
    // |[Columbus,Ohio]| Yin|
    

    Parquet文件

    ​ Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet格式经常在Hadoop生态圈中被使用,它也支持Spark SQL的全部数据类型。Spark SQL 提供了直接读取和存储 Parquet 格式文件的方法。

    importing spark.implicits._
    import spark.implicits._
    
    val peopleDF = spark.read.json("examples/src/main/resources/people.json")
    
    peopleDF.write.parquet("hdfs://hadoop102:9000/people.parquet")
    
    val parquetFileDF = spark.read.parquet("hdfs:// hadoop102:9000/people.parquet")
    
    parquetFileDF.createOrReplaceTempView("parquetFile")
    
    val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
    namesDF.map(attributes => "Name: " + attributes(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
    

    JDBC

    ​ Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

    注意:*需要将相关的数据库驱动放到spark的类路径下*

    从Mysql数据库加载数据方式一

    val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://hadoop102:3306/rdd")
    .option("dbtable", "rddtable")
    .option("user", "root")
    .option("password", "000000")
    .load()
    
    jdbcDF2.write
    .jdbc("jdbc:mysql://hadoop102:3306/rdd", "db", connectionProperties)
    

    从Mysql数据库加载数据方式二

    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "000000")
    val jdbcDF2 = spark.read
    .jdbc("jdbc:mysql://hadoop102:3306/rdd", "rddtable", connectionProperties)
    

    将数据写入Mysql方式一

    jdbcDF.write
    .format("jdbc")
    .option("url", "jdbc:mysql://hadoop102:3306/rdd")
    .option("dbtable", "dftable")
    .option("user", "root")
    .option("password", "000000")
    .save()
    

    将数据写入Mysql方式二

    jdbcDF2.write
    .jdbc("jdbc:mysql://hadoop102:3306/rdd", "db", connectionProperties)
    

    Spark 和 Hive

    ​ Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在Spark SQL中包含Hive的库,并不需要事先安装Hive。一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。

    ​ 若要把Spark SQL连接到一个部署好的Hive上,你必须把hive-site.xml复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好Hive,Spark SQL也可以运行。 需要注意的是,如果你没有部署好Hive,Spark SQL会在当前的工作目录中创建出自己的Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。

    Hive加载

    想连接外部已经部署好的Hive,需要通过以下几个步骤。

    1. 将Hive中的hive-site.xml拷贝或者软连接到Spark安装目录下的conf目录下

    2. 打开spark shell,注意带上访问Hive元数据库的JDBC客户端

      $ bin/spark-shell  --jars mysql-connector-java-5.1.27-bin.jar
      
    3. Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI

      ./bin/spark-sql
      

    代码中使用Hive

    添加依赖:

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.1</version>
    </dependency>
    

    创建SparkSession时需要添加hive支持

    val warehouseLocation: String = new File("spark-warehouse").getAbsolutePath
    
    val spark = SparkSession
    .builder()
    .appName("Spark Hive Example")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .enableHiveSupport()
    .getOrCreate()
    

    相关文章

      网友评论

          本文标题:4.Spark基础学习四(IDEA创建Spark_SQL)

          本文链接:https://www.haomeiwen.com/subject/osvosktx.html