MapReduce(六):Reduce阶段

作者: b91cbec6a902 | 来源:发表于2019-06-19 17:31 被阅读5次

    概述

    基于Hadoop 2.x

    Reduce阶段什么时候开始?

    默认情况下,当Map任务完成个数达到Map任务总数的5%时,MRAppMaster开始向Yarn为Reduce申请Container,并启动Reduce进程。

    此时Reduce启动后拉取每个Map的结果集中属于自己的那个分区的数据。注意这时候Reduce虽然启动了,但执行的操作只是拉取分区数据,也就是说必须所有的Map任务全部完成,所有的分区数据都已经拉取,Reduce阶段才真正开始处理数据。

    这个阈值偏低,容易造成Reduce阶段早早的启动,占用了有限的资源,却没干多少活。可通过以下参数调整此阈值:

    <property>
        <name>mapreduce.job.reduce.slowstart.completedmaps</name>
        <value>0.8</value>
    </property>
    
    Reduce阶段从哪里拉取Map阶段产生的数据?

    通过HTTP请求从Shuffle HTTP Server拉取数据。
    Shuffle HTTP Server是由NodeManager启动的。使用的是Yarn的AuxServices机制,NodeManager允许用户通过配置附属服务的方式扩展自己的功能,这使得每个节点可以定制一些特定框架需要的服务。附属服务需要在NodeManagger启动前配置好,并由NodeManagger统一启动和关闭。
    Map阶段将结果数据提交到Shuffle HTTP Server后Map进程就退出了,后续Reduce进程是从Shuffle HTTP Server拉取数据的。

    可以从yarn-site.xml找到如下配置:

    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    
    Reduce阶段的流程是怎么样的?

    Reduce阶段启动的进程也是YarnChild -> org.apache.hadoop.mapred.YarnChild#main。整个Reduce阶段可以分为三个阶段:①Map结果数据拉取;②合并排序;③执行Reduce处理逻辑。

    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    

    1、在MRAppMaster进程中通过ContainerLauncher向NodeManager发送Container启动命令,启动YarnChild进程org.apache.hadoop.mapred.YarnChild#main,通过启动命令传入了以下几个参数:
    ①MRAppMaster进程中的TaskAttemptListener组件提供的TaskUmbilicalProtocol服务的host和port。
    ②当前任务的TaskAttemptID。
    ③当前任务的JVMId。
    注意:TaskAttemptID很重要,它是Reduce任务的唯一标识,也是Reduce阶段拉取Map结果数据的凭证,它决定了拉取Map结果数据的哪个分区。

    class YarnChild {
    
        public static void main(String[] args) throws Throwable {
    
            String host = args[0];
    
            int port = Integer.parseInt(args[1]);
    
            final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
    
            long jvmIdLong = Long.parseLong(args[3]);
    
            .....
        }
    }
    

    2、初始阶段跟Map阶段一样,通过RPC协议TaskUmbilicalProtocol与MRAppMaster建立通讯。

    final InetSocketAddress address =
            NetUtils.createSocketAddrForHost(host, port);
    
    final TaskUmbilicalProtocol umbilical =
      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
      @Override
      public TaskUmbilicalProtocol run() throws Exception {
        return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
            TaskUmbilicalProtocol.versionID, address, job);
      }
    });
    

    3、通过RPC协议TaskUmbilicalProtocol获取要处理的任务,Reduce阶段获取的任务实例为org.apache.hadoop.mapred.ReduceTask,然后开始执行任务。

    myTask = umbilical.getTask(context);
    
    taskFinal.run(job, umbilical); // run the task
    

    4、初始化Shuffle组件,开始后半段Shuffle(整个Shuffle的前半段在发生在Map阶段)。

    # org.apache.hadoop.mapred.ReduceTask#run
    
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
    // 加载Shuffle组件
    Class<? extends ShuffleConsumerPlugin> clazz =
              job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    // 创建Shuffle上下文信息
    ShuffleConsumerPlugin.Context shuffleContext = 
          new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                      super.lDirAlloc, reporter, codec, 
                      combinerClass, combineCollector, 
                      spilledRecordsCounter, reduceCombineInputCounter,
                      shuffledMapsCounter,
                      reduceShuffleBytes, failedShuffleCounter,
                      mergedMapOutputsCounter,
                      taskStatus, copyPhase, sortPhase, this,
                      mapOutputFile, localMapFiles);
    shuffleConsumerPlugin.init(shuffleContext);
    // 运行Shuffle
    rIter = shuffleConsumerPlugin.run();
    

    ①启动EventFetcher线程,不断从MRAppMaster获取Map任务完成的事件。以此来感知哪些Map任务完成了,也就是哪些Map阶段结果数据可以拉取了。

    // Start the map-completion events fetcher thread
    final EventFetcher<K,V> eventFetcher = 
          new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
              maxEventsToFetch);
    eventFetcher.start();
    
    protected int getMapCompletionEvents()
        throws IOException, InterruptedException {
      
      int numNewMaps = 0;
      TaskCompletionEvent events[] = null;
    
      do {
        // 通过RPC请求MRAppMaster获取Map完成事件。
        MapTaskCompletionEventsUpdate update =
            umbilical.getMapCompletionEvents(
                (org.apache.hadoop.mapred.JobID)reduce.getJobID(),
                fromEventIdx,
                maxEventsToFetch,
                (org.apache.hadoop.mapred.TaskAttemptID)reduce);
        events = update.getMapTaskCompletionEvents();
        LOG.debug("Got " + events.length + " map completion events from " +
                 fromEventIdx);
    
        assert !update.shouldReset() : "Unexpected legacy state";
    
        fromEventIdx += events.length;
        // 将获取到的TaskCompletionEvent事件交给ShuffleScheduler来处理
        for (TaskCompletionEvent event : events) {
          scheduler.resolve(event);
          if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {
            ++numNewMaps;
          }
        }
      } while (events.length == maxEventsToFetch);
    
      return numNewMaps;
    }
    

    将获取到的Map任务完成事件交给ShuffleScheduler处理,ShuffleScheduler对应的实例为ShuffleSchedulerImpl。状态为SUCCESSED的Map任务被封装成MapHost对象,放入Set<MapHost> pendingHosts 中,以备后续处理。

    public void resolve(TaskCompletionEvent event) {
      switch (event.getTaskStatus()) {
      // 事件状态是成功的,代表Map任务顺利完成,将其封装成MapHost对象放入Set<MapHost> pendingHosts = new HashSet<MapHost>()中,以备后续处理。
      case SUCCEEDED:
        URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());
        addKnownMapOutput(u.getHost() + ":" + u.getPort(),
            u.toString(),
            event.getTaskAttemptId());
        maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
        break;
      case FAILED:
      case KILLED:
      case OBSOLETE:
        obsoleteMapOutput(event.getTaskAttemptId());
        LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
            " map-task: '" + event.getTaskAttemptId() + "'");
        break;
      case TIPFAILED:
        tipFailed(event.getTaskAttemptId().getTaskID());
        LOG.info("Ignoring output of failed map TIP: '" +
            event.getTaskAttemptId() + "'");
        break;
      }
    }
    

    ②启动一批Fetcher线程(可配置,默认5个),从ShuffleScheduler获取Map数据的URL,并发拉取Map结果数据,拉取的方式是HTTP请求。

    final int numFetchers = isLocal ? 1 :
          jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
    Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
    if (isLocal) {
      fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
          merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
          localMapFiles);
      fetchers[0].start();
    } else {
      for (int i=0; i < numFetchers; ++i) {
        fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                       reporter, metrics, this, 
                                       reduceTask.getShuffleSecret());
        fetchers[i].start();
      }
    }
    
    public void run() {
      try {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          MapHost host = null;
          try {
            // If merge is on, block
            merger.waitForResource();
    
            // Get a host to shuffle from
            host = scheduler.getHost();
            metrics.threadBusy();
    
            // Shuffle
            copyFromHost(host);
          } finally {
            if (host != null) {
              scheduler.freeHost(host);
              metrics.threadFree();            
            }
          }
        }
      } catch (InterruptedException ie) {
        return;
      } catch (Throwable t) {
        exceptionReporter.reportException(t);
      }
    }
    

    将拉取过来的数据封装成MapOutput对象,MapOutput有两个实现,分别为InMemoryMapOutput & OnDiskMapOutput,从名字可以看出来,拉取到的数据可以放到内存里,也可以放到磁盘里。mapreduce.reduce.shuffle.memory.limit.percent这个参数控制了,有多少内存可以用来存放Map结果数据,默认0.25,即25%堆内存。超过25%以后,拉取的Map结果数据一律放在磁盘上。

    public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, 
                                                 long requestedSize,
                                                 int fetcher
                                                 ) throws IOException {
      if (requestedSize > maxSingleShuffleLimit) {
        LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
                 " is greater than maxSingleShuffleLimit (" + 
                 maxSingleShuffleLimit + ")");
        return new OnDiskMapOutput<K,V>(mapId, this, requestedSize, jobConf,
           fetcher, true, FileSystem.getLocal(jobConf).getRaw(),
           mapOutputFile.getInputFileForWrite(mapId.getTaskID(), requestedSize));
      }
      
      if (usedMemory > memoryLimit) {
        LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
            + ") is greater than memoryLimit (" + memoryLimit + ")." + 
            " CommitMemory is (" + commitMemory + ")"); 
        return null;
      }
      
      // Allow the in-memory shuffle to progress
      LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
          + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
          + "CommitMemory is (" + commitMemory + ")"); 
      return unconditionalReserve(mapId, requestedSize, true);
    }
    

    ③在拉取Map结果数据的同时,会有两个线程同时执行合并操作,InMemoryMerger线程负责合并内存中的Map结果数据,OnDiskMerger线程负责合并磁盘上的Map结果数据。生成的MapOutput会交给InMemoryMergerOnDiskMerger线程进行合并处理。

    ④当所有的Map结果数据拉取完毕、内存中的Map结果数据全部合并排序完毕、磁盘中的Map结果数据全部合并排序完毕时会进行最终合并,即将内存和磁盘中的Map结果数据合并。

    // stop the scheduler
    scheduler.close();
    
    copyPhase.complete(); 
    taskStatus.setPhase(TaskStatus.Phase.SORT);
    reduceTask.statusUpdate(umbilical);
    
    // Finish the on-going merges...
    RawKeyValueIterator kvIter = null;
    try {
      kvIter = merger.close();
    } catch (Throwable e) {
      throw new ShuffleError("Error while doing final merge " , e);
    }
    
    public RawKeyValueIterator close() throws Throwable {
        // Wait for on-going merges to complete
        if (memToMemMerger != null) { 
          memToMemMerger.close();
        }
        // 等待内存中的Map结果数据全部合并完毕
        inMemoryMerger.close();
        // 等待磁盘中的Map结果数据全部合并完毕
        onDiskMerger.close();
        
        List<InMemoryMapOutput<K, V>> memory = 
          new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
        inMemoryMergedMapOutputs.clear();
        memory.addAll(inMemoryMapOutputs);
        inMemoryMapOutputs.clear();
        List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
        onDiskMapOutputs.clear();
         // 内存和磁盘中的Map结果数据合并
        return finalMerge(jobConf, rfs, memory, disk);
    }
    

    如何进行排序的呢?主要依靠org.apache.hadoop.mapred.Merger.MergeQueue类MergeQueue继承了优先队列,实现了迭代器接口。MergeQueue中的元素是以Segment为单位,Segment是内部已经排序完成的数据段。Segment中的第一个元素的大小代表这个Segment的大小,以此来进行优先队列的调整。当通过迭代方法从MergeQueue获取元素是,必定是当前最小(最大)的元素。这样就实现了排序的效果。

     private static class MergeQueue<K extends Object, V extends Object> 
      extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator
    

    至此,Shuffle后半段结束。
    5、开始执行Reduce逻辑。Reduce结果直接输出到磁盘

    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }
    

    6、完成以上操作后,Reduce进程向MRAppMaster进程汇报任务结束,然后退出进程。

    相关文章

      网友评论

        本文标题:MapReduce(六):Reduce阶段

        本文链接:https://www.haomeiwen.com/subject/fuouyqtx.html