spark sql

作者: xncode | 来源:发表于2017-08-14 11:19 被阅读0次

    结构化的数据处理,不像基本的Spark RDD API,提供了更多的结构相关的信息和对应的操作。在内部,spark使用这些相关的信息来做额外的优化。

    进入点:SparkSession

    from pyspark.sql import SparkSession
    
    spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
    

    可以从rdd中创建出dataframes或从hive table或从其他datasource

    df = spark.read.json("")  # 读取json文件
    df.printSchema()
    
    df.select("name").show()
    df.select(df['name'], df['age'] + 1).show()
    df.filter(df['age'] > 21).show()
    df.groupBy("age").count().show()
    

    可以直接执行sql语句

    df.createOrReplaceTempView("people")  # 是需要在这种view上执行
    sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    

    之前创建的sql的view是session作用域内的,会在session执行结束时销毁,可以创建全局的temp view,在应用销毁之前都会存在。

    df.createGlobalTempView("people")
    spark.sql("SELECT * FROM global_temp.people").show()
    spark.newSession().sql("SELECT * FROM global_temp.people").show()  # 可直接使用 虽然时之前的session创建的
    

    另外,Scala和java时支持创建dataset的

    转换rdd为dataframe

    在结构可以确定时

    from pyspark.sql import Row
    
    sc = spark.sparkContext
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
    
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    

    可以再转换为rdd

    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    

    当结构不能事先确定时:

    1、从原始rdd中创建出rdd的列表或元组
    2、创建一个StructType的匹配
    3、通过createDataFrame来适配

    from pyspark.sql.types import *
    sc = spark.sparkContext
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: (p[0], p[1].strip()))
    
    schemaString = "name age"
    
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)
    schemaPeople = spark.createDataFrame(people, schema)
    schemaPeople.createOrReplaceTempView("people")
    results = spark.sql("SELECT name FROM people")
    results.show()
    

    内置的聚合操作:count countDistinct avg max min,对于scala和java还可以自定义聚合操作。

    数据源

    默认的数据源是parquet 也可通过spark.sql.sources.default设置默认的数据源类型

    df = spark.read.load("examples/src/main/resources/users.parquet")
    df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
    

    也可以在装载时制定json, parquet, jdbc, orc, libsvm, csv, text

    df = spark.read.load("examples/src/main/resources/people.json", format="json")
    df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
    

    直接在文件中执行sql

    df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
    

    存储模型

    SaveMode.ErrorIfExists 在存储DataFrame到数据源时,如果数据已经存在了则抛出异常

    SaveMode.Append
    SaveMode.Overwrite
    SaveMode.Ignore

    保存到持久化的表格中

    Hive metastore 通过 saveAsTable

    ParquetFiles

    peopleDF = spark.read.json("examples/src/main/resources/people.json")
    peopleDF.write.parquet("people.parquet")
    parquetFile = spark.read.parquet("people.parquet")
    
    parquetFile.createOrReplaceTempView("parquetFile")
    teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
    teenagers.show()
    

    Hive Tables

    jdbc

    distributed sql engine

    sbin/start-thriftserver.sh
    

    相关文章

      网友评论

          本文标题:spark sql

          本文链接:https://www.haomeiwen.com/subject/tozdrxtx.html