6.3 map端计算结果缓存处理
1、bypassMergeThreshold:传递reduce端再做合并(merge)操作的阈值。如果partition的数量小于bypassmergethreshold的值,则不需要在Executor执行聚合和排序操作,只需要将各个partitation直接写到Executor的存储文件,最后再reduce端再做串联。通过配置spark.shuffle.sort.bypassMergeThreshold可以修改bypassMergeThreshold的大小,在分区数量小的时候提升计算引擎的性能。bypassMergeThreshold的默认值是200.
2、bypassMergeSort:标记是否传递到reduce端再做合并和排序,即是否直接将各个partitation直接写到Exeutor的存储文件,当没有定义aggregator、ordering函数,并且partition的数量小于等于bypassMergeThreshod时,bypassMergeSort为true。如果bypassMergeSort为true,map中间结果将直接输出到磁盘,此时不会占用太多的内存,避免了内存撑爆问题。
3、map端对计算结果在缓存中执行聚合和排序。
4、map不适用缓存,也不执行聚合排序,直接调用spillToPartitionFiles将各个partition直接写到自己的存储文件(即bypassMergeSort为true的情况),最后由reduce端对计算结果执行合并和排序。spillToPartitionFiles的实现
5、map端对计算结果简单缓存
map端计算结果缓存聚合
一个任务的分区数量通常很多,如果只是简单讲数据存储到Executor上,在执行reduce任务时会存在大量的网络I/O操作,这是网络I/O将成为系统性能的瓶颈,reduce任务读取map任务的计算结果变慢,导致其他想要分配到被这些map任务占用的节点的任务不得不等待或者降低本地化选择分配到更远的节点上,对于更远节点的I/O本身会更慢,因此还会导致更多的任务得不到分配或者无法高效本地化。经过这样的恶性循环,整个集群将变得迟钝,新的任务长时间得不到执行或者执行变慢。
通过在map端对计算结果在缓存中执行聚合和排序,能够节省I/O操作,进而提升系统性能。这种情况下,必须定义聚合器(aggregator)函数,以便对计算结果按照partitionID和key聚合后排序。
map端计算结果持久化
writePartitionedFile用于持久化计算结果。此方法有两个分支:
溢出到分区文件后合并:将内存中缓存的多个partition的计算结果分别写入到临时block文件,然后将这些block文件的内容全部写入正式的block输出文件中。
内存中排序合并:将缓存的中间计算结果按照partition分组后写入Block输出文件。此种方式还需要更新此任务与内存、磁盘有关测量的数据。
6.4.1溢出分区文件
spillToPartitionFiles用于将内存中的集合数据按照每个partition创建一个临时Block文件,为每个临时Block文件生成一个DiskBlockObjectWriter,并且用DiskBlockObjectWriter将计算结果分别写入这些临时Block文件中。createTempShuffleBlock方法创建临时的Bloc
6.5.2划分本地与远程Block
无论是本地还是从MapOutputTrackerMaster获取的状态信息,都需要按照地址划分且转换为BlockId。ShuffleBlockFetcherIterator是读取中间结果的关键。
6.7map端与reduce端组合分析
6.7.1在map端溢出分区文件,在reduce端合并组合
bypassMergeSort标记是否传递到reduce端再做合并和排序,此种情况不适用缓存,而是现将数据按照partition写入不同的文件,最后按partition顺序合并写入同一文件中。当没有hiding聚合、排序函数,且partition数量较小时,一般采用这种方式。此种方式将多个bucket合并到同一个文件,通过减少map输出的文件数量,节省了磁盘的I/O,最终提升了性能。
6.7.2 在map端简单缓存,排序分组,在reduce端合并组合
此种情况在缓存中利用指定的排序函数对数据按照partition或者key进行排序,最后按partition顺序合并写入同一文件。当没有指定聚合函数,且partition数量大时,一般采用这种方式,此种方式将多个bucket合并到同一个文件,通过减少map输出的文件数量,节省了磁盘I/O,提升了性能;对SizeTrackingPairBuffer的缓存进行溢出判断,当超出myMemoryThreshold的大小时,将数据写入磁盘,防止内存溢出。
6.7.3在map端缓存中聚合、排序分组、在reduce端组合
此种情况在缓存中对数据按照key聚合,并且利用指定的排序函数对数据按照partition或者key进行排序,最后按partition顺序合并写入同一文件。当指定了聚合函数时,一般采用这种方式,见下图,此种方式将多个bucket合并到同一文件,通过减少map输出的文件数量,节省了磁盘I/O,提升了性能;对中间输出数据不是一次性读取,而是逐条放入AppendOnlyMap的缓存对数据进行聚合,减少了中间结果占用的内存大小;对AppendOnlyMap的缓存进行溢出判断,当超出myMemoryThreshold的大小时候,将数据写入磁盘,防止内存溢出。
内核分配策略:
1、构造ProcessBuilder,进程主类是CoarseGainedExecutorBackend,有关buildProcessBuilder的实现
2、为ProcessBuilder设置执行目录、环境变量
3、启动ProcessBuilder,生成线程。
4、重定向进程的文件输出流与错误流为executorDir目录下的stdout与stderr。
5、等待获取进程的退出状态,一旦收到退出状态,则向Worker发送ExecutorStateChanged消息
7.5.1YARN
mrv1的运行时环境由jobtracker和TaskTracker两类服务组成,JobTracker负责资源和任务的管理与调度,TaskTracker负责单个节点的资源管理与任务执行。在YARN中,JobTracker被分为两部分:ResourceManager(RM)和ApplicationMaster(AM)RM负责资源管理和调度,AM负责具体应用程序的任务划分、调度等工作
ResourceManager(RM):全局资源管理器,负责整个系统的资源管理与分配。RM由调度器和应用程序管理器组成。调度器将系统资源分配给各个应用程序,而是Container是对CPU,内存等资源的封装。应用程序管理器负责整个系统的应用程序,如何处理程序提交,与调度器沟通后为应用程序启动applicationMaster
ApplicationMaster(AM):用户提交的每个任务都有一个AM,它会与RM通信获取资源,将任务划分为更加细粒度的任务,与NodeManager(NM)通信启动或停止任务,
spark提交给yarn任务顺序:
1)将spark提供的Application在Yarn集群中启动
2)ApplicationMaster向ResourceManager申请containner。
3)申请COntainner成功后,向具体的NodeManager发送指令启动Container
4)ApplicationMaster启动对各个运行的ContainerExecutor进行监控。
ApplicationMaster给Application分配资源主要借助YarnAllocationHandler
网友评论