Spark Core 调优指南

作者: 和心数据 | 来源:发表于2017-08-19 15:55 被阅读288次

    1 体系

    体系

    2 配置

    • 资源分配

      • num-executors:executor的个数
      • executor-cores:cpu core 的两倍
      • executor-memory:每个executor的内存大小
      • driver-memory:driver的内存大小
    • 并行度

      • spark.default.parallelism
      • spark.sql.partitions
      • repartition(num)
    • 内存使用

      • spark.storage.memoryFraction:用于cache的内存比例
      • spark.shuffle.memoryFraction:shffule阶段的缓存占内存比例

    3 代码

    • 不要重复创建RDD
    • 重复使用的RDD进行cache
    • 使用高性能算子
      • mapPartition代替map
      • foreachPartition代替foreach
      • 用reduceByKey代替groupByKey
    • filter以后使用coalesce减少小任务
    • 广播大变量:sc.broadcast

    4 数据

    • 序列化
      • 使用KryoSerializer代替Java序列化
    • 文件格式
      • 使用parquet文件格式,列式存储,读取效率高

    5 倾斜

    • 聚合(xxByKey)

      • 造成倾斜的Key数量小且不重要

        • 抽样+过滤
      • 造成倾斜的Key数量多且重要

        • 增加并行度
        • 局部聚合+全局聚合给每个Key加上前缀,聚合
        • 对上步聚合结果的Key去前缀,聚合
    • 连接

      • 小表连接大表

        • 将reduce join 转成map join
        • 使用广播变量将小表数据进行广播
        • SparkSQL设置spark.sql.autoBroadcastJoinThreshold,默认10m
      • 大表连接大表

        • 造成倾斜的Key不多

          • 对RDD1进行sample找出造成倾斜的Key
          • 分别对RDD1和RDD2进行filter将其分成skewRDD1和commonRDD1以及skewRDD1和commonRDD2
          • 然后对skewRDD1的key添加随机前缀n,对skewRDD2进行n倍扩容,然后join,再对结果的key进行前缀移除得到joinRDD1
          • 将commonRDD1和commonRDD2进行连接,得到joinRDD2
            joinRDD1.union(joinRDD2)
        • 造成倾斜的Key多

          • 对RDD1进行随机前缀n的添加
          • 对RDD2进行n倍扩容
          • 然后进行连接
          • 进行随机前缀的移除处理得到结果

    相关文章

      网友评论

        本文标题:Spark Core 调优指南

        本文链接:https://www.haomeiwen.com/subject/ezctdxtx.html