美文网首页
SparkSQL简介

SparkSQL简介

作者: lukeyan | 来源:发表于2021-03-31 21:25 被阅读0次

    一、 如何运行Spark SQL 查询

    1.1、Spark SQL CLI

    要启动Spark SQL CLI ,请在Spark目录中运行以下内容

    ./bin/spark-sql
    

    可以运行./bin/spark-sql -help来查看所有的可选选项的完整列表

    1.2、Spark的可编程SQL接口

    可以通过SparkSession对象上的sql方法执行sql语句,将返回一个DataFrame,这是一个非常强大的接口,因为有些转换操作通过SQL代码表达要比DataFrame表达简单得多

    spark.sql(
          """
            |select user_id, department, first_name from professors where department in
            |(select name from department where created_date >= '2016-01-01')
            |""".stripMargin)
    
    1.3、SparkSQL Thrift JDBC/ODBC 服务器

    Spark 提供了 一个 Java数据库连接(JDBC)接口,通过它你或远程程序可以连接到Spark驱动器,以便执行SparkSQL查询。
    要启动JDBC/ODBC服务器,请在Spark目录下运行以下内容

    ./sbin/start-thriftserver.sh
    

    此脚本支持全部bin/spark-submit命令行选项
    要查看配置此Thrfit服务器的所有可用选项,请运行./sbin/start-thriftserver.sh --help.
    默认情况下,服务器监听localhost:10000

    可以通过更改环境变量或系统属性来更新该监听地址和端口:
    对于环境变量配置,请使用以下方法:

    export HIVE_SERVER2_THRIFT_PORT=<listening-port>
    export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
    ./sbin/start-thriftserver.sh \
    --master <master-uri>
    

    对于系统属性,可以参考下面:

    ./sbin/start-thriftserver.sh \
    --hiveconf hive.server2.thrift.port=<listening-port> \
    --hiveconf hive.server2.thrift.bind.host=<listening-host> \
    --master <master-uri>
    

    二、 Catalog

    Spark SQL 中最高级别的抽象是CatalogCatalog是一个抽象,用于存储用户数据中的元数据以及其他有用的东西,如数据库,数据表,函数和视图。

    2.1 数据表

    Spark SQL在执行任何操作之前首先需要定义数据表,数据表在逻辑上等同于DataFrame,但核心区别在于 DataFrame是在编程语言范围内定义的,而数表是在数据库定义的

    • 创建表

    Spark允许你从某数据源直接创建表,从文件中读取数据时,你甚至可以指定各种复杂的选项。

    CREATE TABLE flights (
    DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
    USING JSON OPTIONS (path 'data/flight-data/json/2015-summary.json')
    --还可以想表中的某些列添加注释
    CREATE TABLE flights_csv (
    DEST_COUNTRY_NAME STRING,
    ORIGIN_COUNTRY_NAME, STRING COMMENT 'remember ,the US will be most prevalent',
    count LONG)
    USING csv OPTIONS (header true,path '/data/flight-data/csv/2015-summary.csv')
    -- 你也可以从查询创建表
    CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights
    
    • using 和 stored as

    using语法规范具有重要意义,如果没有使用using 指定 格式,则spark将默认为Hive SerDe配置,但是Hive SerDes 比Spark的本机序列化要满得多。Hive用户还可以使用stored as 语法来指定这是一个Hive表。

    • 创建外部表
    CREATE EXTERNAL TABLE hive_flights (
    DEST_COUNTRY_NAME STRING,
    ORIGIN_COUNTRY_NAME STRING,
    count LONG)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/data/flight-data-hive/'
    -- 还可以从select子句创建外部表
    CREATE EXTERNAL TABLE hive_flights_2
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/data/flight-data-hive/' AS SELECT * FROM flights
    
    • 插入表
    INSERT INTO flights_from_select
    SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights LIMIT 20
    -- 插入分区表
    INSERT INTO partitioned_flights PARTITION (DEST_COUNTRY_NAME = "UBITED STATES")
    SELECT count, ORIGIN_COUNTRY_NAME FROM flights
    WHERE DESR_COUNTRY_NAME = 'UNITED STATES' LIMIT 12
    
    • 描述表的元数据
    DESCRIBE TABLE flights
    
    • 刷新表的元数据
    REFRESH TABLE flights
    MSCK REPAIR TABLE flights
    
    • 删除表
    DROP TABLE flights;
    DROP TABLE IF EXISTS flights_csv;
    
    • 缓存表
    CACHE TABLE flights
    UNCACHE TABLE flights
    
    2.2 视图

    在创建了一个表后,就可以定义视图了。定义视图即指定基于现有表的一组转换操作,基本上只是保存查询计划,这样可以方便组织或重用查询逻辑

    • 创建视图
    CREATE VIEW just_usa_view
    AS SELECT * FROM flights WHERE DEST_COUNTRY_NAME = 'UNITED STATES'
    -- 可以创建仅在当前会话期间可用,且未注册到数据库的临时视图
    CREATE TEMP VIEW just_usa_view_temp
    AS SELECT * FROM flights WHERE DEST_COUNTRY_NAME = 'UNITED STATES'
    
    CREATE GLOBAL TEMP VIEW just_usa_view_temp
    AS SELECT * FROM flights WHERE DEST_COUNTRY_NAME = 'UNITED STATES'
    -- 显示指定是否覆盖已存在的视图
    CREATE OR REPLACE TEMP VIEW just_usa_view_temp
    AS SELECT * FROM flights WHERE DEST_COUNTRY_NAME = 'UNITED STATES'
    --可以像查询数据表一样查询视图
    SELECT * FROM just_usa_view_temp
    

    视图实际上是一种转换,Spark只会在查询时执行它

    • 删除视图
    DROP VIEW IF EXISTS just_usa_view
    
    2.3 数据库
    • 创建数据库
    CREATE DATABASE some_db
    
    • 选择数据库
    use some_db
    -- 列出当前数据库中的所有数据表
    show tables
    -- 查看当前正在使用的数据库
    SELECT current_database()
    
    • 删除数据库
    DROP DATABASE IF EXISTS some_db
    

    三、 高级主题

    3.1 复杂类型

    SparkSQL中支持三种复杂类型:结构体(struct)、列表(list)和映射(map)。

    • 结构体
    CREATE VIEW IF NOT EXISTS nested_data
    AS SELECT
    -- 创建一个结构体
    (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME)  as country,
    count FROM flights
    --查询结构体中的某一列
    SELECT country.DEST_COUTRY_NAME, count from nested_data
    --查询结构体中所有值
    SELECT country.*, count FROM nested_data
    
    • 列表

    在Spark SQL 中有两种创建列表的方式:

    collect_list 创建一个包含值的列表
    collect_set创建一个不含有重复值的列表
    
    select dest_country_name as new_name, collect_list(count) as flights_counts,
    collect_set(origin_country_name) as origin_set
    from flights group by dest_country_name
    --可以通过设定值的方法来手动地创建数组
    select dest_country_name, ARRAY(1, 2, 3) from flights
    
    • 函数

    若要在Spark SQL中的函数列表,可以使用SHOW FUNCTIONS 语句

    show functions
    -- 指定查询系统函数
    show system functions
    -- 指定查询用户函数
    show user functions
    -- 可以通过传递带有通配符 "*" 的字符串来实现过滤选择
    show functions "s*";
    -- 也可以包括LIKE关键字
    show functions like "collect*";
    
    • 子查询

    在Spark中有两个基本子查询:

    相关子查询: 使用来自查询外的一些信息
    不相关子查询:不包括外部的信息
    
    • 不相关谓词子查询
    --此查询是不相关的,因为它不包含来查询外部的信息,这是一个可以自行运行的查询
    select * from flights where origin_country_name in (
    select dest_country_name from flights group by dest_country_name order by sum(count) desc limit 5
    )
    
    • 相关谓词子查询
      相关谓词子查询允许在内部查询中使用外部作用域的信息。
    select  * from f1 where exists
    (select 1 from flights f2 where f1.dest_country_name = f2.origin_country_name)
    and exists
    (select 1 from flights f2 where f2.dest_country_name = f1.origin_country_name)
    
    • 不相关子标量查询
      使用不相关的标量查询scalar query,可以引入一些以前可能没有的补充信息
    select *, (select max(count) from flights) as maximum from flights
    
    2.3 其他功能
    • 配置
    peoperty name default meaning
    spark.sql.inMemoryColumnarStorage.compressed true 如果设置为true,则Spark SQL会根据数据的统计信息
    spark.sql.inMemoryColumnarStorage.batchSize 10000 控制柱状缓存的批处理大小。较大的批处理可以提高内存利用率和压缩能力,但在缓存数据时有OutOfMemoryErrors的风险
    spark.sql.files.maxPartitionBytes 134217728(128M)单个分区中的最大字节数
    spark.sql.files.openCostInBytes 4194304(4MB) 打开一个文件的开销估计,即同时可以扫描的字节数量,这个配置会在将多个文件并入一个分区时用到,这个参数最好配置的大一些,因为装着小文件的分区往往会比装更大的分区运行更快
    spark.sql.broadcastTimeout 300 广播连接中广播等待时间的超时秒数(以秒为单位)
    spark.sql.autoBroadcastJoinThreshold 10485760(10MB) 配置在执行连接时,将广播给所有节点的表的最大大小。可以将此值设置为-1来禁用广播
    spark.sql.shuffle.partitions 200 配置在为连接或聚合shuffle数据时要使用的分区数

    相关文章

      网友评论

          本文标题:SparkSQL简介

          本文链接:https://www.haomeiwen.com/subject/wmzghltx.html