美文网首页
Spark Shuffle索引和数据文件读取

Spark Shuffle索引和数据文件读取

作者: 清蒸三文鱼_ | 来源:发表于2021-04-16 16:20 被阅读0次

    简介

    Spark Shuffle 官网介绍

    Spark 作业的性能问题往往出现在 Shuffle 上,在 Spark 先后引入了 Hash Shuffle 与 FileConsolidation 后,还是无法根本解决中间文件数太大的问题,所以 Spark 在 1.2 之后又推出了与 MapReduce 一样(你可以参照《Hadoop 海量数据处理》(第 2 版)的 Shuffle 相关章节)的 Shuffle 机制: Sort-based Shuffle,才真正解决了 Shuffle 的问题.

    下面主要了解一下shuffle过程生成的index和data文件, 里面存储什么样的数据, 怎么读取

    maven

     <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>2.4.5</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    

    reduce测试

    @Test
        public void reduceTest() {
            JavaSparkContext context = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("LocalTest"));
            //目录结构 C:\\Users\\xx\\AppData\\Local\\Temp\\spark-2ff5b976-7017-4622-8f19-ba09c545740c\\userFiles-c98c2979-d9cd-4b9b-b511-3782e30c19cc
            System.out.println("driver目录:" + context.env().driverTmpDir().get());
            ArrayList<Tuple2<Integer, String>> datas = Lists.newArrayList();
            for (int i = 0; i < 10000; i++) {
                datas.add(new Tuple2<>(new Random().nextInt(100), "list_" + i));
            }
            for (int i = 0; i < 5000; i++) {
                datas.add(new Tuple2<>(new Random().nextInt(50), "list_" + i));
            }
            List<Tuple2<Integer, String>> collect = context.parallelizePairs(datas,5).reduceByKey(((v1, v2) -> v1 + "==>" + v2)).collect();
            //List<Tuple2<Integer, String>> collect2 = context.parallelizePairs(datas,5).reduceByKey(((v1, v2) -> v1 + "-->" + v2)).collect();
            while (true) {
            }
        }
    

    运行该测试用例会在本地的临时目录生成spark运行时的文件,结构如下. 在blockmgr目录下会.data和.index文件



    index文件读取

        @Test
        public void readShuffleIndex() throws IOException {
            String dir = "C:\\Users\\xx\\Desktop\\新建文件夹";
            File[] indexFiles = new File(dir).listFiles((f) -> f.getName().endsWith(".index"));
            for (File indexFile : indexFiles) {
                DataInputStream ds = new DataInputStream(new FileInputStream(indexFile));
                System.out.printf("%s, offset:%s -> %s -> %s %n", indexFile.getName(), ds.readLong(), ds.readLong(),ds.readLong());
            }
        }
    

    index的文件是二进制格式, 需要使用对象流进行读取,这里把临时目录下的这两类文件单拎出来放到同个目录下(方便测试). 源码位置IndexShuffleBlockResolver.getBlockData()

    源码
    索引和数据文件标识
    测试用例运行结果

    shuffle_0_0_0.index, offset:0 -> 2919 -> 6029
    shuffle_0_1_0.index, offset:0 -> 3175 -> 6269
    shuffle_0_2_0.index, offset:0 -> 3094 -> 6287
    shuffle_0_3_0.index, offset:0 -> 2911 -> 5896
    shuffle_0_4_0.index, offset:0 -> 3052 -> 6035

    data文件读取

        @Test
        public void readShuffleData() throws IOException {
            String dir = "C:\\Users\\xx\\Desktop\\新建文件夹";
            JavaSparkContext context = new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("LocalTest"));
            SerializerManager sm = context.env().blockManager().serializerManager();
            File[] indexFiles = new File(dir).listFiles((f) -> f.getName().endsWith(".data"));
            BlockId apply = BlockId.apply(indexFiles[0].getName().replace(".data",""));
            Iterator<Object> objectIterator = sm.dataDeserializeStream(apply, new FileInputStream(indexFiles[0]), null);
            objectIterator.foreach(v -> {
                System.out.println(v);
                return null;
            });
        }
    

    data文件会进行压缩, 需要使用spark环境提供的序列化工具进行读取


    源码shuffle开启压缩
    文本直接打开data文件

    参考

    MapReduce Shuffle 和 Spark Shuffle 原理概述
    Spark2.3.2源码解析:Shuffle 过程写入的 数据文件&索引文件
    彻底搞懂spark的shuffle过程(shuffle write)

    相关文章

      网友评论

          本文标题:Spark Shuffle索引和数据文件读取

          本文链接:https://www.haomeiwen.com/subject/bjuelltx.html