美文网首页
Java-Spark系列10-Spark性能调优概述

Java-Spark系列10-Spark性能调优概述

作者: 只是甲 | 来源:发表于2021-10-11 17:38 被阅读0次

    一.Spark 性能优化概述

    首先笔者能力优先,使用Spark有一段时间,如下是笔者的工作经验的总结。

    Spark任务运行图:


    image.png

    Spark的优化思路:
    一般是从3个层面进行Spark程序的优化:

    1. 运行环境优化
    2. RDD算子优化
    3. 参数微调

    二.运行环境优化

    2.1 数据本地性

    我们知道HDFS的数据文件存储在不同的datanode,一般数据副本数量是3,因为Spark计算的数据量比较大,如果数据不在本节点,需要通过网络去其它的datanode读取数据。

    所以此时我们可以通过提高数据本地性,减少网络传输,来达到性能优化的目的。

    1. 计算和存储同节点(executor和HDFS的datanode、hbase的region server同节点)
    2. executor数目合适: 如果100个数据界定,3个计算节点,就有97份网络传递,所以此种情况可以适当增加计算节点。
    3. 适当增加数据副本数量

    2.2 数据存储格式

    推荐使用列式存储格式: parquet.
    parquet存在如下优先:

    1. 相同数据类型的数据有很高压缩比
    2. Hive主要支持ORC、也支持parquet

    三.RDD算子优化

    3.1 尽可能复用同一个RDD

    每创建一个RDD都会带来性能的开销,尽可能的对同一个RDD做算子操作,而不要频繁创建新的
    RDD。

    3.2 对多次使用的RDD进行持久化

    如果RDD的算子特别多,需要频繁多次操作同一个RDD,最好的办法是将该RDD进行持久化,

    四.参数微调

    1. num-executors
      参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

    2. executor-cores
      参数说明:该参数用于设置每个Executor进程的CPU core数量。

    3. driver-memory
      参数说明:该参数用于设置Driver进程的内存。

    4. spark.default.parallelism
      参数说明:该参数用于设置每个stage的默认task数量。

    5. spark.storage.memoryFraction
      参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。

    6. spark.shuffle.memoryFraction
      参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。

    资源参数参考示例:

    ./bin/spark-submit \
      --master yarn-cluster \
      --num-executors 100 \
      --executor-memory 6G \
      --executor-cores 4 \
      --driver-memory 1G \
      --conf spark.default.parallelism=1000 \
      --conf spark.storage.memoryFraction=0.5 \
      --conf spark.shuffle.memoryFraction=0.3 \
    

    五.数据倾斜

    绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。

    数据倾斜图例:


    image.png

    解决数据倾斜一般有如下几种常用方法:

    1. 使用Hive ETL预处理数据
      先使用Hive进行预处理数据,也就是使用Hive先计算一层中间数据,Spark从中间层数据开始计算。

    2. 过滤少数导致倾斜的key
      如果发生导致倾斜的key非常少,可以将Spark任务拆分为包含 导致倾斜的key的任务和不包含key的任务。

    3. sample采样倾斜key单独进行join
      通过采样,提前预估会发生数据倾斜的key,然后将一个join拆分为两个join,其中一个不包含该key,一个只包含该key,最后将结果集进行union。

    4. 调整并行度
      调整Shuffle并行度,数据打散

    5. 广播小数据集
      适用于一个大表,一个小表
      不用join连接操作,而改用Broadcast变量与map模拟join操作,完全规避shuffle操作
      spark.sql: spark.sql.autoBroadcastJoinThreshold=104857600

    6. 增加随机前缀
      对发生倾斜的RDD增加随机前缀
      对另外一个RDD等量扩容
      如果少量的key发生倾斜,可以先过滤出一个单独的RDD,对另外一个RDD同理吹,join之后再合并

    六. Spark常用的调优参数

    6.1 在内存中缓存数据

    Spark SQL可以通过调用Spark.catalog.cachetable ("tableName")或DataFrame.cache()来使用内存中的columnar格式缓存表。然后Spark SQL将只扫描所需的列,并自动调优压缩以最小化内存使用和GC压力。你可以调用spark.catalog.uncacheTable("tableName")从内存中删除表。

    内存缓存的配置可以在SparkSession上使用setConf方法或者使用SQL运行SET key=value命令来完成。

    参数名 默认值 参数说明 启始版本
    spark.sql.inMemoryColumnarStorage.compressed true 当设置为true时,Spark SQL会根据数据统计自动为每列选择压缩编解码器。 1.0.1
    spark.sql.inMemoryColumnarStorage.batchSize 10000 控制柱状缓存的批大小。更大的批处理大小可以提高内存利用率和压缩,但在缓存数据时可能会带来OOMs风险。 1.1.1

    6.2 其它配置项

    还可以使用以下选项调优查询执行的性能。随着更多的优化被自动执行,这些选项可能会在未来的版本中被弃用。

    参数名 默认值 参数说明 启始版本
    spark.sql.files.maxPartitionBytes 134217728 (128 MB) 读取文件时装入单个分区的最大字节数。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。 2.0.0
    spark.sql.files.openCostInBytes 4194304 (4 MB) 打开一个文件的估计成本,由可以在同一时间扫描的字节数来衡量。当将多个文件放入一个分区时使用。最好是高估,那么带有小文件的分区将比带有大文件的分区更快(这是首先安排的)。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。 2.0.0
    spark.sql.files.minPartitionNum Default Parallelism 建议的(不是保证的)最小分割文件分区数。如果没有设置,默认值是' spark.default.parallelism '。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。 3.1.0
    spark.sql.broadcastTimeout 300 broadcast join 等待时间的超时(秒) 1.3.0
    spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置在执行联接时将广播到所有工作节点的表的最大字节大小。通过将此值设置为-1,可以禁用广播。注意:目前统计只支持运行ANALYZE TABLE COMPUTE statistics noscan命令的Hive Metastore表。 1.1.0
    spark.sql.shuffle.partitions 200 配置将数据变换为连接或聚合时要使用的分区数量。 1.1.0
    spark.sql.sources.parallelPartitionDiscovery.threshold 32 配置阈值以启用作业输入路径的并行列出。如果输入路径数大于该阈值,Spark将通过Spark分布式作业列出文件。否则,它将退回到顺序列表。此配置仅在使用基于文件的数据源(如Parquet、ORC和JSON)时有效。 1.5.0
    spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 配置作业输入路径的最大列出并行度。如果输入路径的数量大于这个值,它将被降低到使用这个值。与上面一样,此配置仅在使用基于文件的数据源(如Parquet、ORC和JSON)时有效。 2.1.1

    6.3 SQL查询连接的hint

    join策略提示BROADCAST、MERGE、SHUFFLE_HASH和SHUFFLE_REPLICATE_NL,在将指定的关系加入到另一个关系时,指示Spark对每个指定的关系使用暗示策略。例如,在表' t1 '上使用BROADCAST提示时,广播加入(广播散列连接或广播嵌套循环联接取决于是否有等值连接键)与t1的构建方面将由火花即使大小的优先表t1的建议的统计配置spark.sql.autoBroadcastJoinThreshold之上。

    当连接两端指定了不同的连接策略提示时,Spark会优先考虑BROADCAST提示而不是MERGE提示,优先考虑SHUFFLE_HASH提示而不是SHUFFLE_REPLICATE_NL提示。当双方都指定了BROADCAST提示或SHUFFLE_HASH提示时,Spark将根据连接类型和关系的大小选择构建端。

    请注意,不能保证Spark会选择提示中指定的连接策略,因为特定的策略可能不支持所有的连接类型。

    -- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
    SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
    

    Coalesce hint允许Spark SQL用户控制输出文件的数量,就像Dataset API中的Coalesce、repartition和repartitionByRange一样,它们可以用于性能调优和减少输出文件的数量。COALESCE hint只有一个分区号作为参数。“REPARTITION”提示有一个分区号、列或它们都作为参数。“REPARTITION_BY_RANGE”提示必须有列名,分区号是可选的。

    SELECT /*+ COALESCE(3) */ * FROM t
    SELECT /*+ REPARTITION(3) */ * FROM t
    SELECT /*+ REPARTITION(c) */ * FROM t
    SELECT /*+ REPARTITION(3, c) */ * FROM t
    SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
    SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
    

    6.4 自适应查询执行

    Adaptive Query Execution (AQE)是Spark SQL中的一种优化技术,它利用运行时统计信息来选择最高效的查询执行计划。默认情况下AQE是禁用的。Spark SQL可以使用Spark.SQL.adaptive.enabled的伞配置来控制是否打开/关闭。从Spark 3.0开始,AQE中有三个主要特性,包括合并shuffle后分区、将排序合并连接转换为广播连接以及倾斜连接优化。

    6.5 合并分区后重新组合

    当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled配置都为true时,该特性根据map输出统计信息来合并post shuffle分区。这个特性简化了运行查询时shuffle分区号的调优。您不需要设置合适的shuffle分区号来适合您的数据集。一旦您通过Spark .sql. adaptive.coalescepartitions . initialpartitionnum配置设置了足够大的初始shuffle分区数,Spark就可以在运行时选择适当的shuffle分区号。

    参数名 默认值 参数说明 启始版本
    spark.sql.adaptive.coalescePartitions.enabled true 当true和Spark .sql. adaptive_enabled为true时,Spark会根据目标大小(由Spark .sql. adaptive_advisorypartitionsizeinbytes指定)合并连续的shuffle分区,以避免过多的小任务。 3.0.0
    spark.sql.adaptive.coalescePartitions.minPartitionNum Default Parallelism 合并后的最小洗牌分区数。如果不设置,则默认为Spark集群的默认并行度。此配置仅在spark.sql. adaptive.net enabled和spark.sql. adaptive.net coalescepartitions .enabled同时启用时有效。 3.0.0
    spark.sql.adaptive.coalescePartitions.initialPartitionNum 200 合并前的初始shuffle分区数。默认情况下它等于spark.sql.shuffle.partitions。此配置仅在spark.sql. adaptive.net enabled和spark.sql. adaptive.net coalescepartitions .enabled同时启用时有效。 3.0.0
    spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB 自适应优化期间shuffle分区的建议大小(当spark.sql. adaptive_enabled为true时)。当Spark对小shuffle分区或斜shuffle分区进行合并时生效。 3.0.0

    6.6 将排序合并联接转换为广播联接

    当任何连接侧的运行时统计数据小于广播散列连接阈值时,AQE将排序合并连接转换为广播散列连接。这不是一样有效规划一个广播散列连接首先,但这总比继续做分类合并加入,我们可以节省连接双方的排序,并在本地读取洗牌文件节省网络流量(如果spark.sql.adaptive.localShuffleReader.enabled被设置为true)

    6.7 优化倾斜连接

    数据倾斜会严重降低连接查询的性能。该特性通过将倾斜任务拆分(如果需要的话还可以复制)为大小大致相同的任务,动态处理排序-合并连接中的倾斜任务。当spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置同时启用时生效。

    参数名 默认值 参数说明 启始版本
    spark.sql.adaptive.skewJoin.enabled true 当true和Spark .sql.adaptive.enabled为true时,Spark通过拆分(并在需要时复制)倾斜分区来动态处理排序-合并连接中的倾斜。 3.0.0
    spark.sql.adaptive.skewJoin.skewedPartitionFactor 10 如果一个分区的大小大于这个因子乘以中值分区大小,并且大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,则认为该分区是倾斜的。 3.0.0
    spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 如果分区的字节大小大于这个阈值,并且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor乘以分区中值大小,则认为该分区是倾斜的。理想情况下,该配置应该设置为大于spark.sql.adaptive.advisoryPartitionSizeInBytes。 3.0.0

    参考:

    1. http://spark.apache.org/docs/latest/rdd-programming-guide.html
    2. https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
    3. https://blog.csdn.net/meihao5/article/details/81084876
    4. http://spark.apache.org/docs/latest/sql-performance-tuning.html

    相关文章

      网友评论

          本文标题:Java-Spark系列10-Spark性能调优概述

          本文链接:https://www.haomeiwen.com/subject/jibzvltx.html