简介
Spark 作业的性能问题往往出现在 Shuffle 上,在 Spark 先后引入了 Hash Shuffle 与 FileConsolidation 后,还是无法根本解决中间文件数太大的问题,所以 Spark 在 1.2 之后又推出了与 MapReduce 一样(你可以参照《Hadoop 海量数据处理》(第 2 版)的 Shuffle 相关章节)的 Shuffle 机制: Sort-based Shuffle,才真正解决了 Shuffle 的问题.
下面主要了解一下shuffle过程生成的index和data文件, 里面存储什么样的数据, 怎么读取
maven
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
reduce测试
@Test
public void reduceTest() {
JavaSparkContext context = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("LocalTest"));
//目录结构 C:\\Users\\xx\\AppData\\Local\\Temp\\spark-2ff5b976-7017-4622-8f19-ba09c545740c\\userFiles-c98c2979-d9cd-4b9b-b511-3782e30c19cc
System.out.println("driver目录:" + context.env().driverTmpDir().get());
ArrayList<Tuple2<Integer, String>> datas = Lists.newArrayList();
for (int i = 0; i < 10000; i++) {
datas.add(new Tuple2<>(new Random().nextInt(100), "list_" + i));
}
for (int i = 0; i < 5000; i++) {
datas.add(new Tuple2<>(new Random().nextInt(50), "list_" + i));
}
List<Tuple2<Integer, String>> collect = context.parallelizePairs(datas,5).reduceByKey(((v1, v2) -> v1 + "==>" + v2)).collect();
//List<Tuple2<Integer, String>> collect2 = context.parallelizePairs(datas,5).reduceByKey(((v1, v2) -> v1 + "-->" + v2)).collect();
while (true) {
}
}
运行该测试用例会在本地的临时目录生成spark运行时的文件,结构如下. 在blockmgr目录下会.data和.index文件
index文件读取
@Test
public void readShuffleIndex() throws IOException {
String dir = "C:\\Users\\xx\\Desktop\\新建文件夹";
File[] indexFiles = new File(dir).listFiles((f) -> f.getName().endsWith(".index"));
for (File indexFile : indexFiles) {
DataInputStream ds = new DataInputStream(new FileInputStream(indexFile));
System.out.printf("%s, offset:%s -> %s -> %s %n", indexFile.getName(), ds.readLong(), ds.readLong(),ds.readLong());
}
}
index的文件是二进制格式, 需要使用对象流进行读取,这里把临时目录下的这两类文件单拎出来放到同个目录下(方便测试). 源码位置IndexShuffleBlockResolver.getBlockData()
索引和数据文件标识
测试用例运行结果
shuffle_0_0_0.index, offset:0 -> 2919 -> 6029
shuffle_0_1_0.index, offset:0 -> 3175 -> 6269
shuffle_0_2_0.index, offset:0 -> 3094 -> 6287
shuffle_0_3_0.index, offset:0 -> 2911 -> 5896
shuffle_0_4_0.index, offset:0 -> 3052 -> 6035
data文件读取
@Test
public void readShuffleData() throws IOException {
String dir = "C:\\Users\\xx\\Desktop\\新建文件夹";
JavaSparkContext context = new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("LocalTest"));
SerializerManager sm = context.env().blockManager().serializerManager();
File[] indexFiles = new File(dir).listFiles((f) -> f.getName().endsWith(".data"));
BlockId apply = BlockId.apply(indexFiles[0].getName().replace(".data",""));
Iterator<Object> objectIterator = sm.dataDeserializeStream(apply, new FileInputStream(indexFiles[0]), null);
objectIterator.foreach(v -> {
System.out.println(v);
return null;
});
}
data文件会进行压缩, 需要使用spark环境提供的序列化工具进行读取
源码shuffle开启压缩
文本直接打开data文件
参考
MapReduce Shuffle 和 Spark Shuffle 原理概述
Spark2.3.2源码解析:Shuffle 过程写入的 数据文件&索引文件
彻底搞懂spark的shuffle过程(shuffle write)
网友评论