Learning Spark [4] - Spark SQL

Author: 屹然1ran | Published 2021-01-25 14:17

    Spark SQL provides Spark with the following capabilities:

    • A high-level API for structured data (see Learning Spark [3])
    • Readers for a variety of data formats (JSON, Hive tables, CSV, Parquet, Avro, ORC)
    • Data access for BI tools (Power BI, Tableau) through JDBC/ODBC connectors, and for RDBMSs (MySQL, PostgreSQL)
    • A programmatic interface for interacting with data stored in a Spark application
    • An interactive shell for running SQL queries
    • Support for HiveQL


      Spark SQL Connectors & Data Sources

    Example: A Basic Query

    # Basic Query Example
    import os
    from pyspark.sql import SparkSession
    from pyspark import SparkFiles
    os.chdir('D:/Users/fyrli/Desktop/R work/learning spark/chapter 4')
    spark = (SparkSession
             .builder
             .appName('SparkSQLExampleApp')
             .getOrCreate())
    # path to data set
    departure_delays = 'departuredelays.csv'
    # Read & Create a Temp View
    df = spark.read.csv(departure_delays, header = True, inferSchema = True)
    df.createOrReplaceTempView('us_delay_flights_tbl')
    df.show(n = 5)
    
    +-------+-----+--------+------+-----------+
    |   date|delay|distance|origin|destination|
    +-------+-----+--------+------+-----------+
    |1011245|    6|     602|   ABE|        ATL|
    |1020600|   -8|     369|   ABE|        DTW|
    |1021245|   -2|     602|   ABE|        ATL|
    |1020605|   -4|     602|   ABE|        ATL|
    |1031245|   -4|     602|   ABE|        ATL|
    +-------+-----+--------+------+-----------+
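
    The column types here are simple enough that the inferSchema pass over the file can be avoided by supplying an explicit DDL-style schema string instead (a sketch based on the columns shown above, not from the book; spark.read.csv also accepts a schema given in this form):

```python
# DDL-style schema matching the columns above. Passing it as
# spark.read.csv(departure_delays, header=True, schema=schema)
# skips the extra pass over the data that inferSchema performs.
schema = 'date STRING, delay INT, distance INT, origin STRING, destination STRING'

# Split the string into (column, type) pairs to sanity-check it.
columns = [tuple(field.split()) for field in schema.split(', ')]
print(columns)
```

Note that date is declared STRING: the raw values are zero-padded digit strings, not numbers.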
    

    The data set has five columns:

    • date — the flight date, a string that can be converted to a timestamp; e.g. 02190925 corresponds to 02-19 09:25 am
    • delay — the delay in minutes; negative values mean the flight departed early
    • distance — the flight distance in miles
    • origin and destination — the departure and arrival airports
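
    In the output above the date values show only seven digits — inferSchema read the column as an integer, dropping the leading zero of the month. A plain-Python sketch of the MMddHHmm encoding (the helper name is my own, not from the book):

```python
from datetime import datetime

def parse_flight_date(raw) -> datetime:
    # Zero-pad back to 8 digits in case the leading zero of the month
    # was lost when the column was read as an integer.
    s = str(raw).zfill(8)
    # Encoding is MMddHHmm; the data set carries no year component.
    return datetime.strptime(s, '%m%d%H%M')

print(parse_flight_date(2190925))   # month 2, day 19, 09:25
print(parse_flight_date('1011245')) # month 1, day 1, 12:45
```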
    Query flights with a distance of at least 1,000 miles:
    spark.sql("""
        SELECT date, distance, origin, destination        
        FROM us_delay_flights_tbl
        WHERE distance >= 1000
        ORDER BY distance DESC""").show(10)
    
    +-------+--------+------+-----------+
    |   date|distance|origin|destination|
    +-------+--------+------+-----------+
    |3131530|    4330|   HNL|        JFK|
    |3071625|    4330|   HNL|        JFK|
    |3121530|    4330|   HNL|        JFK|
    |3021625|    4330|   HNL|        JFK|
    |3061625|    4330|   HNL|        JFK|
    |3081530|    4330|   HNL|        JFK|
    |3091530|    4330|   HNL|        JFK|
    |3011625|    4330|   HNL|        JFK|
    |3151530|    4330|   HNL|        JFK|
    |3051625|    4330|   HNL|        JFK|
    +-------+--------+------+-----------+
    only showing top 10 rows
    

    Query all flights from SFO to ORD that were delayed by more than two hours:

    spark.sql("""
        SELECT date, delay, origin, destination 
        FROM us_delay_flights_tbl 
        WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
        ORDER by delay DESC""").show(10)
    
    +-------+-----+------+-----------+
    |   date|delay|origin|destination|
    +-------+-----+------+-----------+
    |2190925| 1638|   SFO|        ORD|
    |1031755|  396|   SFO|        ORD|
    |1022330|  326|   SFO|        ORD|
    |1051205|  320|   SFO|        ORD|
    |1190925|  297|   SFO|        ORD|
    |2171115|  296|   SFO|        ORD|
    |1071040|  279|   SFO|        ORD|
    |1051550|  274|   SFO|        ORD|
    |3120730|  266|   SFO|        ORD|
    |1261104|  258|   SFO|        ORD|
    +-------+-----+------+-----------+
    only showing top 10 rows
    

    Add a column with a CASE WHEN expression that categorizes each delay:

    spark.sql("""
        SELECT delay, origin, destination,
            CASE WHEN delay >= 360 THEN 'Very Long Delays'
                 WHEN delay >= 120 THEN 'Long Delays'
                 WHEN delay >= 60 THEN 'Short Delays'
                 WHEN delay > 0 THEN 'Tolerable Delays'
                 WHEN delay = 0 THEN 'No Delays'
                 ELSE 'Early'
            END AS Flight_Delays
        FROM us_delay_flights_tbl
        ORDER BY origin, delay DESC""").show(10)
    
    +-----+------+-----------+-------------+
    |delay|origin|destination|Flight_Delays|
    +-----+------+-----------+-------------+
    |  333|   ABE|        ATL|  Long Delays|
    |  305|   ABE|        ATL|  Long Delays|
    |  275|   ABE|        ATL|  Long Delays|
    |  257|   ABE|        ATL|  Long Delays|
    |  247|   ABE|        ATL|  Long Delays|
    |  247|   ABE|        DTW|  Long Delays|
    |  219|   ABE|        ORD|  Long Delays|
    |  211|   ABE|        ATL|  Long Delays|
    |  197|   ABE|        DTW|  Long Delays|
    |  192|   ABE|        ORD|  Long Delays|
    +-----+------+-----------+-------------+
    only showing top 10 rows
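
    The same bucketing can be spot-checked in plain Python (my own sketch, not from the book; boundary values such as exactly 360 or 120 minutes go to the longer bucket):

```python
def flight_delay_label(delay: int) -> str:
    # Buckets are checked from longest to shortest, so each delay
    # gets the first matching label, like a SQL CASE expression.
    if delay >= 360:
        return 'Very Long Delays'
    if delay >= 120:
        return 'Long Delays'
    if delay >= 60:
        return 'Short Delays'
    if delay > 0:
        return 'Tolerable Delays'
    if delay == 0:
        return 'No Delays'
    return 'Early'

print(flight_delay_label(333))   # Long Delays
print(flight_delay_label(1638))  # Very Long Delays
print(flight_delay_label(-8))    # Early
```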
    

    The following Python (DataFrame API) code is equivalent to the first SQL query above:

    from pyspark.sql.functions import col, desc

    (df.select('distance', 'origin', 'destination')
         .where(col('distance') > 1000)
         .orderBy(desc('distance'))).show(10)
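
    As a plain-Python sanity check of what that chain computes (my own illustration using rows from the sample output; the real query of course runs inside Spark):

```python
# (distance, origin, destination) rows taken from the sample output above.
flights = [(602, 'ABE', 'ATL'), (369, 'ABE', 'DTW'), (4330, 'HNL', 'JFK')]

# Equivalent of .where(col('distance') > 1000).orderBy(desc('distance')):
result = sorted((f for f in flights if f[0] > 1000),
                key=lambda f: f[0], reverse=True)
print(result)  # [(4330, 'HNL', 'JFK')]
```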
    

    Creating SQL Databases and Tables

    By default, Spark creates tables in the default database. Next, using the US flight-delay data, we create one managed and one unmanaged table:

    • for a managed table, Spark manages both the metadata and the data in the file store
    • for an unmanaged table, Spark manages only the metadata, and you manage the data in an external source yourself

    First, create a database and switch to it:
    spark.sql('CREATE DATABASE learn_spark_db')
    spark.sql('USE learn_spark_db')
    

    managed table

    spark.sql("""
        CREATE TABLE managed_us_delay_flights_tbl
        (date STRING, delay INT, distance INT, origin STRING, destination STRING)""")
    

    unmanaged table

    spark.sql("""
        CREATE TABLE us_delay_flights_tbl
        (date STRING, delay INT, distance INT, origin STRING, destination STRING)
        USING csv OPTIONS (PATH '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')
    """)
    

    Reference
    Learning Spark, 2nd Edition: Lightning-Fast Data Analytics, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee
