美文网首页Spark
Kyuubi 解锁 Spark SQL on CDH 6

Kyuubi 解锁 Spark SQL on CDH 6

作者: 517001e7cb6e | 来源:发表于2021-08-30 18:16 被阅读0次

    背景

    CDH 最后一个免费版 6.3.2 发布一年有余,离线计算核心组件版本停在了 Hadoop 3.0.0,Hive 2.1.1,Spark 2.4.0。随着 Spark 3.0 的重磅发布,在性能方面又迎来了一次飞跃,本文将描述把 Spark 3 集成到 CDH 6.3.1(未开启 Kerberos) 的过程,并使用 Kyuubi 替换 HiveServer2,实现 OLAP、ETL 等场景下从 HiveQL 到 SparkSQL 的无缝迁移,享受 5x-10x 的性能红利。

    CDH 缺陷修复

    [ORC-125] 修复 Hive 不能读取高版本 ORC 写入的数据

    当使用 Hive 读取由 Presto 或者 Spark 等写入的 ORC 文件时,会出现以下错误。

    ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 6
    

    该问题在 ORC 上游被修复 [ORC-125] Correct OrcFile.WriterVersion to correctly use FUTURE

    ORC 最早是 Hive 的一个子项目,在 CDH 6 集成的 Hive 2.1 这个版本里,ORC 还没有分离出去,所以这个问题要在 Hive 源码里修复。

    我做了一个打包好的修复版本,GitHub 传送门,下载更换 /opt/cloudera/parcels/CDH/lib/hive/lib 路径下的 hive-exec-2.1.1-cdh6.3.1.jar, hive-orc-2.1.1-cdh6.3.1.jar 即可。(至少需要更换 Hadoop Client、HiveServer2 节点,如果你不知道我在说什么,就把所有节点都换掉)

    Spark 3.1

    [SPARK-33212] Spark 使用 Hadoop Shaded Client

    Hadoop 3.0 提供了 Shaded Client,用于下游项目规避依赖冲突 [HADOOP-11656] Classpath isolation for downstream clients

    Spark 3.2 在 hadoop-3.2 profile 中切换到了 Hadoop Shaded Client
    [SPARK-33212] Upgrade to Hadoop 3.2.2 and move to shaded clients for Hadoop 3.x profile

    该更改不是必须的,但个人建议从 Spark 主线将该补丁移植到 branch-3.1 使用,以规避潜在的依赖冲突。

    [CDH-71907] Spark HiveShim 适配 CDH Hive

    Spark 通过反射和隔离的类加载器来实现对多版本 Hive Metastore 的支持,详情参考
    Interacting with Different Versions of Hive Metastore - Spark Documentation

    CDH 6 使用修改过的 Hive 2.1.1 版本,其方法签名与 Apache 版本有所不同,故 Spark HiveShim 反射调用会出现找不到方法签名,需要手动将 CDH-71907 补丁打到 branch-3.1。

    Spark External Shuffle Service 协议兼容

    Spark shuffle 时,mapper 会将数据写入到本地磁盘而非 HDFS,引入 ESS 后,会将文件信息注册到 ESS 中,将 mapper 与 reducer 解耦。CDH 中,默认会启用 Spark Yarn External Shuffle Service,作为 YARN AUX Service 在所有 Yarn Node 上启动。

    Spark 3 修改了 shuffle 通信协议,在与 CDH 2.4 版本的 ESS 交互时,需要设置 spark.shuffle.useOldFetchProtocol=true,否则可能报如下错误。[SPARK-29435] Spark 3 doesn't work with older shuffle service

    IllegalArgumentException: Unexpected message type: <number>.
    

    Spark 版本迁移指南

    如果你有现存的基于 CDH Spark 2.4 的 Spark SQL/Job,在将其迁移至 Spark 3 版本前,请参阅完整的官方迁移指南。

    Spark 部署

    官方文档 Running Spark on YARN - Documentation 中提到

    To make Spark runtime jars accessible from YARN side, you can specify spark.yarn.archive or spark.yarn.jars. For details please refer to Spark Properties. If neither spark.yarn.archive nor spark.yarn.jars is specified, Spark will create a zip file with all jars under $SPARK_HOME/jars and upload it to the distributed cache.

    因此,无需在 CDH 所有节点上部署 Spark 3,只需在 Hadoop Client 节点上部署 Spark 3 即可。

    如果你对集群权限管理没有十分严格的要求,请使用 hive 用户以避免权限问题。

    我基于 Spark 3.1.2 制作了一个适配 CDH 6 的版本, GitHub 传送门 ,下载解压至 /opt,并软链至 /opt/spark3

    [hive@cdh-kyuubi]$ ls -l /opt | grep spark
    lrwxrwxrwx  1 root         root           39 Aug 10 18:46 spark3 -> /opt/spark-3.1.2-cdh6-bin-3.2.2
    drwxr-xr-x 13 hive         hive         4096 Aug 10 18:46 spark-3.1.2-cdh6-bin-3.2.2
    

    配置 Hadoop、Hive

    CDH 会将配置文件自动分发到所有节点 /etc 目录下,建立软链即可。

    ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/
    ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/
    ln -s /etc/hadoop/conf/yarn-site.xml /opt/spark3/conf/
    ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/
    

    配置 Spark 环境变量 /opt/spark3/conf/spark-env.sh

    #!/usr/bin/env bash
    
    export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
    export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn:/etc/hive/conf
    

    配置 Spark 默认参数 /opt/spark3/conf/spark-defaults.conf

    请参考 Configuration - Spark Documentation 根据集群环境实际情况进行微调

    spark.authenticate=false
    spark.io.encryption.enabled=false
    spark.network.crypto.enabled=false
    
    spark.eventLog.enabled=true
    spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory
    spark.driver.log.dfsDir=/user/spark/driverLogs
    spark.driver.log.persistToDfs.enabled=true
    
    spark.files.overwrite=true
    spark.files.useFetchCache=false
    
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    
    spark.shuffle.service.enabled=true
    spark.shuffle.service.port=7337
    spark.shuffle.useOldFetchProtocol=true
    
    spark.ui.enabled=true
    spark.ui.killEnabled=true
    spark.yarn.historyServer.address=http://cdh-master2:18088
    spark.yarn.historyServer.allowTracking=true
    
    spark.master=yarn
    spark.submit.deployMode=cluster
    
    spark.driver.memory=2G
    spark.executor.cores=6
    spark.executor.memory=8G
    spark.executor.memoryOverhead=2G
    spark.memory.offHeap.enabled=true
    spark.memory.offHeap.size=2G
    
    spark.dynamicAllocation.enabled=true
    spark.dynamicAllocation.executorIdleTimeout=60
    spark.dynamicAllocation.minExecutors=0
    spark.dynamicAllocation.schedulerBacklogTimeout=1
    
    spark.sql.cbo.enabled=true
    spark.sql.cbo.starSchemaDetection=true
    spark.sql.datetime.java8API.enabled=false
    spark.sql.sources.partitionOverwriteMode=dynamic
    
    spark.sql.hive.convertMetastoreParquet=false
    spark.sql.hive.convertMetastoreParquet.mergeSchema=false
    spark.sql.hive.metastore.version=2.1.1
    spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/*
    
    spark.sql.orc.mergeSchema=true
    spark.sql.parquet.mergeSchema=true
    spark.sql.parquet.writeLegacyFormat=true
    
    spark.sql.adaptive.enabled=true
    spark.sql.adaptive.forceApply=false
    spark.sql.adaptive.logLevel=info
    spark.sql.adaptive.advisoryPartitionSizeInBytes=256m
    spark.sql.adaptive.coalescePartitions.enabled=true
    spark.sql.adaptive.coalescePartitions.minPartitionNum=1
    spark.sql.adaptive.coalescePartitions.initialPartitionNum=1024
    spark.sql.adaptive.fetchShuffleBlocksInBatch=true
    spark.sql.adaptive.localShuffleReader.enabled=true
    spark.sql.adaptive.skewJoin.enabled=true
    spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
    spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=128m
    spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2
    spark.sql.autoBroadcastJoinThreshold=-1
    

    验证 spark-shell 工作正常

    [hive@cdh-kyuubi]$ /opt/spark3/bin/spark-shell
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    21/08/30 19:53:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    21/08/30 19:53:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
    Spark context Web UI available at http://cdh-master1:4040
    Spark context available as 'sc' (master = yarn, app id = application_1615462037335_40099).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3.1.2-cdh6
          /_/
    
    Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.sql("show databases").show
    +----------------+
    |       namespace|
    +----------------+
    |            test|
    +----------------+
    scala>
    

    至此,Spark 3 on CDH 6 已经部署完成,可以像 CDH 自带的 Spark 2.4 一样,正常的使用 spark-submit、spark-shell,但依旧不支持 spark-sql,spark-thriftserver。

    相比略显鸡肋的 spark-sql,spark-thriftserver,Kyuubi 是更好的选择,因此我故意移除了这两个功能。

    注意:我提供的构建版,没有启用 spark-thriftserver 模块,为正常使用 Kyuubi,请下载 hive-service-rpc-3.1.2.jar 添加到 /opt/spark3/jars 路径。

    Kyuubi —— 解锁 Spark SQL 更多场景

    简言之,Apache Kyuubi (Incubating) 之于 Spark,类似 HiveServer2 之于 Hive。

    Kyuubi 通过将 Spark 暴露一个与 HiveServer2 完全兼容的 Thrift API,可以兼容现有的 Hive 生态,如 beeline,hive-jdbc-driver,HUE,Superset 等。

    可以通过以下文档来了解 Kyuubi 的架构,Kyuubi 与 HiveServer2 和 Spark Thrift Server 的异同。

    Kyuubi 部署

    Kyuubi 无需任何修改即可适配 CDH 6,下面给出关键步骤,详情可以参考 Deploy Kyuubi engines on Yarn — Kyuubi documentation

    同样的,如果你对集群权限管理没有十分严格的要求,请使用 hive 用户以避免权限问题。

    解压部署

    下载 kyuubi-1.3.0-incubating-bin.tgz 解压至 /opt,并创建软链到 /opt/kyuubi

    [hive@cdh-external opt]$ ls -l /opt | grep kyuubi
    lrwxrwxrwx  1 root         root           32 Aug 18 17:36 kyuubi -> /opt/kyuubi-1.3.0-incubating-bin
    drwxrwxr-x 13 hive         hive         4096 Aug 18 17:52 kyuubi-1.3.0-incubating-bin
    

    修改配置

    按需修改 /opt/kyuubi/conf/kyuubi-env.sh

    #!/usr/bin/env bash
    
    export JAVA_HOME=/usr/java/default
    export SPARK_HOME=/opt/spark3
    export SPARK_CONF_DIR=${SPARK_HOME}/conf
    export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
    
    export KYUUBI_PID_DIR=/data/log/service/kyuubi/pid
    export KYUUBI_LOG_DIR=/data/log/service/kyuubi/logs
    export KYUUBI_WORK_DIR_ROOT=/data/log/service/kyuubi/work
    export KYUUBI_MAX_LOG_FILES=10
    

    参考 Kyuubi Configurations System — Kyuubi documentation 按需修改 /opt/kyuubi/conf/kyuubi-defaults.conf

    kyuubi.authentication=NONE
    
    kyuubi.engine.share.level=USER
    
    kyuubi.frontend.bind.host=0.0.0.0
    kyuubi.frontend.bind.port=10009
    
    kyuubi.ha.zookeeper.quorum=cdh-master1:2181,cdh-master2:2181,cdh-master3:2181
    kyuubi.ha.zookeeper.namespace=kyuubi
    
    kyuubi.session.engine.idle.timeout=PT10H
    
    spark.master=yarn
    spark.submit.deployMode=cluster
    
    spark.dynamicAllocation.enabled=true
    spark.dynamicAllocation.minExecutors=0
    spark.dynamicAllocation.maxExecutors=20
    spark.dynamicAllocation.executorIdleTimeout=60
    

    注意:kyuubi-defaults.conf 中的 spark 配置优先级高于 spark-defaults.conf

    性能调优

    How To Use Spark Dynamic Resource Allocation (DRA) in Kyuubi — Kyuubi documentation
    How To Use Spark Adaptive Query Execution (AQE) in Kyuubi — Kyuubi documentation

    启动

    使用 /opt/kyuubi/bin/kyuubi 在前台或后台启动 Kyuubi Server。

    [hive@cdh-kyuubi]$ /opt/kyuubi/bin/kyuubi --help
    Usage: bin/kyuubi command
      commands:
        start        - Run a Kyuubi server as a daemon
        run          - Run a Kyuubi server in the foreground
        stop         - Stop the Kyuubi daemon
        status       - Show status of the Kyuubi daemon
        -h | --help  - Show this help message
    

    Beeline 连接

    使用默认参数连接 Kyuubi

    beeline -u jdbc:hive2://cdh-kyuubi:10009 -n bigdata
    

    使用自定义参数连接 Kyuubi

    beeline -u "jdbc:hive2://cdh-master2:10009/;?spark.driver.memory=8G#spark.app.name=batch_001;kyuubi.engine.share.level=CONNECTION" -n batch
    

    详细配置参考 Access Kyuubi with Hive JDBC and ODBC Drivers — Kyuubi documentation

    HUE 连接

    简单说,在 Cloudera Manager 中修改 HUE 配置项 Hue Service Advanced Configuration Snippet 如下,即可在 HUE 中开启 Spark SQL 引擎。

    [desktop]
     app_blacklist=zookeeper,hbase,impala,search,sqoop,security
     use_new_editor=true
    [[interpreters]]
    [[[sparksql]]]
      name=Spark SQL
      interface=hiveserver2
    [[[hive]]]
      name=Hive
      interface=hiveserver2
    # other interpreters
      ...
    [spark]
    sql_server_host=kyuubi
    sql_server_port=10009
    

    详细配置参考 Getting Started with Kyuubi and Cloudera Hue — Kyuubi documentation

    Kyuubi engine 共享级别与应用场景

    Kyuubi Spark engine 即一个 Spark driver,通过控制 engine 的共享策略,可以在隔离性和资源利用率上取得平衡。Kyuubi 提供 3 种 engine 共享级别,分别为 CONNECTION,USER(默认),SERVER。

    下面的讨论均假设使用 YARN Cluster 模式启动 Kyuubi Spark engine。

    我们首先补充一些时间开销和 Spark 操作执行的信息。

    Kyuubi Spark engine 在冷启动时,会由 Kyuubi Server 通过 spark-submit 命令向 YARN 提交一个 Spark App,即 engine,该过程从提交到 Spark driver 启动约需要 5-6s,然后 engine 将自己注册到 Zookeeper,Kyuubi Server 监听 Zookeeper 发现 engine 并建立连接,该过程约 1-2s,如此算来,在 YARN 资源空闲时,整个 engine 冷启动时间约 6-8s;Client 连接到存在的 engine,通常耗时 1s 内。

    如果启动了 Spark 的 executor 动态伸缩特性,真正执行 SQL 任务时,如果资源有富余,会动态创建 executor,每个 executor 创建耗时约为 2-3s。

    元数据、DDL 等操作,如获取 database 列表,CREATE TABLE 语句等,会在 driver 上执行;计算任务如 JOINCOUNT() 等,会由 driver 生成执行计划,在 executor 上执行。

    在我们的生产环境中,大概有如下三种使用场景:

    1. 使用 HUE 进行 ad-hoc 查询
      该场景中,会有多个用户进行查询,一般会运行相对较大的查询任务,用户对连接创建时间以及元数据加载时间较为敏感,但对查询结果响应时间有一定的容忍性。
      在这种场景中,使用默认的 USER 共享级别,每个用户只使用一个 Spark engine,配合使用 Spark 的动态伸缩特性,动态的创建和销毁 executor,在保证用户之间隔离的基础上,降低启动时间和资源占用。建议的关键配置如下:
    kyuubi.engine.share.level=USER
    kyuubi.session.engine.idle.timeout=PT1H
    
    spark.dynamicAllocation.enabled=true
    spark.dynamicAllocation.minExecutors=0
    spark.dynamicAllocation.maxExecutors=30
    spark.dynamicAllocation.executorIdleTimeout=120
    
    1. 使用 Beeline 运行批任务 SQL
      该场景会使用统一的 batch 账号提交 SQL 任务,对响应时间敏感度低,但对稳定性要求非常要,并且应尽可能的最大化利用集群资源。
      推荐在该场景下将共享级别调整为 CONNECTION,这样每个 SQL 执行将会使用独立的 Spark driver,并且 SQL 执行完毕后 Spark driver 立即退出,保障离线任务互不干扰,并且资源及时释放。建议的关键配置如下:
    kyuubi.engine.share.level=CONNECTION
    
    spark.dynamicAllocation.enabled=true
    # 根据任务具体资源消耗估算,从 workflow 整体上提升集群资源利用率
    spark.dynamicAllocation.minExecutors=5
    spark.dynamicAllocation.maxExecutors=30
    
    1. 使用 Superset 进行多数据源联邦查询
      该场景中,只有一个 service 账号,会定时同时刷新大量的图表,对连接创建时间、查询结果响应时间、并发度都有较高的要求。
      该场景中,driver 和 executor 的启动时间都是不容忽视的,因此在 ad-hoc 查询配置的基础上,应延长 driver、executor 的闲置等待时间,并且设定最小的 executor 保活数量,保证时刻有 executor 常驻,能快速响应较小的查询。建议的关键配置如下:
    kyuubi.engine.share.level=USER
    kyuubi.session.engine.idle.timeout=PT10H
    
    spark.dynamicAllocation.enabled=true
    spark.dynamicAllocation.minExecutors=6
    spark.dynamicAllocation.maxExecutors=10
    spark.dynamicAllocation.executorIdleTimeout=120
    

    我们可以启动三个 Kyuubi Server(单机或集群)来分别应对如上的三种场景,但也可以仅启动一个 Kyuubi Server(单机或集群)来满足不同的场景。

    一种建议的实践方式是 Kyuubi Server 配置使用默认的 USER 共享级别,这样客户端连接时,会使用默认配置;当 ETL 跑批场景时,可以通过 beeline 参数将本次连接共享级别调整为 CONNECTION;类似的,在 Superset 连接中配置独立参数。

    结语

    现在去跑一些 SQL,体验 Spark SQL 带来的性能飞跃吧!

    相关文章

      网友评论

        本文标题:Kyuubi 解锁 Spark SQL on CDH 6

        本文链接:https://www.haomeiwen.com/subject/oeloiltx.html