After switching to a custom partitioner, the Spark shuffle failed with the following error:
```
ERROR org.apache.spark.internal.Logging$class: User class threw exception: org.apache.spark.SparkException:
Job aborted due to stage failure: Task 6 in stage 40.0 failed 4 times, most recent failure:
Lost task 6.3 in stage 40.0 (TID 13372, host, executor 40):
java.lang.ArrayIndexOutOfBoundsException: 714
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:214)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:405)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:209)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
The same code failed only when run on a different dataset. Through the DAG, the failure was traced to a section of code that uses a custom partitioner, defined as follows:
```scala
new Partitioner {
  // one partition per 100 keys; splitsNum / 100 is integer division
  override def numPartitions: Int = splitsNum / 100
  override def getPartition(key: Any): Int = {
    // keys are numeric strings, bucketed by hundreds
    key.asInstanceOf[String].toInt / 100
  }
}
```
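Note that Spark's `Partitioner` contract requires `getPartition` to return an id in `[0, numPartitions)` for every key; anything outside that range only blows up later, deep inside the shuffle write path. A quick offline check of that contract might look like this (the `PartitionerCheck` helper is hypothetical, not part of the original job):

```scala
import org.apache.spark.Partitioner

object PartitionerCheck {
  // Hypothetical helper: verify 0 <= getPartition(key) < numPartitions
  // for a sample of keys before running the full job.
  def validate(p: Partitioner, sampleKeys: Seq[Any]): Unit =
    for (key <- sampleKeys) {
      val id = p.getPartition(key)
      require(id >= 0 && id < p.numPartitions,
        s"getPartition($key) = $id is outside [0, ${p.numPartitions})")
    }
}
```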
Tracing through the Spark source (`ShuffleExternalSorter.writeSortedFile`, per the stack trace) leads to:

```java
// currentPartition holds the partition id of the records just written
// (-1 means nothing was written); it indexes into spillInfo.partitionLengths.
if (currentPartition != -1) {
  spillInfo.partitionLengths[currentPartition] = committedSegment.length();
  spills.add(spillInfo);
}
```
`spillInfo.partitionLengths` is an array, and the exception can only occur when `currentPartition` exceeds its bounds. So what determines the size of `spillInfo.partitionLengths`?
Looking further at the source, the array's size is fixed when `SpillInfo` is constructed:
```java
final class SpillInfo {
  final long[] partitionLengths;
  final File file;
  final TempShuffleBlockId blockId;

  SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
    this.partitionLengths = new long[numPartitions];
    this.file = file;
    this.blockId = blockId;
  }
}
```
And this object is instantiated as:

```java
final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
```
where `numPartitions` is exactly the value returned by our custom partitioner's `numPartitions`.

Looking again at the DAG in the Spark UI, one shuffle stage has exactly 714 partitions, and by construction every key value is less than `splitsNum`. That narrows the problem down to the partitioner itself. So what exactly is wrong? Integer division. Compare the two cases:
| splitsNum | numPartitions | key   | partitionId |
|-----------|---------------|-------|-------------|
| 71400     | 714           | 71399 | 713         |
| 71401     | 714           | 71400 | 714         |
The second row shows `partitionId` reaching `numPartitions`, while valid indices only run from 0 to `numPartitions - 1`; that is exactly the out-of-bounds write, hence `ArrayIndexOutOfBoundsException: 714`.
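The fix is to make `numPartitions` large enough to cover every bucket that `getPartition` can produce. A minimal sketch, assuming keys are numeric strings below `splitsNum` as in the original code (the `BucketPartitioner` name is mine):

```scala
import org.apache.spark.Partitioner

// Hypothetical fixed version: ceiling division for the partition count,
// so key / 100 can never reach or exceed numPartitions.
class BucketPartitioner(splitsNum: Int) extends Partitioner {
  // e.g. splitsNum = 71401 -> (71401 + 99) / 100 = 715 partitions
  override def numPartitions: Int = (splitsNum + 99) / 100
  // keys are numeric strings below splitsNum
  override def getPartition(key: Any): Int = key.asInstanceOf[String].toInt / 100
}
```

With `splitsNum = 71401` this yields 715 partitions, so the largest key, 71400, lands in partition 714, the last valid index.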