0303 Performance Tuning

作者: Whaatfor | 来源:发表于2020-06-27 16:41 被阅读0次

    转载请注明出处,谢谢合作~

    该篇中的示例暂时只有 Scala 版本~

    性能调优

    对于某些工作场景,可以通过在内存中缓存数据或者开启一些试验功能来提升性能。

    在内存中缓存数据

    Spark SQL 能做以一种内存中的列式存储的格式缓存数据,可以通过调用 spark.catalog.cacheTable("tableName") 或者 dataFrame.cache() 方法实现。之后 Spark SQL 只需要扫描用到的列中的数据,而且能够自动使用压缩来节省内存空间,缓解 GC 压力。可以通过调用 spark.catalog.uncacheTable("tableName") 来删除缓存。

    内存缓存的配置项可以通过 SparkSession 对象的 setConf 方法或者执行 SQL 中的 SET key=value 语句来设置。

    Property Name Default Meaning Since Version
    spark.sql.inMemoryColumnarStorage.compressed true 当设置为 true 时,Spark SQL 会根据数据统计信息为每一列自动应用一种压缩格式。 1.0.1
    spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式存储中批次的大小。较大的批次可以提升内存利用率和压缩效率,但是会提高 OOM 的风险。 1.1.1

    其他配置项

    下列选项也可以用来对查询的性能进行调优。由于越来越多的优化都会自动进行,这些选项在未来的版本中可能被被废弃。

    Property Name Default Meaning Since Version
    spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在读取文件数据时单分区的最大字节数。该配置只在使用文件类型的数据源(比如 Parquet, JSON 和 ORC)时有效。 2.0.0
    spark.sql.files.openCostInBytes 4194304 (4 MB) 读取一个文件时的预计开销,由可以被同时扫描的字节数控制。该参数用在将多个文件归入一个分区时起作用。被高估会有积极的效果,此时包含小文件的分区将会比包含大文件的分区(优先调度)要快。该配置只在使用文件类型的数据源(比如 Parquet, JSON 和 ORC)时有效。 2.0.0
    spark.sql.broadcastTimeout 300 广播连接(Broadcast Join)中超时等待时间,单位为秒。 1.3.0
    spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 在连接操作中,如果一张表的大小低于该配置,它会被广播到其他的节点。设置为 -1 表示禁用广播连接。注意,目前只能够通过 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令获得 Hive 表的统计信息。 1.1.0
    spark.sql.shuffle.partitions 200 连接操作或者聚合操作引起的 shuffle 行为的分区数。 1.1.0

    SQL 查询连接策略提示

    目前的连接策略提示有 BROADCASTMERGESHUFFLE_HASHSHUFFLE_REPLICATE_NL,在一张关系表和另一张关系表做连接操作时,Spark 会根据连接策略提示采用不同的连接策略。例如,当对表「t1」使用 BROADCAST 连接策略提示时,Spark 会优先采用广播连接(根据是否含有 equi-join 键来决定采用 broadcast hash join 或者 broadcast nested loop join),即使的统计信息表明其大小超出了上述配置 spark.sql.autoBroadcastJoinThreshold 的限值。

    如果连接操作的调用者和被调用者采用了不同的连接策略提示,优先级从高到低为:BROADCASTMERGESHUFFLE_HASHSHUFFLE_REPLICATE_NL。如果连接操作的调用者和被调用者都采用了 BROADCAST 策略提示或者 SHUFFLE_HASH 策略提示,Spark 会根据连接类型和表数据量大小做出选择。

    注意,由于指定的连接策略提示可能并不支持所有的连接类型,所以并不能保证 Spark 所采取的连接策略一定是被指定的那个。

    spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
    

    详情参见文档 Join Hints

    SQL 查询合并提示

    合并提示可以让 Spark SQL 用户控制一些算子的输出文件数量,这些算子包括 coalescerepartitionrepartitionByRange。合并指引可以用来调优性能,减少输出文件的数量。「COALESCE」指引只有一个分区数量作为参数。「REPARTITION」的参数可以是分区数或者列名或者两者一起。「REPARTITION_BY_RANGE」提示必须有一个列名参数,分区数是可选的参数。

    SELECT /*+ COALESCE(3) */ * FROM t
    SELECT /*+ REPARTITION(3) */ * FROM t
    SELECT /*+ REPARTITION(c) */ * FROM t
    SELECT /*+ REPARTITION(3, c) */ * FROM t
    SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
    SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
    

    详情参见文档 Partitioning Hints

    自适应查询引擎

    自适应查询引擎(AQE)是 Spark SQL 当中的一项优化技术,能够充分利用运行时统计信息来选取最高效的查询计划。AQE 默认是关闭的,Spark SQL 可以使用保护伞配置参数 spark.sql.adaptive.enabled 来控制是否开启。Spark 3.0 中的 AQE 主要有三种优化手段,包括合并 shuffle 分区,将 sort-merge 连接转换为 broadcast 连接,优化连接中的数据倾斜。

    合并 shuffle 分区

    当参数 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 都被开启时,通过 map 端的统计信息合并 reduce 端的分区数。这种方式简化了查询中 shuffle 分区数量的调优,不需要针对数据指定一个合理的分区数,一旦通过参数 spark.sql.adaptive.coalescePartitions.initialPartitionNum 设置了一个足够大的初始 shuffle 分区数,Spark 会在运行时选取合理的分区数。

    Property Name Default Meaning Since Version
    spark.sql.adaptive.coalescePartitions.enabled true 当该参数和参数 spark.sql.adaptive.enabled 设置为 true 时,Spark 会根据目标大小(由参数 spark.sql.adaptive.advisoryPartitionSizeInBytes 控制)合并连续的 shuffle 分区,以避免过多的小分区。 3.0.0
    spark.sql.adaptive.coalescePartitions.minPartitionNum Default Parallelism 合并分区后的最小分区数。如果没有设置,默认值为集群最小并行数。该参数只有在参数 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 参数都开启的情况下才生效。 3.0.0
    spark.sql.adaptive.coalescePartitions.initialPartitionNum 200 分区合并前的初始分区数。默认情况下与参数 spark.sql.shuffle.partitions 的值相同。该参数只有在参数 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 参数都开启的情况下才生效。 3.0.0
    spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB 自适应优化时建议的分区大小,需要参数 spark.sql.adaptive.enabled 设置为 true。对于分区合并和分割数据倾斜分区起作用。 3.0.0

    将 sort-merge 连接转换为 broadcast 连接

    当连接操作的调用者或者被调用者的数据量小于广播哈希连接的限值时,AQE 会将 sort-merge 连接转换为 broadcast 连接。这种方式并不比在开始就指定广播连接策略更高效,但是也比 sort-merge 连接好多了。优化之后可以在一个节点上保存需要连接的所有数据,读取本地文件节省了网络传输开销(需要参数 spark.sql.adaptive.localShuffleReader.enabled 设置为 true)。

    优化连接中的数据倾斜

    数据倾斜会严重降低连接查询的效率,AQE 通过分割(或者折叠)将倾斜的数据分区组合成大小基本均匀的子任务来动态处理 sort-merge 连接中的数据倾斜。当参数spark.sql.adaptive.enabledspark.sql.adaptive.skewJoin.enabled 都开启时生效。

    Property Name Default Meaning Since Version
    spark.sql.adaptive.skewJoin.enabled true 当该参数和参数 spark.sql.adaptive.enabled 设置为 true 时,Spark 通过分割(或者折叠)动态处理数sort-merge 连接中的据倾斜。 3.0.0
    spark.sql.adaptive.skewJoin.skewedPartitionFactor 10 当一个分区的大小大于该参数的值乘以分区大小中位数,同时大于参数 spark.sql.adaptive.skewedPartitionThresholdInBytes 设置的值时,该分区被判定为数据倾斜。 3.0.0
    spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 当一个分区的大小大于该参数的值,同时大于参数的spark.sql.adaptive.skewJoin.skewedPartitionFactor 值乘以分区大小中位数时,该分区被判定为数据倾斜。理想情况下该参数的值应该比参数 spark.sql.adaptive.advisoryPartitionSizeInBytes 的值大。 3.0.0

    相关文章

      网友评论

        本文标题:0303 Performance Tuning

        本文链接:https://www.haomeiwen.com/subject/rwkvfktx.html