1.2.2.2 Map-Task (Source Code Walkthrough)

Author: 寒暄_HX | Published 2020-03-20 21:29

    Master table of contents: https://www.jianshu.com/p/e406a9bc93a9

    Hadoop sub-directory: https://www.jianshu.com/p/9428e443b7fd

    The Map-Task source splits into two parts: one is the input side of the map (including instantiating the mapper); the other is the output that this input then produces.

    Let's start with the first part, the input to the map:

    Mapper instantiation

    Layer 1

    We begin in the org.apache.hadoop.mapreduce.Mapper class and look directly at its run method:

    org.apache.hadoop.mapreduce.Mapper class
    
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
          }
        } finally {
          cleanup(context);
        }
      }
    }
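
    This run loop is what drives the mapper we write: setup once, map() once per record, cleanup at the end. For reference, a minimal user-defined Mapper that this loop could drive might look like the sketch below (the class and field names are made up for illustration):

    // A hypothetical word-count style mapper, shown only to make the run() loop above concrete.
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
      private static final IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // key is the byte offset of the line, value is the line itself (see LineRecordReader later on)
        for (String token : value.toString().split("\\s+")) {
          if (token.isEmpty()) {
            continue;
          }
          word.set(token);
          context.write(word, ONE);   // handed to the output collector discussed in part two
        }
      }
    }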
    

    Next we open the org.apache.hadoop.mapred.MapTask class and find its method of the same name, run:

    Layer 1

    org.apache.hadoop.mapred.MapTask class
    This method covers the entire input side that we set in motion.
    
    public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
        this.umbilical = umbilical;
    
        if (isMapTask()) {   
          // If there are no reducers then there won't be any sort. Hence the map 
          // phase will govern the entire attempt's progress.
          if (conf.getNumReduceTasks() == 0) {
            mapPhase = getProgress().addPhase("map", 1.0f);
          } else {
            // If there are reducers then the entire attempt's progress will be 
            // split between the map phase (67%) and the sort phase (33%).
            mapPhase = getProgress().addPhase("map", 0.667f);
            sortPhase  = getProgress().addPhase("sort", 0.333f);
          }
        }
        TaskReporter reporter = startReporter(umbilical);
     
        boolean useNewApi = job.getUseNewMapper();
        initialize(job, getJobID(), reporter, useNewApi);
    
        // check if it is a cleanupJobTask
        if (jobCleanup) {
          runJobCleanupTask(umbilical, reporter);
          return;
        }
        if (jobSetup) {
          runJobSetupTask(umbilical, reporter);
          return;
        }
        if (taskCleanup) {
          runTaskCleanupTask(umbilical, reporter);
          return;
        }
    
        if (useNewApi) {
          runNewMapper(job, splitMetaInfo, umbilical, reporter);
        } else {
          runOldMapper(job, splitMetaInfo, umbilical, reporter);
        }
        done(umbilical, reporter);
      }
    

    There are two important passages in this method. The first:

        if (isMapTask()) {   // is this a map task?
          if (conf.getNumReduceTasks() == 0) {   // is there a reduce phase at all?
            mapPhase = getProgress().addPhase("map", 1.0f); // no reducers: the map phase accounts for 100% of the attempt's progress
          } else {
            // with reducers, the map phase gets 67% and the sort phase 33%
            mapPhase = getProgress().addPhase("map", 0.667f);
            sortPhase  = getProgress().addPhase("sort", 0.333f);
          }
        }
    

    This passage divides the attempt's progress across the phases the map task will go through.
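
    Which branch runs is decided entirely by the job configuration. In the driver, for example, setting the reducer count to zero makes the map phase the whole attempt (a minimal sketch; the driver class name is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class MapOnlyDriver {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "map-only example");
        // 0 reducers: there is no sort phase, map progress covers the whole attempt,
        // and runNewMapper later picks NewDirectOutputCollector instead of the sorting collector.
        job.setNumReduceTasks(0);
      }
    }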

    The second:

        if (useNewApi) {
          // the familiar choice between running the new-API mapper and the old-API mapper
          runNewMapper(job, splitMetaInfo, umbilical, reporter);
        } else {
          runOldMapper(job, splitMetaInfo, umbilical, reporter);
        }
    

    We follow the new-API path, runNewMapper.

    Layer 2

     void runNewMapper(final JobConf job,
                        final TaskSplitIndex splitIndex,
                        final TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException,
                                 InterruptedException {
        // make a task context so we can get the classes
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
          new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                      getTaskID(),
                                                                      reporter);
        // make a mapper
        org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
          (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
        // make the input format
        org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
          (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
            ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
        // rebuild the input split
        org.apache.hadoop.mapreduce.InputSplit split = null;
        split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
            splitIndex.getStartOffset());
        LOG.info("Processing split: " + split);
    
        org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
          new NewTrackingRecordReader<INKEY,INVALUE>
            (split, inputFormat, reporter, taskContext);
        
        job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
        org.apache.hadoop.mapreduce.RecordWriter output = null;
        
        // get an output object
        if (job.getNumReduceTasks() == 0) {
          output = 
            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
        } else {
          output = new NewOutputCollector(taskContext, job, umbilical, reporter);
        }
    
        org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
        mapContext = 
          new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
              input, output, 
              committer, 
              reporter, split);
    
        org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
            mapperContext = 
              new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
                  mapContext);
    
        try {
          input.initialize(split, mapperContext);
          mapper.run(mapperContext);
          mapPhase.complete();
          setPhase(TaskStatus.Phase.SORT);
          statusUpdate(umbilical);
          input.close();
          input = null;
          output.close(mapperContext);
          output = null;
        } finally {
          closeQuietly(input);
          closeQuietly(output, mapperContext);
        }
      }
    

    This method likewise splits into two parts: the first prepares all the details, the second does the actual work.

    Part one:
    Part one consists of five stages.
    Stage 1:

    org.apache.hadoop.mapred.MapTask class
    
    // make a task context so we can get the classes
    // create the task context so that the configured classes can be looked up
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
          new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,  // the job we configured
                                                                      getTaskID(),
                                                                      reporter); // build the task attempt context
    

    Stage 2:

    org.apache.hadoop.mapred.MapTask class
    
    // make a mapper
    // create the mapper
       org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
          (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getMapperClass(), job);  // instantiate our Mapper class via reflection
    

    Stage 3:

    org.apache.hadoop.mapred.MapTask class
    
    // make the input format
    // create the input format
        org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
          (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
            ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);  // instantiate the configured InputFormat class via reflection (analyzed further below)
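
    Both reflective lookups resolve to whatever the driver registered on the Job. A typical driver sketch, reusing the hypothetical WordCountMapper from earlier (all class names here are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class WordCountDriver {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount example");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);        // what taskContext.getMapperClass() returns
        job.setInputFormatClass(TextInputFormat.class);   // what taskContext.getInputFormatClass() returns
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
      }
    }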
    

    Stage 4:

    org.apache.hadoop.mapred.MapTask class
    
    // rebuild the input split
    // rebuild the input split for this task
        org.apache.hadoop.mapreduce.InputSplit split = null;
        split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
            splitIndex.getStartOffset()); // each split entry corresponds to one MapTask; a split records four things: the file it belongs to, the start offset, the length, and the host locations
        LOG.info("Processing split: " + split);
    
        org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
          new NewTrackingRecordReader<INKEY,INVALUE> // analyzed below
            (split, inputFormat, reporter, taskContext); // the InputFormat and split prepared above feed the input: this is where we get the stream and decide how to read it (as text, line by line)
        
        job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
        org.apache.hadoop.mapreduce.RecordWriter output = null;
    

    At the spot marked "analyzed below", MapTask news up an object, and that NewTrackingRecordReader class is what actually does the work.

    org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader class
    
    NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
            org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
            TaskReporter reporter,
            org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
            throws InterruptedException, IOException {
          this.reporter = reporter;
          this.inputRecordCounter = reporter
              .getCounter(TaskCounter.MAP_INPUT_RECORDS);
          this.fileInputByteCounter = reporter
              .getCounter(FileInputFormatCounter.BYTES_READ);
    
          List <Statistics> matchedStats = null;
          if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
            matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
                .getPath(), taskContext.getConfiguration());
          }
          fsStats = matchedStats;
    
          long bytesInPrev = getInputBytes(fsStats);
          this.real = inputFormat.createRecordReader(split, taskContext); // calls createRecordReader on the InputFormat (here TextInputFormat), wrapping the FileSplit in a LineRecordReader
          long bytesInCurr = getInputBytes(fsStats);
          fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
        }
    

    Now let's see how that wrapping is done.

    org.apache.hadoop.mapreduce.lib.input.TextInputFormat class
    
    @Override
      public RecordReader<LongWritable, Text> 
        createRecordReader(InputSplit split,
                           TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
            "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
          recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        return new LineRecordReader(recordDelimiterBytes);  // this is where the line reader is created
      }
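
    The delimiter lookup above means a job can change the record separator through configuration; the key read by the code is textinputformat.record.delimiter. A small driver-side sketch (the class name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CustomDelimiterDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Treat a blank line as the record separator instead of a single newline;
        // TextInputFormat reads this key and hands the bytes to LineRecordReader.
        conf.set("textinputformat.record.delimiter", "\n\n");
        Job job = Job.getInstance(conf, "custom delimiter example");
      }
    }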
    

    Next, let's take a closer look at this line reader.

    org.apache.hadoop.mapreduce.lib.input.LineRecordReader class
    
    public void initialize(InputSplit genericSplit,
                             TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
    
        // open the file and seek to the start of the split
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        
        CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
        if (null!=codec) {
          isCompressedInput = true; 
          decompressor = CodecPool.getDecompressor(codec);
          if (codec instanceof SplittableCompressionCodec) {
            final SplitCompressionInputStream cIn =
              ((SplittableCompressionCodec)codec).createInputStream(
                fileIn, decompressor, start, end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
            in = new CompressedSplitLineReader(cIn, job,
                this.recordDelimiterBytes);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn;
          } else {
            in = new SplitLineReader(codec.createInputStream(fileIn,
                decompressor), job, this.recordDelimiterBytes);
            filePosition = fileIn;
          }
        } else {
          fileIn.seek(start);
          in = new UncompressedSplitLineReader(
              fileIn, job, this.recordDelimiterBytes, split.getLength());
          filePosition = fileIn;
        }
        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        if (start != 0) {
          start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
      }
    

    We will dedicate a separate write-up to this line reader class later.
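
    One detail worth pausing on in initialize() is the boundary handling: a reader whose start is not 0 throws away its first (partial) line, and every reader later reads one line past its end, so each line belongs to exactly one mapper. Below is a toy, non-Hadoop sketch of that rule, with made-up data and split positions:

    // A toy illustration of the split-boundary rule used by LineRecordReader:
    // a split whose start is not 0 discards its partial first line, and every split reads
    // one extra line past its end, so each line is processed by exactly one mapper.
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class SplitBoundaryDemo {
      public static void main(String[] args) {
        byte[] data = "alpha\nbravo\ncharlie\n".getBytes(StandardCharsets.UTF_8);
        // two artificial "splits" that cut the file in the middle of "bravo"
        int[][] splits = { {0, 9}, {9, data.length} };
        for (int[] s : splits) {
          System.out.println("split " + s[0] + "-" + s[1] + " -> " + linesFor(data, s[0], s[1]));
        }
        // prints: split 0-9 -> [alpha, bravo]   and   split 9-20 -> [charlie]
      }

      // the lines this split is responsible for
      static List<String> linesFor(byte[] data, int start, int end) {
        int pos = start;
        if (start != 0) {
          pos = skipLine(data, pos);              // discard the partial first line
        }
        List<String> lines = new ArrayList<>();
        while (pos <= end && pos < data.length) { // a line starting at or before 'end' still belongs to us
          int next = skipLine(data, pos);
          lines.add(new String(data, pos, next - pos, StandardCharsets.UTF_8).trim());
          pos = next;
        }
        return lines;
      }

      static int skipLine(byte[] data, int pos) {
        while (pos < data.length && data[pos] != '\n') {
          pos++;
        }
        return Math.min(pos + 1, data.length);
      }
    }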

    Stage 5:

    org.apache.hadoop.mapred.MapTask class
    
     org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
     mapContext = 
       new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),  // see stage 4
           input, output,  // the map context wraps the input and output, so both are reachable through the context; Mapper's context.getCurrentKey() therefore ultimately reads from the LineRecordReader held by the input
           committer, 
           reporter, split);

    Here, too, an object is newed up, so let's keep following it.

    org.apache.hadoop.mapreduce.task.MapContextImpl class
    
      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        return reader.nextKeyValue();
      }
    

    This shows that the context does no reading of its own: its nextKeyValue() simply delegates to the RecordReader it wraps.

    To sum up the five stages:
    they instantiate the map class we wrote, locate the input we specified, rebuild the split assigned to this task, tie the split to the mapper, and build the context that keeps everything straight. All of it is preparation for the part that follows:

    org.apache.hadoop.mapred.MapTask class
    
    try {
        input.initialize(split, mapperContext); // initialize the input; input is the RecordReader object, with split and mapperContext as its arguments
        mapper.run(mapperContext); // this invokes Mapper's run method
        mapPhase.complete(); // mark the map phase as complete
        setPhase(TaskStatus.Phase.SORT);  // move the task into the sort phase
        statusUpdate(umbilical);  // report the new status back through the umbilical protocol
        input.close();
        input = null;  // close and release the input
        output.close(mapperContext);
        output = null;  // close and release the output
    } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);  // make sure input and output are closed even if something failed
    }
    

    The two key statements in this block are the first and the second.
    The first:

    input.initialize(split, mapperContext);
    
    org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader class
    
    @Override
        public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
                               org.apache.hadoop.mapreduce.TaskAttemptContext context
                               ) throws IOException, InterruptedException {
          long bytesInPrev = getInputBytes(fsStats);
          real.initialize(split, context);   // the call is made on the 'real' reader; we keep following initialize
          long bytesInCurr = getInputBytes(fsStats);
          fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
        }
    
    org.apache.hadoop.mapreduce.lib.input.LineRecordReader class
    
    public void initialize(InputSplit genericSplit,
                             TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
    
        // open the file and seek to the start of the split
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        
        CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
        if (null!=codec) {
          isCompressedInput = true; 
          decompressor = CodecPool.getDecompressor(codec);
          if (codec instanceof SplittableCompressionCodec) {
            final SplitCompressionInputStream cIn =
              ((SplittableCompressionCodec)codec).createInputStream(
                fileIn, decompressor, start, end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
            in = new CompressedSplitLineReader(cIn, job,
                this.recordDelimiterBytes);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn;
          } else {
            in = new SplitLineReader(codec.createInputStream(fileIn,
                decompressor), job, this.recordDelimiterBytes);
            filePosition = fileIn;
          }
        } else {
          fileIn.seek(start);
          in = new UncompressedSplitLineReader(
              fileIn, job, this.recordDelimiterBytes, split.getLength());
          filePosition = fileIn;
        }
        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        if (start != 0) {
          start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
      }
    

    We have arrived back at the line reader; let's go over its most important statements.

    Start offset:
    start = split.getStart();
    End of the split:
    end = start + split.getLength();
    The file:
    final Path file = split.getPath();
    Together they determine the most important quantity in the line reader, the file offset:
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));

    The second:

    mapper.run(mapperContext);
    

    The first statement only prepared the split and its offsets; the second is where execution truly begins.

    org.apache.hadoop.mapreduce.Mapper class
    
     public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKeyValue()) {   // follow nextKeyValue: does the context still have a key-value pair?
            map(context.getCurrentKey(), context.getCurrentValue(), context);
          }
        } finally {
          cleanup(context);
        }
      }
    

    Following it, we find ourselves back at the very beginning, which means we can now walk forward through the execution.

    org.apache.hadoop.mapreduce.lib.input.LineRecordReader class
    
    public boolean nextKeyValue() throws IOException {
        if (key == null) {
          key = new LongWritable();
        }
        key.set(pos);  // the key is the current byte offset in the file
        if (value == null) {
          value = new Text();  // the value will hold the line content, filled in by readLine below
        }
        }
        int newSize = 0;
        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
          if (pos == 0) {
            newSize = skipUtfByteOrderMark();
          } else {
            newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
            pos += newSize;
          }
    
          if ((newSize == 0) || (newSize < maxLineLength)) {
            break;
          }
    
          // line too long. try again
          LOG.info("Skipped line of size " + newSize + " at pos " + 
                   (pos - newSize));
        }
        if (newSize == 0) {  // nothing more was read, so this split is exhausted; otherwise keep emitting records
          key = null;
          value = null;
          return false;
        } else {
          return true;
        }
      }
    

    So we land in the line reader once more, but this time not to set up the split and compute offsets; this time we fetch each record and its corresponding offset.

    That concludes the first big part of the Map-Task source: it takes our file and our overridden map function, and the line reader breaks the file into records, each paired with its offset. The next step is to feed those records into our map method, process them, and produce output.

    Producing the output

    Output

    The output story starts with the write call we make in the mapper.

    Layer 1:

    // get an output object
        if (job.getNumReduceTasks() == 0) {
          output = 
            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
        } else {
          output = new NewOutputCollector(taskContext, job, umbilical, reporter);
        }
    

    We skip the branch where job.getNumReduceTasks() == 0 holds and look straight at the else branch.

    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    

    Follow it:

    org.apache.hadoop.mapred.MapTask$NewOutputCollector class
    
    @SuppressWarnings("unchecked")
        NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                           JobConf job,
                           TaskUmbilicalProtocol umbilical,
                           TaskReporter reporter
                           ) throws IOException, ClassNotFoundException {
          collector = createSortingCollector(job, reporter);  // create the collector (the sorting buffer)
          partitions = jobContext.getNumReduceTasks();   // the number of partitions equals the number of reduce tasks
          if (partitions > 1) {
            partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
              ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
          } else {
            partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
              @Override
              public int getPartition(K key, V value, int numPartitions) {
                return partitions - 1;
              }
            };
          }
        }
        @Override
        public void write(K key, V value) throws IOException, InterruptedException { // <k,v> => <k,v,p>: attach the partition to each key-value pair and write it into the circular buffer
          collector.collect(key, value,
                            partitioner.getPartition(key, value, partitions));
        }
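
    When there is more than one reducer, the partitioner used in write() is whatever the driver registered (HashPartitioner being the default). A sketch of a custom partitioner and how it might be registered (the class name is illustrative):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Routes keys that start with the same character to the same reducer.
    public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
          return 0;
        }
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
      }
    }

    // In the driver:
    //   job.setPartitionerClass(FirstLetterPartitioner.class);
    //   job.setNumReduceTasks(4);   // partitions > 1, so the custom class is reflected in above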
    

    In this constructor, creating the collector is the most involved part, because it is tied to one of the heavyweights of MapReduce: the circular (ring) buffer.

    We follow it through createSortingCollector.

    org.apache.hadoop.mapred.MapTask class
    
    @SuppressWarnings("unchecked")
      private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
              createSortingCollector(JobConf job, TaskReporter reporter)
        throws IOException, ClassNotFoundException {
        MapOutputCollector.Context context =
          new MapOutputCollector.Context(this, job, reporter);
    
        Class<?>[] collectorClasses = job.getClasses(
      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);  // MapOutputBuffer, the ring buffer, is the default collector
        int remainingCollectors = collectorClasses.length;
        Exception lastException = null;
        for (Class clazz : collectorClasses) {
          try {
            if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
              throw new IOException("Invalid output collector class: " + clazz.getName() +
                " (does not implement MapOutputCollector)");
            }
            Class<? extends MapOutputCollector> subclazz =
              clazz.asSubclass(MapOutputCollector.class);
            LOG.debug("Trying map output collector class: " + subclazz.getName());
            MapOutputCollector<KEY, VALUE> collector =
              ReflectionUtils.newInstance(subclazz, job);
            collector.init(context);  // initialize the collector (the ring buffer)
            LOG.info("Map output collector class = " + collector.getClass().getName());
            return collector;  // return the ring-buffer collector
          } catch (Exception e) {
            String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
            if (--remainingCollectors > 0) {
              msg += " (" + remainingCollectors + " more collector(s) to try)";
            }
            lastException = e;
            LOG.warn(msg, e);
          }
        }
        throw new IOException("Initialization of all the collectors failed. " +
          "Error in last collector was :" + lastException.getMessage(), lastException);
      }
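
    The loop above also makes the collector pluggable: each class listed under the map-output-collector property is tried in order before giving up. A hedged sketch of overriding it (the property name is quoted from memory and the collector class is purely hypothetical, so treat both as assumptions to verify against MRJobConfig):

    import org.apache.hadoop.conf.Configuration;

    public class CollectorConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Comma-separated list of MapOutputCollector implementations to try, in order;
        // Hadoop's own fallback is MapTask$MapOutputBuffer, the ring buffer.
        // Property name as recalled; verify it in your Hadoop version (assumption).
        conf.set("mapreduce.job.map.output.collector.class", "com.example.MyCollector");
      }
    }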
    

    What we care about is the init method that initializes the ring buffer.

    org.apache.hadoop.mapred.MapTask$MapOutputBuffer class
    
     @SuppressWarnings("unchecked")
        public void init(MapOutputCollector.Context context
                        ) throws IOException, ClassNotFoundException {
          job = context.getJobConf();
          reporter = context.getReporter();
          mapTask = context.getMapTask();
          mapOutputFile = mapTask.getMapOutputFile();
          sortPhase = mapTask.getSortPhase();
          spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
          partitions = job.getNumReduceTasks();
          rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
    
          //sanity checks
          // checks on the spill settings
          final float spillper =
            job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
            // MAP_SORT_SPILL_PERCENT controls when spilling starts; by default a spill is triggered at 80% buffer usage
          final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
          // size of the in-memory sort buffer in MB (default 100)
          indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                             INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
          if (spillper > (float)1.0 || spillper <= (float)0.0) {
            throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
                "\": " + spillper);
          }
          if ((sortmb & 0x7FF) != sortmb) {
            throw new IOException(
                "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
          }
          sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
                QuickSort.class, IndexedSorter.class), job);
          // the sorter; it can be replaced via map.sort.class, but the default is QuickSort
    
          // buffers and accounting
          int maxMemUsage = sortmb << 20;
          maxMemUsage -= maxMemUsage % METASIZE;
          kvbuffer = new byte[maxMemUsage];
          bufvoid = kvbuffer.length;
          kvmeta = ByteBuffer.wrap(kvbuffer)
             .order(ByteOrder.nativeOrder())
             .asIntBuffer();
          setEquator(0);
          bufstart = bufend = bufindex = equator;
          kvstart = kvend = kvindex;
    
          maxRec = kvmeta.capacity() / NMETA;
          softLimit = (int)(kvbuffer.length * spillper);
          bufferRemaining = softLimit;
          LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
          LOG.info("soft limit at " + softLimit);
          LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
          LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
    
          // k/v serialization
          // set up key/value serialization
          comparator = job.getOutputKeyComparator();
          // the (sort) comparator for map output keys
          keyClass = (Class<K>)job.getMapOutputKeyClass();
          valClass = (Class<V>)job.getMapOutputValueClass();
          serializationFactory = new SerializationFactory(job);
          keySerializer = serializationFactory.getSerializer(keyClass);
          keySerializer.open(bb);
          valSerializer = serializationFactory.getSerializer(valClass);
          valSerializer.open(bb);
    
          // output counters
          mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
          mapOutputRecordCounter =
            reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
          fileOutputByteCounter = reporter
              .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
    
          // compression
          if (job.getCompressMapOutput()) {
            Class<? extends CompressionCodec> codecClass =
              job.getMapOutputCompressorClass(DefaultCodec.class);
            codec = ReflectionUtils.newInstance(codecClass, job);
          } else {
            codec = null;
          }
    
          // combiner
          // local reduction on the map side
          final Counters.Counter combineInputCounter =
            reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
          combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                                 combineInputCounter,
                                                 reporter, null);
          if (combinerRunner != null) {
            final Counters.Counter combineOutputCounter =
              reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
            combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
          } else {
            combineCollector = null;
          }
          spillInProgress = false;
          minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
          spillThread.setDaemon(true);
          spillThread.setName("SpillThread");
          spillLock.lock();
          try {
            spillThread.start();
            while (!spillThreadRunning) {
              spillDone.await();
            }
          } catch (InterruptedException e) {
            throw new IOException("Spill thread failed to initialize", e);
          } finally {
            spillLock.unlock();
          }
          if (sortSpillException != null) {
            throw new IOException("Spill thread failed to initialize",
                sortSpillException);
          }
        }
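
    The tunables read at the top of init() correspond to job configuration keys; in Hadoop 2.x these are, as far as I recall, mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent. A driver-side sketch of adjusting them (the values are only examples):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SortBufferTuningExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.task.io.sort.mb", 200);           // JobContext.IO_SORT_MB: buffer size in MB (default 100)
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f); // JobContext.MAP_SORT_SPILL_PERCENT: spill threshold (default 0.8)
        Job job = Job.getInstance(conf, "sort buffer tuning example");
      }
    }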
    
    

    Next up is the sort comparator, which lets the job choose between a user-defined comparator and the default one.

    comparator = job.getOutputKeyComparator();
    

    Follow the getOutputKeyComparator method:

    org.apache.hadoop.mapred.JobConf class
    
    public RawComparator getOutputKeyComparator() {
        Class<? extends RawComparator> theClass = getClass(
          JobContext.KEY_COMPARATOR, null, RawComparator.class);
        if (theClass != null)
          return ReflectionUtils.newInstance(theClass, this);  // the user supplied a custom comparator
        return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);  // otherwise fall back to the default WritableComparator for the map output key class
      }
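
    JobContext.KEY_COMPARATOR is what the driver's setSortComparatorClass fills in. A sketch of a key comparator that reverses the default ordering (the class name is illustrative):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Sorts Text map-output keys in reverse lexicographic order during sort/merge.
    public class DescendingTextComparator extends WritableComparator {
      public DescendingTextComparator() {
        super(Text.class, true);   // true: create key instances so the object compare() below is usable
      }

      @Override
      @SuppressWarnings("rawtypes")
      public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);
      }
    }

    // In the driver:
    //   job.setSortComparatorClass(DescendingTextComparator.class);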
    

    On top of that, the combiner performs local reduction on the map side, minimizing how much data has to travel over the network.
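
    The CombinerRunner.create call in init() only has something to run if the driver registered a combiner; in the word-count sketch used earlier, that could simply be a reducer-style class reused on the map side (illustrative names again):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Sums the 1s emitted by the mapper locally, before the shuffle.
    public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
          sum += v.get();
        }
        context.write(key, new IntWritable(sum));
      }
    }

    // In the driver:
    //   job.setCombinerClass(WordCountCombiner.class);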
