美文网首页
spark SQL 1.基本操作

spark SQL 1.基本操作

作者: caster | 来源:发表于2021-06-07 09:27 被阅读0次

    1. 进化史

    Spark SQL用于结构化数据处理。
    Hive:SQL简化了MR操作(on HDFS) 。
    Shark:基于Hive开发,更改了MR引擎,提升了SQL on HDFS的性能,即Hive on Spark。
    Spark:移除Hive,独立为Spark SQL,封装RDD为DF,DS。

    2. 优势:

    整合SQL和spark编程,简化RDD编程;
    统一方式连接数据源;
    兼容Hive HQL;
    支持JDBC/ODBC;

    3. DataFrame/DataSet < --RDD

    DataFrame:以RDD为基础的分布式数据集,带有schema元信息。懒执行,性能优于RDD,底层会进行优化操作。
    DataSet:相比DataFrame,提供强类型,方便操作数据。
    Spark1.0 => RDD
    Spark1.3 => DataFrame
    Spark1.6 => DataSet(后续主推)
    DataFrame=DataSet[Row]

    4. DataFrame构造

    SparkSession创建和执行DF和SQL,创建DF方式:

    • 读数据源
    • 从一个RDD转换
    • 从Hive Table查询

    读文件中数字类型数据默认为bigint≈Long,内存中获取数字类型数据默认为Int
    将DF创建为视图用于查询:
    df.createTempView():适用于于当前session且view名不可替换。
    df.createOrReplaceTempView():适用于于当前session且可替换。
    df.createGlobalTempView():适用于全局session,查询时view前加global_temp.

    5. DataFrame的DSL

    使用特定语法DSL,不需要创建view进行查询。

    val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
    //隐式转换依赖
    import spark.implicits._
    val df1 = spark.read.json("file/1.txt")
    //输出df结果
    df1.printSchema()
    //调用RDD函数函数
    df1.map(e => (e.getString(1),e.getLong(0)+ 1)).show()
    df1.select("user","age").show()
    //需要计算的话:每个列都需要使用引用,两种方式:
    df1.select($"user",$"age"+1).show()
    df1.select('user,'age+1).show()
    //查询年龄大于10的
    df1.filter('age>10).show()
    //分组聚合
    df1.groupBy("age").count().show()
    spark.stop()
    

    6. RDD和DataFrame转换

    ∵ RDD只关心数据,DF同时关心结构。
    ∴ RDD+schema=DF

    val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
    //需要引入转换用包
    import spark.implicits._
    //rdd转df
    val rdd = spark.sparkContext.makeRDD(List(1, 2, 3, 4, 5))
    //val df = rdd.toDF()//默认为value
    val df = rdd.toDF("id")
    df.show()
    //df转rdd
    val rdd1 = df.rdd
    rdd1.foreach(println)
    spark.stop()
    

    7. DataSet

    强类型数据集,需要提供类型信息(样例类...)。

    object Test{
    
      case class Person(name:String,age:Long)
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
        import  spark.implicits._
    
        val list=List(Person("a",10),Person("b",11))
        val ds = list.toDS()
        ds.show
        ds.printSchema()
    
        spark.stop()
      }
    
    }
    

    8. DataFrame和DataSet转换

    case class Person(name:String,age:Long)
    def main(args: Array[String]): Unit = {
    
      val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
      import  spark.implicits._
    
      val df = spark.read.json("file/1.txt")
      val ds = df.as[Person]
      ds.show()
      val df1 = ds.toDF()
    }
    

    9. RDD和DataSet转换

    case class Person(name:String,age:Long)
    
    def main(args: Array[String]): Unit = {
    
      val spark = SparkSession.builder.master("local").appName("test").getOrCreate()
      import  spark.implicits._
      val rdd1 = spark.sparkContext.makeRDD(List(Person("a", 10), Person("b", 11)))
      val ds = rdd1.toDS
      ds.rdd
    
      val rdd2 = spark.sparkContext.makeRDD(List(1,2,3))
      rdd2.toDS.show()//默认为value
    }
    

    相关文章

      网友评论

          本文标题:spark SQL 1.基本操作

          本文链接:https://www.haomeiwen.com/subject/dkebjltx.html