Spark: a shuffle failure caused by a custom Partitioner

Author: 天之見證 | Published 2019-08-05 11:09

    After switching to a custom Partitioner, the Spark shuffle started failing with the following error:

    ERROR org.apache.spark.internal.Logging$class: User class threw exception: org.apache.spark.SparkException: 
    Job aborted due to stage failure: Task 6 in stage 40.0 failed 4 times, most recent failure:
     Lost task 6.3 in stage 40.0 (TID 13372, host, executor 40): 
    java.lang.ArrayIndexOutOfBoundsException: 714
        at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:214)
        at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:405)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:209)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    

    The same code had run fine before but failed on a different dataset. From the DAG in the Spark UI, the failing stage was traced to this custom Partitioner:

    import org.apache.spark.Partitioner

    new Partitioner {
      override def numPartitions: Int = splitsNum / 100

      // Keys are numeric strings. As published, the post divides a String by
      // 100, which does not compile; a .toInt conversion is assumed here.
      override def getPartition(key: Any): Int = {
        key.asInstanceOf[String].toInt / 100
      }
    }
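
    Before digging into the Spark source, note that a partitioner like this can be sanity-checked locally. The helper below is not from the original post, just an illustrative sketch assuming numeric string keys:

    import org.apache.spark.Partitioner

    // Hypothetical helper: assert that every key the dataset can produce
    // maps into the valid range [0, numPartitions)
    def checkPartitioner(p: Partitioner, keys: Iterator[String]): Unit =
      keys.foreach { k =>
        val pid = p.getPartition(k)
        require(pid >= 0 && pid < p.numPartitions,
          s"key=$k maps to partition $pid, outside [0, ${p.numPartitions})")
      }

    // e.g. checkPartitioner(partitioner, (0 until splitsNum).iterator.map(_.toString))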
    

    Tracing into the Spark source leads to this block in ShuffleExternalSorter.writeSortedFile:

    if (currentPartition != -1) {
        // currentPartition ultimately comes from Partitioner.getPartition(key)
        spillInfo.partitionLengths[currentPartition] = committedSegment.length();
        spills.add(spillInfo);
    }
    

    spillInfo.partitionLengths is a plain array, so this exception can only occur when currentPartition exceeds its last valid index.

    So what determines the size of spillInfo.partitionLengths?

    The source shows that the array's size is fixed when the SpillInfo is constructed:

    final class SpillInfo {
      final long[] partitionLengths;
      final File file;
      final TempShuffleBlockId blockId;
    
      SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
        this.partitionLengths = new long[numPartitions];
        this.file = file;
        this.blockId = blockId;
      }
    }
    

    And the object is constructed as:

    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
    

    where numPartitions is exactly the value returned by our custom Partitioner's numPartitions.
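
    Putting the pieces together: the spill array is sized by numPartitions but indexed by whatever getPartition(key) returns, and the two are computed independently. A condensed, illustrative sketch (not Spark source; the concrete numbers are worked out in the table below):

    val splitsNum = 71401
    val numPartitions = splitsNum / 100                // 714
    val partitionLengths = new Array[Long](numPartitions)
    val maxKey = (splitsNum - 1).toString              // keys range over [0, splitsNum)
    partitionLengths(maxKey.toInt / 100) = 1L          // index 714 -> ArrayIndexOutOfBoundsException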

    Looking more closely at the DAG in the Spark UI, the failing shuffle stage has exactly 714 partitions, and by construction every key is < splitsNum. That pins the problem on the partitioner itself. What exactly goes wrong? Integer division:

    splitsNum   numPartitions   key     partitionId
    71400       714             71399   713
    71401       714             71400   714

    In the second case the computed partitionId equals numPartitions: index 714 in an array of length 714 is out of bounds, which is exactly the ArrayIndexOutOfBoundsException: 714 above.
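
    The fix is to make numPartitions a strict upper bound for every possible partition id, e.g. by rounding the division up instead of down. A minimal sketch under the same assumptions as the original code (numeric string keys in [0, splitsNum); the class name is ours):

    import org.apache.spark.Partitioner

    class BucketPartitioner(splitsNum: Int) extends Partitioner {
      // ceil(splitsNum / 100): splitsNum = 71401 now yields 715 partitions
      override def numPartitions: Int = (splitsNum + 99) / 100

      override def getPartition(key: Any): Int =
        key.asInstanceOf[String].toInt / 100
    }

    With splitsNum = 71401, numPartitions becomes 715 and the largest key, 71400, maps to partition 714, safely back inside the array.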
