美文网首页spark大数据开发
[调优]数据倾斜调优~spark性能优化:

[调优]数据倾斜调优~spark性能优化:

作者: 葡萄喃喃呓语 | 来源:发表于2016-10-27 17:33 被阅读144次

    spark性能优化:数据倾斜调优 - LW_ICE - 博客频道 - CSDN.NET
    http://blog.csdn.net/lw_ghy/article/details/51419877

    调优概述
    有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。
    数据倾斜发生时的现象
      1、绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。  2、原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。
    数据倾斜发生的原理
      数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。  因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。  下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。


    如何定位导致数据倾斜的代码
      数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。某个task执行特别慢的情况
      首先要看的,就是数据倾斜发生在第几个stage中。  如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个stage;如果是用yarn-cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。  比如下图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

    知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。精准推算stage与代码的对应关系,需要对Spark的源码有深入的理解,这里我们可以介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。  这里我们就以Spark最基础的入门程序——单词计数来举例,如何用最简单的方法大致推算出一个stage对应的代码。如下示例,在整个代码中,只有一个reduceByKey是会发生shuffle的算子,因此就可以认为,以这个算子为界限,会划分出前后两个stage。  1、stage0,主要是执行从textFile到map操作,以及执行shuffle write操作。shuffle write操作,我们可以简单理解为对pairs RDD中的数据进行分区操作,每个task处理的数据中,相同的key会写入同一个磁盘文件内。  2、stage1,主要是执行从reduceByKey到collect操作,stage1的各个task一开始运行,就会首先执行shuffle read操作。执行shuffle read操作的task,会从stage0的各个task所在节点拉取属于自己处理的那些key,然后对同一个key进行全局性的聚合或join等操作,在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子之后,就计算出了最终的wordCounts RDD,然后会执行collect算子,将所有数据拉取到Driver上,供我们遍历和打印输出。
    [python] view plain copy
    print?
    解决方案四:两阶段聚合(局部聚合+全局聚合)
      方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。  方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。  方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。  方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。  方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

    [python]
    view plain copy
    print?
    [python]
    view plain copy
    print?
    [python]
    view plain copy
    print? 在CODE上查看代码片在CODE上查看代码片 派生到我的代码片派生到我的代码片

    // 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
    JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

    // 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
    // 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
    // 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。
    JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
    new PairFunction<Tuple2<Long,String>, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
    throws Exception {
    return new Tuple2<Long, Long>(tuple._1, 1L);
    }
    });
    JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
    new Function2<Long, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Long call(Long v1, Long v2) throws Exception {
    return v1 + v2;
    }
    });
    JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
    new PairFunction<Tuple2<Long,Long>, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
    throws Exception {
    return new Tuple2<Long, Long>(tuple._2, tuple._1);
    }
    });
    final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

    // 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。
    JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
    new Function<Tuple2<Long,String>, Boolean>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Boolean call(Tuple2<Long, String> tuple) throws Exception {
    return tuple._1.equals(skewedUserid);
    }
    });
    // 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
    JavaPairRDD<Long, String> commonRDD = rdd1.filter(
    new Function<Tuple2<Long,String>, Boolean>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Boolean call(Tuple2<Long, String> tuple) throws Exception {
    return !tuple._1.equals(skewedUserid);
    }
    });

    // rdd2,就是那个所有key的分布相对较为均匀的rdd。
    // 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
    // 对扩容的每条数据,都打上0~100的前缀。
    JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
    new Function<Tuple2<Long,Row>, Boolean>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
    return tuple.1.equals(skewedUserid);
    }
    }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Iterable<Tuple2<String, Row>> call(
    Tuple2<Long, Row> tuple) throws Exception {
    Random random = new Random();
    List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
    for(int i = 0; i < 100; i++) {
    list.add(new Tuple2<String, Row>(i + "
    " + tuple._1, tuple._2));
    }
    return list;
    }

        });  
    

    // 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。
    // 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
    JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
    new PairFunction<Tuple2<Long,String>, String, String>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<String, String> call(Tuple2<Long, String> tuple)
    throws Exception {
    Random random = new Random();
    int prefix = random.nextInt(100);
    return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
    }
    })
    .join(skewedUserid2infoRDD)
    .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<Long, Tuple2<String, Row>> call(
    Tuple2<String, Tuple2<String, Row>> tuple)
    throws Exception {
    long key = Long.valueOf(tuple.1.split("")[1]);
    return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
    }
    });

    // 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
    JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

    // 将倾斜key join后的结果与普通key join后的结果,uinon起来。
    // 就是最终的join结果。
    JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

    解决方案七:使用随机前缀和扩容RDD进行join
      方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。  方案实现思路:  1、该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。  2、然后将该RDD的每条数据都打上一个n以内的随机前缀。  3、同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。  4、最后将两个处理后的RDD进行join即可。  方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。  方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。  方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。  方案实践经验:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。[python] view plain copy
    print?

    在CODE上查看代码片在CODE上查看代码片 派生到我的代码片派生到我的代码片

    // 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
    JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
    new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
    throws Exception {
    List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
    for(int i = 0; i < 100; i++) {
    list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
    }
    return list;
    }
    });

    // 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
    JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
    new PairFunction<Tuple2<Long,String>, String, String>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<String, String> call(Tuple2<Long, String> tuple)
    throws Exception {
    Random random = new Random();
    int prefix = random.nextInt(100);
    return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
    }
    });

    // 将两个处理后的RDD进行join即可。
    JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

    解决方案八:多种方案组合使用
      在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。比如说,我们针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次可以对某些shuffle操作提升并行度,优化其性能;最后还可以针对不同的聚合或join操作,选择一种方案来优化其性能。大家需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

    相关文章

      网友评论

        本文标题:[调优]数据倾斜调优~spark性能优化:

        本文链接:https://www.haomeiwen.com/subject/relsuttx.html