美文网首页
SparkSQL温习笔记-1

SparkSQL温习笔记-1

作者: Hive_何伟 | 来源:发表于2018-03-14 21:16 被阅读17次

     一、介绍

       Shark是SparkSQL(其完全脱离了 Hive 的限制)的前身,Shark的性能比 MapReduce 的 Hive 普遍快 2 倍以上,当数据全部 load 在内存的话,将快 10 倍以上,因此 Shark 可以作为交互式查询应用服务来使用。

        SparkSQL具有很多特性(官网):

        1.Integrated 

        Seamlessly mix SQL and Spark programs,Apply functions to results of SQL queries.我的理解是SQL能和RDD完美结合使用,可视为RDD一样去操作,各种算子可以直接使用。

        2.Uniform Data Access -- Connect to any data source the same way.

        整合各种数据比较方便,读取json的数据注册成表,读取hive的表,两张表可以直接join,其实底层都是转换成RDD去操作。

         DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data across these sources.

        常见的是数据源Hive(ETL后的数据),JSON,JDBC(mysql等存放分析参数),还可以读取hdfs,S3(Amazon),Postgresql(gp、hawq直接读取表?还是先放在hdfs中,有空调研下)。

        3.Hive Integration 和hive兼容性很好,很好配置就可以读取hive metaStore

        Spark core的算子拼接的操作,可以用Sparksql去实现。。有时候会sql实现会更简单一些。

        DataFrame存储是列式存储,不需要的字段不加载到内存中,这样查询、聚合速度快。

    二、创建 DataFrame

        1.动态创建 schema (scala/python)

    --scala

        val sc = new SparkContext(conf)

        val sqlContext = new SQLContext(sc)

        val people = sc.textFile("scores.txt")

        val schemaString = "cla:String sc:Integer"

        //如果schema中制定了除String以外别的类型  在构建rowRDD的时候要注意指定类型    例如: p(2).toInt

        val rowRDD = people.map(_.split("\t")).map(p => Row(p(0), p(1).toInt))

        val schema =

          StructType(schemaString.split(" ").map(fieldName => StructField(fieldName.split(":")(0), if (fieldName.split(":")(1).equals("String")) StringType else IntegerType, true)))

    //    val structFields = Array(StructField("clazz",StringType,true),StructField("score",IntegerType))

    //    val schema = StructType(structFields)

        //  val arr = Array(StructField("name",StringType,true),StructField("age",IntegerType,true))

        //  val schema = StructType.apply(arr)

        val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

        peopleDataFrame.printSchema()

        peopleDataFrame.show()

    --python

    sqlContext = SQLContext(sc)

    #读取hdfs

    data = sc.textFile("/user/nixm/czrk_data.txt")

    #切分字符串

    data_noheader = data.map(lambda x:x.split(";"))

    #创建schema

    schema = StructType([StructField("hh",StringType(),True),

    StructField("sfzjhm",StringType(),True),

    StructField("yhzgx",StringType(),True)

    ])

    #创建DF

    df = sqlContext.createDataFrame(data_noheader,schema)

    #DF注册成临时表

    df.registerTempTable('test')

    #执行sql(返回DF)并持久化

    df_dis = sqlContext.sql(sqlQuery="select hh,sfzjhm from test where yhzgx='户主' group by hh,sfzjhm").cache()

    #将读取的结果(DF)注册成临时表

    df_dis.registerTempTable('df_dis')

    #执行sql并持久化

    df_all= sqlContext.sql(sqlQuery="select * from test where yhzgx!='户主'").cache()

    #注册成临时表

    df_all.registerTempTable('df_all')

    #表与表的join

    result=sqlContext.sql(sqlQuery="select a.sfzjhm as a_sfzjhm, a.yhzgx, b.sfzjhm as b_sfzjhm from df_all a inner join df_dis b on a.hh=b.hh ").cache()

    #对dataFrame使用map算子后,返回类型是RDD

    hdfsRDD=result.rdd.map(lambda p:p. a_sfzjhm+";"+p.yhzgx+";"+p.b_sfzjhm)

    #重新分区,将DF最终存放到hdfs

    hdfsRDD.repartition(1).saveAsTextFile("/user/nixm/jlout/czrk.txt")

        2.通过反射将RDD转换成DF[比较死板]

    peoples.txt     1,lucy,18    /n    2,jim,11

    主要代码

    case class Person(name:String, age: Int)

    def main(args: Array[String]): Unit = {

    val conf =new SparkConf()//创建sparkConf对象

    conf.setAppName("Spark App")//设置应用程序的名称

    conf.setMaster("local")

    val sqlContext =new SQLContext(sc)

    import sqlContext.implicits._ //隐式转换

    //使用反射方法将RDD转换成DF

    val people = sc.textFile("peoples.txt").map(_.split(",")).map(p =>Person(p(1), p(2).trim.toInt)).toDF()

    people.registerTempTable("people") //注册成临时表

    //执行sql查询

    val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 6 AND age <= 19")

    //对dataFrame使用map算子后,返回类型是RDD

    teenagers.map(t =>"Name: " + t(0)).foreach(println)

    // or by field name:

    teenagers.map(t =>"Name: " + t.getAs[String]("name")).foreach(println)

    相关文章

      网友评论

          本文标题:SparkSQL温习笔记-1

          本文链接:https://www.haomeiwen.com/subject/oblqqftx.html