美文网首页大数据知识杂谈
MapReduce执行过程及shuffle详解

MapReduce执行过程及shuffle详解

作者: 稻草人_d41b | 来源:发表于2018-10-24 09:55 被阅读0次

            开始学习Hadoop时,经常会听到MapReduce。MapReduce由Map和Reduce两个阶段,每个阶段都是以键-值对作为输入和输出,Map阶段是将数据进行映射处理,形成键值对。reduce是对map处理后的键值对进行聚合等相关运算。比如经典的wordcount:map阶段是将文本内容拆分成(字段串,1)键值对,(hello,1)(world,1)(hello,1),reduce是将相同key的value值合并,形成(hello,2),(world,1)。但在整个处理中有数据排序、落盘、合并、shuffle等过程,希望通过下面内容可以帮助你深入理解这个过程。

         MapReduce处理流程回顾

            MapReduce是客户端需要执行的一个工作单元,包括输入数据、MapReduce程序和配置信息。在运行时Hadoop会将作业分成若干个任务(task)执行,任务🈶️map任务和reduce任务构成。这些任务在集群的节点(服务器)通过YARN进行调度,如果一个任务失败另一个不同节点会自动重新调用运行。

         Map过程

            Hadoop运行时对数据进行了分片,每个分片构建一个map任务(map数量具体确定细节查看https://www.jianshu.com/p/a691980f18d1)。通过对数据进行分片,每个分片的处理时间小于整个数据花费时间,如果这些map任务并行执行处理,并且每一个分片数据比较小,执行时间短,则可以通过mapreduce加速数据处理时间。但也不是每个分区数据越小越好,如果数据切分太小,管理分片总时间和构建map任务总时间远大于整体执行过程就得不偿失了。通常一个作业来说一个合理但分片大小默认是128MB,但也可以根据具体情况调整。

            map任务运行在由YARN进行集群统一资源管理调度的节点上,任务执行的最佳性能是数据在map任务运行的节点上,即“数据本地化优化”。但如果数据及其副本所在节点没有资源运行map任务,此时调度系统会尽量在数据所在集群寻找一个空闲的map槽(slot)运行map任务分片,数据和执行任务运行在不同的节点上,需要通过网络进行数据传输,严重影响运行时间。

            map任务运行完成后将输出结果写入本地磁盘,之所以不是HDFS,因为map的输出是中间结果,该结果通过reduce任务处理后才产生最终输出结果,一旦作业完成,map的输出结果就可以删除。数据如果存储在HDFS,一方面数据写性能差另外没有必要通过默认的3副本高可用存储数据,如果map任务节点宕机,可以通过在其他节点重新运行map任务构建中间结果。  

         reduce过程

            reduce任务并不具备数据本地化优势,单个reduce任务的输入通常来自多个map的输出,如果仅有一个reduce任务,其输入是所有map任务的输出。排序后的map输出通过网络传输发送到reduce任务节点,数据在reduce端合并处理。reduce的输出通常存储在HDFS中实现可靠存储。reduce任务不是由输入数据大小决定(reduce数量确定参考https://www.jianshu.com/p/a691980f18d1)。如果有多个reduce任务,每个map任务会针对输出键值对进行分区(partition),为每个reduce任务创建一个分区,每个分区有许多键,但每个键值对的键值对记录在同一个分区中,分区函数默认通过哈希函数决定。

    图-一个reduce任务的MapReduce数据流 图-多个reduce任务数据流

    多个reduce任务数据流map任务和reduce任务之间的数据流称为shuffle(混洗),每个reduce的输入都来自许多map任务,shuffle一般比图中所示更复杂,而且调整混洗参数对作业总执行时间影响非常大。

    图-无reduce任务的MapReduce数据流

            集群可用带宽限制MapReduce作业数量,因此应该尽量避免map和reduce任务之间的数据传输,hadoop允许用户针对map任务输出执行一个combiner,进行部分数据map端合并减少数据传输数量,combiner属于优化方案,不管调用combiner多少次,reduce的输出都一样。

         MapReduce作业运行机制

    mapreduce运行过程如下,由一下5个独立的实体:

    1、客户端,提交mapreduce作业

    2、YARN资源管理器,负责协调集群上计算机资源分配

    3、YARN节点管理器,负责启动和监视集群中机器上的计算容器(container)

    4、MapReduce的application master,负责协调运行MapReduce作业的任务,和MapReduce任务在容器中运行,这些容器由资源管理器分配,由节点管理器进行管理

    5、分布式文件系统(HDFS),用来与其他实体间共享作业文件。

    图-mapreduce作业工作原理

            这个执行过程包括下面几个阶段:

         作业提交

            job的submit() 方法创建一个内部JobSummiter实例,并调用submitJobInternal()方法,提交后waitForCompletion()每秒轮询作业解读,如果有变动,把进度报告到控制台。JobSummiter作业提交过程如下:

            1、向资源管理器申请一个新应用ID,用于MapReduce作业ID。如上图步骤2。

            2、检查作业的输出说明。如果没指定输出目录或输出目录已存在,作业不提交,错误抛给MapReduce程序。

            3、计算作业的输入分片,分片无法计算,错误返回给MapReduce程序。

            4、将运行作业所需资源(作业jar文件、配置文件和计算所得输入分片)复制到一个以作业ID命名的目录下共享文件系统(步骤3),作业JAR复本较多,在运行作业任务时集群中有很多复本可供节点管理器访问。

            5、通过调用资源管理器submitApplication()方法提交作业,如上图步骤4。

         作业初始化

            资源调度器收到调用它的submitApplication()消息后,将请求传递给YARN调度器(scheduler),调度器分配一个容器,然后资源管理器在节点管理器管理下在容器中启动application master进程(步骤5a和5b)。

            application master是一个java应用程序,主类是MRAppMaster,将接受来自任务进度和完成报告(步骤6),application master对作业的初始化通过创建多个薄记对象保持对作业进度跟踪。然后接受来自共享文件系统的、在客户端计算的输入分配,对每个分片创建一个map任务对象以及由mapreduce.job.reduces属性确定多个reduce对象。

         任务分配

            application master 为作业中所有map任务和reduce任务向资源管理器请求容器(步骤8),首先Map任务发出请求,该请求高于reduce任务请求,因为reduce必须在所有map任务完成后才能启动。

            reduce任务可以运行在集群任意位置,但map任务请求有着数据本地化局限,在理想情况下任务是数据本地化,任务可以和分片在一个节点运行。如果节点资源无法启动map任务,map任务可能是机架本地化(rack local),即数据和任务运行在同一机架而非同一节点。一些map任务既不是数据本地化,也不是机器本地化,从别的机架获取运行数据。请求会为任务分配内存需求和cpu数,默认情况下,每个map任务和reduce任务分配1024MB内存和虚拟内核,这些值可通过mapreduce.map.memory.mb(map任务需要内存)、mapreduce.reduce.memory.mb(reduce任务需要内存)、mapreduce.map.cpu.vcores(map任务需要内核数)、mapreduce.reduce.cpu.vcores

          任务执行

            一旦资源管理器的调度器为任务分配特定节点上容器,application master通过与节点管理器通信启动容器(步骤9a、9b),任务由YarnChild执行。在运行任务之前,需要将作业配置、JAR文件等资源本地化。YarnChild在指定JVM中运行,故map或reduce函数任何缺陷都不会影响节点管理器。

          进度和状态更新

            MapReduce作业是长时间运行批量作业,运行范围从数秒到数小时,如何得知作业进展就很重要。作业和任务关注的进展有:作业或任务状态(运行中、成功完成、失败)、map和reduce进度、作业计数器值、状态信息。当map和reduce任务运行时,子进程和application master 通过umbilical接口通信,每隔3秒,umbilical接口向application master 报告进度和状态,会形成作业汇聚试图。

            客户端可以使用Job的getStatus()得到JobStatus实例,包含作业所有状态信息。流程如下:

    图-mapreduce作业状态更新传递流程

          作业完成

            application master收到作业最后一个任务已完成通知,把作业状态设置为“成功”,在Job轮询状态时,知道任务已完成,从waitForCompletion()方法返回。

         shuffle和排序

            MapReduce确保每个reduce的输入都是按键排序的,系统执行排序、将map输出作为输入传给reduce过程叫shuffle。

         map端

            map函数利用缓冲方式将结果写到内存并出于效率考虑进行预排序,过程如下:  

    图-MapReduce的shuffle和排序

            每个map任务都有一个环形内存缓冲区存储任务输出,默认情况下,缓冲区大小是100MB,一旦缓冲区内容达到阈值,一个后台线程把内容溢出(spill)到磁盘。如果map输出写入缓冲区被填满,map会被阻塞直到写磁盘过程完成。溢出写过程将缓冲区内容写到mapreduce.cluster.local.dir属性在作业特定子目录指定下目录。

            在写磁盘前,线程根据数据最终要传的reduce,把数据划分成相应的分区(partition),在每个分区中后台数据进行内存排序,如果有combiner函数,在排序后运行combiner函数使map输出结果更紧簇,减少写到磁盘数据和传递给reduce数据。

            每次达到溢出阈值,会新建一个溢出文件(spill file),在map任务写完最后一个输出记录后,会有多个溢出文件,在任务完成之前,溢出文件被合并成一个已分区且已排序输出文件。

            mapreduce.map.combine.minspills属性设置combiner再次运行时溢出文件的要求,默认至少3个。

            在将map输出到本地节点磁盘时可以对数据进行压缩,节约磁盘空间,减少传递给reduce数据量,可以通过mapreduce.map.output.compress设置,mapreduce.map.output.compress.codec指定压缩库。reduce通过HTTP得到输出文件分区。

          reduce端

            map输出文件位于本地磁盘,所有map任务完成时,会通知给application master,reduce中一个线程定期询问master获取map输出主机位置,直到获取所有输出位置, reduce任务开始复制map输出。复制完成后reduce任务进入排序阶段,该阶段将合并map输出,维持其顺序排序。在reduce阶段,直接把数据输入给reduce函数,省略一次磁盘往返行程。

    图-reduce端文件合并

          shuffle配置调优

            通过上面shuffle过程解读,可知shuffle过程在mapreduce整个过程中起着关键端作用,优化好shuffle过程可以提升mapreduce性能。下面从map和reduce端梳理shuffle优化方法。

          map端shuffle调优

            mapreduce.task.io.sort.mb    排序map输出时所使用的内存缓冲区大小,默认是100MB。

            mapreduce.map.sort.spill.percent  map输出内存缓冲区和开始溢出写过程阈值

            mapreduce.task.io.sort.factor   排序文件时一次最多合并流数

            mapreduce.map.combine.minspills  运行combiner所需最少溢出文件数

            mapreduce.map.output.compress    是否压缩map输出

            mapreduce.map.output.compress.codec   指定map输出压缩解码器

            mapreduce.shuffle.max.threads    用于将map输出到reduce的工作线程数,表示的是整个集群范围的设置。

          reduce端shuffle调优

            mapreduce.reduce.shuffle.parallelcopies 用于把map输出复制到reduce线程数

            mapreduce.reduce.shuffle.maxfetchfailures reduce获取一个map输出所花费最大时间

            mapreduce.task.io.sort.factor 排序文件时一次最多合并流数 

            mapreduce.reduce.shuffle.input.buffer.percent     在shuffle复制阶段,分配给map输出的缓冲区占堆空间百分比。

    相关文章

      网友评论

        本文标题:MapReduce执行过程及shuffle详解

        本文链接:https://www.haomeiwen.com/subject/vrpazftx.html