美文网首页
MapReduce架构师3—MapReduce详解2

MapReduce架构师3—MapReduce详解2

作者: fat32jin | 来源:发表于2020-08-23 11:43 被阅读0次

    3. 7. MapReduce的MapOutputBuffer内存环形缓冲区源码详解 0:13:00~2:44:00

    入口: MapTask#runNewMapper()

    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    NewOutputCollector构造函数里面
    collector = createSortingCollector(job, reporter);

                    /**
                     * TODO 通过反射构建实例  一个 MapTask 对应一个 MapOutputCollector
                     * 当前这个实例:完成 当前 MapTask 的 context.write(key, value) 往后的所有事情
                     */
                    MapOutputCollector<KEY, VALUE> collector = ReflectionUtils.newInstance(subclazz, job);
                    
                    /**
                     * TODO  环形缓冲区的初始化
                            1、把  环形缓冲区的初始化好
                            2、启动一个专门做spill的线程
                     */
                    collector.init(context);
                       ↓  ********开始溢写准备,填充环形缓冲区 0:40:00 
                     MapOoutputBuffer#init
                              ▼  
                 /**
                 * TODO 从 100M 中分出来一部分用来存储真实数据
                 */
                 // buffers and accounting
                int maxMemUsage = sortmb << 20;
                maxMemUsage -= maxMemUsage % METASIZE;
                kvbuffer = new byte[maxMemUsage];       //100M
    
    image.png
               bufstart = bufend = bufindex = equator;
               kvstart = kvend = kvindex;
    
            /**
                 * TODO 当前 MpaOutputBuffer 这个环形缓冲区的管理类,事实上管理了两个重要的东西:
                 *  1、100M 大小的字节数组
                 *  2、SpillThread 负责 kvBuffer 中装满了的 80% 空间的数据的溢写
                 */
                spillLock.lock();
                try {
                    spillThread.start();
                      ↓                 
    spillThread 类 run方法  (此方法并不会马上执行,要等init 过程结束后,在后续的   mapper.run 阶段才会 执行)
            /*
             * 4个 重要部件
             * 1、spillthread()  sortAndSpill()  startSpill()
             * 2、可重入锁  spillLocak
             * 3、信号量   spillDone   spillReady
             *
             * 2个重要流程
             *
             * 1、写数据线程
             * 2、spill线程
             *
             * 重要机制
             * 1、 kvbuffer写满80% 时被锁定
             * 
             */
        try {
                        while (true) {
                            
                            /**
                             * TODO 等信号
                             */
                            spillDone.signal();
                            while (!spillInProgress) {
                                spillReady.await();
                            }
                            try {
                                spillLock.unlock();
                                
                                /**
                                 * TODO  这个方法就是溢写操作的实现 ,但是
                                 */
                                sortAndSpill(); 
                   
    
    环形缓冲区原理图
    image.png image.png
    ************ spill线程溢写环形缓冲区数据到磁盘逻辑*********** 1:20:00

    接上 3. 7 入口: MapTask#runNewMapper()
    初始化环形缓冲区
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    完成后

     mapper.run(mapperContext); 方法
              ▼
     // 调用业务逻辑处理一个key-value
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
    ——》context.write()
                   ↓  
    WrappedMapper.Context.write()
               ▼ 
      mapContext.write(key, value);
              ↓  
    TaskInputOutputContextImpl.write(key, value);
         ▼ 
     output.write(key, value);
          ↓  
    NewOutputCollector.write(key, value);
         ▼ 
    /**
               * TODO
               */
              int partition_no = partitioner.getPartition(key, value, partitions);
      
              /**
               * collector.collect 写进入到了 MapOoutputBuffer
               */
              collector.collect(key, value, partition_no);
                 ↓  
    MapOoutputBuffer.collect()
                 ▼    
          startSpill();
              ▼   
         spillReady.signal();
              ↓  激活溢写信号
        spillThread.run()
              ▼ 真正完成数据从mapper写出 环形缓冲区到磁盘
         sortAndSpill()
             ▼
    
    image.png

    //溢出的数据要做排序 quicksort
    sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);

    combiner 阶段合并逻辑 1:36:00 ~2:10:00

    1 接上 sortAndSpill方法 里面
    if (combinerRunner == null)

    这个 combinerRunner 在 3. 7 入口
    环形缓冲区初始化 前
    MapOoutputBuffer#init 方法

    CombinerRunner#create()方法
    ▼ 获取用户设置的 CombinerClass
    Class<? extends org.apache.hadoop.mapreduce.Reducer<K, V, K, V>> newcls = (Class<? extends org.apache.hadoop.mapreduce.Reducer<K, V, K,
    V>>) taskContext
    .getCombinerClass();
    if (newcls != null) {
    return new NewCombinerRunner<K, V>(newcls, job, taskId, taskContext, inputCounter, reporter, committer);
    }

    combineCollector = new CombineOutputCollector<K, V>(combineOutputCounter, reporter, job);

    2 接上 sortAndSpill方法 里面
    combinerRunner.combine

    return new NewCombinerRunner<K, V>(newcls, job, taskId, taskContext, inputCounter, reporter, committer);

    3.2 spill 和 merge 阶段 2:10:00~2:13:00

    接上 3. 7 入口: MapTask#runNewMapper()
    output.close(mapperContext);

    NewOutputCollector.close

    collector.flush();

    MapOoutputBuffer#flush

    sortAndSpill();//溢写
    mergeParts();//合并

    3.3 reduceTask阶段
    3.3 .1 reduceTask sort合并阶段 2:13:00~

    入口方法: ReduceTask#run()

          /*
             * TODO 这个地方初始化 最重要是初始化了一个output 
             */
            initialize(job, getJobID(), reporter, useNewApi);
    
         /*
            TODO reduce阶段负责拉取数据 和执行合并,都由shuffleConsumerPlugin组件来做
             */
            ShuffleConsumerPlugin shuffleConsumerPlugin = null;
    
          /*
            *  3.3.1.1     插件初始化,创建2个重要对象
             * 1、ShuffleSchedulerImpl  2、createMergeManager
             */
            shuffleConsumerPlugin.init(shuffleContext);
             ↓
    ———s———
    Shuffle#init
             ▼ 
     /*
        拉取数据对象
         */
        scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
            this, copyPhase, context.getShuffledMapsCounter(),
            context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
        /*
          负责合并对象
         */
        merger = createMergeManager(context);
    ———e———
    
    /* 3.3.1.2  真正执行拉取合并操作
      *
      */
      rIter = shuffleConsumerPlugin.run();
                   ↓      
             Shuffle#run
                   ▼ 
    ———s———
    
        Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
    
          fetchers[i].start();
                      ↓     
        Fetcher.run()
                     ▼   
          // Shuffle
              copyFromHost(host);
    

    ———e———

    接上      Shuffle#run
     kvIter = merger.close();
    ——》MergeManagerImpl.close();
                    ▼  
         return finalMerge(jobConf, rfs, memory, disk);
                     ▼  合并结果写到磁盘文件
     try {
              Merger.writeFile(rIter, writer, reporter, job);
    
    ##### 3.3.2   reduceTask  reduce阶段     2:36:00~  3:06:00 
     
    接上     3.3.1.2  真正执行拉取合并操作   后
         runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
                         ▼ 
              /*
                *   自己写的 MR程序里面的 reduce类
                 */
                reducer.run(reducerContext);
                             ▼ 
    ———s———
    
    image.png

    while (context.nextKey()) {
    reduce(context.getCurrentKey(), context.getValues(), context);

    ReduceContextImpl.nextKey() 方法

    image.png

    ———e———

    接上 reducer.run 方法

    TaskInputOutputContextImpl.write

    TextOutputFormat.LineRecordWriter. write

     boolean nullKey = key == null || key instanceof NullWritable;
          boolean nullValue = value == null || value instanceof NullWritable;
          if (nullKey && nullValue) {
            return;  
          }
          if (!nullKey) {
            writeObject(key);
          }
          if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
          }
          if (!nullValue) {
            writeObject(value);
          }
          out.write(newline);
    
    

    相关文章

      网友评论

          本文标题:MapReduce架构师3—MapReduce详解2

          本文链接:https://www.haomeiwen.com/subject/wmyvjktx.html