MapReduce(五):Map阶段

作者: b91cbec6a902 | 来源:发表于2019-04-29 11:49 被阅读43次

    概述

    基于Hadoop 2.x

    Map阶段启动的大致流程是怎样的?

    1、在MRAppMaster进程中通过ContainerLauncher向NodeManager发送Container启动命令,启动YarnChild进程org.apache.hadoop.mapred.YarnChild#main,通过启动命令传入了以下几个参数:
    ①MRAppMaster进程中的TaskAttemptListener组件提供的TaskUmbilicalProtocol服务的host和port。
    ②当前任务的TaskAttemptID。
    ③当前任务的JVMId。

    class YarnChild {
    
        public static void main(String[] args) throws Throwable {
    
            String host = args[0];
    
            int port = Integer.parseInt(args[1]);
    
            final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
    
            long jvmIdLong = Long.parseLong(args[3]);
    
            .....
        }
    
    }
    

    2、通过RPC协议TaskUmbilicalProtocol与MRAppMaster建立通讯。

    final InetSocketAddress address =
            NetUtils.createSocketAddrForHost(host, port);
    
    final TaskUmbilicalProtocol umbilical =
      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
      @Override
      public TaskUmbilicalProtocol run() throws Exception {
        return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
            TaskUmbilicalProtocol.versionID, address, job);
      }
    });
    
    

    3、通过RPC协议TaskUmbilicalProtocol获取要处理的任务,Map阶段获取的任务实例为org.apache.hadoop.mapred.MapTask,然后开始执行任务。

    myTask = umbilical.getTask(context);
    
    taskFinal.run(job, umbilical); // run the task
    
    

    4、在MapTask任务真正执行之前,先准备运行的环境。
    ①初始化TaskReporter,随时向MRAppMaster进程汇报Map任务的进度。

    TaskReporter reporter = startReporter(umbilical);
    

    ②初始化Job上下文信息、Task上下文信息、Map阶段结果提交器等。

    initialize(job, getJobID(), reporter, useNewApi);
    
    // 初始化Job上下文信息
    jobContext = new JobContextImpl(job, id, reporter);
    
    // 初始化Task上下文信息
    taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
    
    // 初始化Map阶段结果提交器
    outputFormat =
        ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
    committer = outputFormat.getOutputCommitter(taskContext);
    

    5、初始化数据源、数据输入器、数据处理器、数据输出器。

    runNewMapper(job, splitMetaInfo, umbilical, reporter);
    
    // 初始化数据源,也就是数据分片
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    
    // 初始化数据输入器,通过配置的InputFormat获取
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    // 初始化数据处理器,也就是我们自定义的Mapper
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    
    // 初始化数据输出器,当Mapper处理完数据后将数据交给output输出
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }
    
    

    6、开始数据处理。到这里就很熟悉了,这里就是我们自己编写的Mapper,循环处理所有数据。

    mapper.run(mapperContext);
    
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
          }
        } finally {
          cleanup(context);
        }
    }
    

    7、Map处理结果输出
    在我们自定义的Mapper都会有这样一行代码context.write(key, value),进行处理结果的收集和输出操作。

    数据输出器的具体实现类为:org.apache.hadoop.mapred.MapTask.NewOutputCollector

    private class NewOutputCollector<K,V>
        extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
        private final MapOutputCollector<K,V> collector;
        private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
        private final int partitions;
    
        @SuppressWarnings("unchecked")
        NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                           JobConf job,
                           TaskUmbilicalProtocol umbilical,
                           TaskReporter reporter
                           ) throws IOException, ClassNotFoundException {
          collector = createSortingCollector(job, reporter);
          // 数据分区数为Reduce的数量
          partitions = jobContext.getNumReduceTasks();
          if (partitions > 1) {
            partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
              ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
          } else {
            partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
              @Override
              public int getPartition(K key, V value, int numPartitions) {
                return partitions - 1;
              }
            };
          }
        }
      ...
    }
    

    从上面的NewOutputCollector的构造方法可以看出,数据分区数为Reduce的数量,假设有n个Reduce,那么每个Map产生的结果集数据要分成n份,每个Reduce对应一份。

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }
    

    从上面的write(K key, V value)方法可以看出,每条数据都会计算它归属的分区。然后写入MapOutputCollector<K,V> collector 中。
    具体实现为:org.apache.hadoop.mapred.MapTask.MapOutputBuffer
    MapOutputBuffer内部是一个数组,在逻辑上是一个环形数组,这个就是MapReduce的环形缓冲区。

    每条数据分为两部分写入MapOutputBuffer:
    ①真实的数据(Key-Value值)

    // 写入key值
    keySerializer.serialize(key);
    // 写入value值
    valSerializer.serialize(value);
    

    ②数据的元信息:属于哪个分区、key值在buffer中的起始位置、value值在buffer中的起始位置、value值的长度。这部分元信息起到索引的作用。

    // 写入分区信息
    kvmeta.put(kvindex + PARTITION, partition);
    // 写入key值在buffer中的起始位置
    kvmeta.put(kvindex + KEYSTART, keystart);
    // 写入value值在buffer中的起始位置
    kvmeta.put(kvindex + VALSTART, valstart);
    // 写入value值的长度
    kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
    
    数据的存储 逻辑上的数据存储

    环形缓存区的大小是有限的,当环形缓存区使用量达到阈值(默认80%)时,会将环形缓存区中的数据溢写到磁盘里(不是HDFS)。

    org.apache.hadoop.mapred.MapTask.MapOutputBuffer#sortAndSpill
    

    每次溢写都会生成类似的文件:sipll0.out ,文件里的数据是按照partition划分好的,每个partition中的数据都是已排序的。


    溢写文件

    当Map阶段数据处理全部完成,因为还有数据保存在环形缓冲区中,所有在结束的时候必定会产生一次溢写,将数据写入磁盘。

    output.close(mapperContext);
    
    @Override
    public void close(TaskAttemptContext context
                      ) throws IOException,InterruptedException {
      try {
        collector.flush();
      } catch (ClassNotFoundException cnf) {
        throw new IOException("can't find class ", cnf);
      }
      collector.close();
    }
    

    由于最终的溢写文件>=1个,所以最后还需要把所有的溢写文件合并成一个。

    // org.apache.hadoop.mapred.MapTask.MapOutputBuffer#flush
    mergeParts();
    
    merge

    最终Map阶段会生成一个按分区划分,分区内已排序的数据文件,然后将数据文件交给Shuffle HTTP Server。

    8、完成以上操作后,Map进程向MRAppMaster进程汇报任务结束,然后退出进程。

    Map阶段生成的数据文件交给了谁管理?

    交给了Shuffle HTTP Server服务来管理。这个服务是由NodeManager启动的。使用的是Yarn的AuxServices机制,NodeManager允许用户通过配置附属服务的方式扩展自己的功能,这使得每个节点可以定制一些特定框架需要的服务。附属服务需要在NM启动前配置好,并由NM统一启动和关闭。典型的应用是MapReduce框架中用到的Shuffle HTTP Server,其通过封装成一个附属服务由各个NodeManager启动。后续Reduce阶段也是从Shuffle HTTP Server来拉取数据的。

    相关文章

      网友评论

        本文标题:MapReduce(五):Map阶段

        本文链接:https://www.haomeiwen.com/subject/xsouyqtx.html