美文网首页
Spark任务输出文件过程详解

Spark任务输出文件过程详解

作者: 疯狂的哈丘 | 来源:发表于2019-06-14 19:49 被阅读0次

    https://blog.csdn.net/u013332124/article/details/92001346

    一、Spark任务输出文件的总过程

    当一个Job开始执行后,输出文件的相关过程大概如下:

    1、Job启动时创建一个目录: ${output.dir}/_temporary/${appAttemptId} 作为本次运行的输出临时目录

    2、当有task开始运行后,会创建 ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId}/${fileName} 文件,后面这个task的所有输出都会被写到这个文件中

    3、当task运行完后,需要检查是否要commit,如果需要commit,会调用OutputCommitter#commitTask()方法。commit的细节后面说

    4、等整个Job执行完就调用OutputCommitter#commitJob()方法。具体的过程也在下面介绍commit时说。

    output.dir表示用户指定的输出目录,appAttemptId表示任务的attemptId,一般从1开始一直递增。taskAttemptId表示task的attemptId,比如taskId是0,第一次运行,这个id就是0.0。
    OutputCommitter 只是一个抽象类,spark运行时会从配置中获取指定的实现类,如果配置中没指定,spark默认会使用 org.apache.hadoop.mapred.FileOutputCommitter 的实现。

    二、Commit细节分析

    1、commitTask 介绍

    1.1、判断是否需要commit

    当task执行完后,会去检查以下状态,如果下面的条件达成,就不会执行commit

    • ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttempt} 目录不存在 (说明这个task的临时输出目录不存在,明显是有问题的)
    • 如果开启了Output commit coordination,就需要通过rpc询问Driver是否可以commit (根据spark.hadoop.outputCommitCoordination.enabled参数,默认为true.如果开启了推测执行,这个一定要设置为true)
    • Driver的CommitCoordinator判断task运行失败 (task运行失败就没必要commit了)
    • Driver的CommitCoordinator判断该task的其他attempt已经commit过了 (如果commit的taskAttemptId和当前一样,那么可以再次commit,说明task commit是一个幂等的操作)

    1.2、task的commit细节

    因为我们大部分情况下用的都是FileOutputCommitter,所以下面主要介绍一下这个类的commitTask实现。

    FileOutputCommitter的实际commitTask细节和参数 mapreduce.fileoutputcommitter.algorithm.version 有关(默认值是1)。

    mapreduce.fileoutputcommitter.algorithm.version=1时:

    commit的操作是将 ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId} 重命名为 ${output.dir}/_temporary/${appAttemptId}/${taskId}

    mapreduce.fileoutputcommitter.algorithm.version=2时:

    commit的操作是将 ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId} 下的文件移动到 ${output.dir} 目录下 (也就是最终的输出目录)

    spark任务可以通过设置spark配置 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2来开启版本2的commit逻辑
    在hadoop 2.7.0之前,FileOutputCommitter的实现没有区分版本,统一都是使用version=1的commit逻辑。因此如果spark的hadoop依赖包版本如果低于2.7.0,设置mapreduce.fileoutputcommitter.algorithm.version=2是没有用的

    2、commitJob 介绍

    Job执行完后,会调用commitJob方法,我们还是看一下FileOutputCommitter的实现:

    commitJob的细节也和mapreduce.fileoutputcommitter.algorithm.version 参数有关(默认值是1)

    mapreduce.fileoutputcommitter.algorithm.version=1时:

    由 Driver 单线程遍历所有 committedTaskPath,也就是${output.dir}/_temporary/${appAttemptId} 下的所有文件,然后移动到 ${output.dir} 目录下。然后创建_SUCCESS表示任务结束

    mapreduce.fileoutputcommitter.algorithm.version=2时:

    只需要创建_SUCCESS文件,因为输出文件在task执行完后就已经移动到输出目录了

    在commitJob完后,spark还会执行cleanupJob将${output.dir}/_temporary 目录删除

    三、V1和V2 commiter版本比较

    mapreduce.fileoutputcommitter.algorithm.version 参数对文件输出有很大的影响,下面总结一下两种版本在各方面的优缺点。

    1、性能方面

    v1在task结束后只是将输出文件拷到临时目录,然后在job结束后才由Driver把这些文件再拷到输出目录。如果文件数量很多,Driver就需要不断的和NameNode做交互,而且这个过程是单线程的,因此势必会增加耗时。如果我们碰到有spark任务所有task结束了但是任务还没结束,很可能就是Driver还在不断的拷文件。

    v2在task结束后立马将输出文件拷贝到输出目录,后面Job结束后Driver就不用再去拷贝了。

    因此,在性能方面,v2完胜v1。

    2、数据一致性方面

    v1在Job结束后才批量拷文件,其实就是两阶段提交,它可以保证数据要么全部展示给用户,要么都没展示(当然,在拷贝过程中也无法保证完全的数据一致性,但是这个时间一般来说不会太长)。如果任务失败,也可以直接删了_temporary目录,可以较好的保证数据一致性。

    v2在task结束后就拷文件,就会造成spark任务还未完成就让用户看到一部分输出,这样就完全没办法保证数据一致性了。另外,如果任务在输出过程中失败,就会有一部分数据成功输出,一部分没输出的情况。

    因此在数据一致性方面,v1完胜v2

    3、总结

    很明显,如果我们执着于性能,不在乎数据输出时的一致性,完全可以将mapreduce.fileoutputcommitter.algorithm.version设置为2来提高性能。

    但是如果我们对输出要求很高的数据一致性,那么最好不要为了性能将mapreduce.fileoutputcommitter.algorithm.version设置为2。

    参考资料

    https://issues.apache.org/jira/browse/MAPREDUCE-4815

    https://zhuanlan.zhihu.com/p/45351972

    https://mp.weixin.qq.com/s?__biz=MzU3NTE2NzAxNQ==&mid=2247484099&idx=1&sn=0a0a3a1f407d30a22dcfbd85fab488e6&chksm=fd260d8bca51849d94e8df9f2249462d5a5dfc8079c45b4d9ab489aaea77c0ce14108a948f94&token=2064668791&lang=zh_CN#rd

    相关文章

      网友评论

          本文标题:Spark任务输出文件过程详解

          本文链接:https://www.haomeiwen.com/subject/xxkmfctx.html