Hadoop Shuffle Explained in Detail

Author: 幸运猪x | Published 2019-06-28 22:53

1. The Shuffle Flow

(Figure: map.png, the map-side shuffle)
(Figure: reduce.png, the reduce-side shuffle)

2. The Map Side

2.1 The partition phase

Looking at the source of the MapTask.runNewMapper method, we can see that the output object is created here:

// get an output object
if (job.getNumReduceTasks() == 0) {
    output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
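
In other words, when a job has zero reduce tasks the map output is written straight out through NewDirectOutputCollector and there is no shuffle at all. A rough driver sketch (MyDriver and MyMapper are placeholder names, not from the Hadoop source):

// Map-only job: with zero reduce tasks there is no partition, sort, or spill.
Job job = Job.getInstance(new Configuration(), "map-only-example");
job.setJarByClass(MyDriver.class);    // MyDriver is a placeholder driver class
job.setMapperClass(MyMapper.class);   // MyMapper is a placeholder mapper class
job.setNumReduceTasks(0);             // forces the NewDirectOutputCollector branch above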

Step into new NewOutputCollector(taskContext, job, umbilical, reporter). This class extends RecordWriter<K, V> and overrides the write(K, V) method:

private class NewOutputCollector<K,V>
        extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
    private final MapOutputCollector<K,V> collector;
    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
    private final int partitions;

    @SuppressWarnings("unchecked")
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
        // the sorting collector is MapOutputBuffer by default
        collector = createSortingCollector(job, reporter);
        partitions = jobContext.getNumReduceTasks();
        if (partitions > 1) {
            // load the configured partitioner (HashPartitioner by default) via reflection
            partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
        } else {
            // at most one reducer: every record goes to the same partition
            partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
                @Override
                public int getPartition(K key, V value, int numPartitions) {
                    return partitions - 1;
                }
            };
        }
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
        collector.collect(key, value, partitioner.getPartition(key, value, partitions));
    }

    @Override
    public void close(TaskAttemptContext context
    ) throws IOException,InterruptedException {
        try {
            collector.flush();
        } catch (ClassNotFoundException cnf) {
            throw new IOException("can't find class ", cnf);
        }
        collector.close();
    }
}

So partitioning happens in the write() method, through the call partitioner.getPartition(key, value, partitions).

Whenever I searched for how shuffle partitioning is implemented, the answer was always that it defaults to hash partitioning.

Here we can see that the NewOutputCollector constructor assigns partitioner via reflection. So let's step into JobContextImpl, the implementation of the JobContext interface:

@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass() throws ClassNotFoundException {
    return (Class<? extends Partitioner<?,?>>) conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}

The default partitioner class is indeed HashPartitioner. Stepping into HashPartitioner, we can see the concrete partitioning logic:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    /** Use {@link Object#hashCode()} to partition. */
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
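
Since getPartitionerClass() only falls back to HashPartitioner when nothing is configured under PARTITIONER_CLASS_ATTR, a job can plug in its own partitioner. A minimal sketch, with RegionPartitioner as an invented example class:

// Custom partitioner: route keys by prefix instead of relying purely on hashCode().
public class RegionPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // send keys starting with "cn-" to partition 0, hash everything else
        if (key.toString().startsWith("cn-")) {
            return 0;
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

// Driver side: the class is stored under PARTITIONER_CLASS_ATTR and later
// instantiated by reflection in NewOutputCollector, as shown above.
job.setPartitionerClass(RegionPartitioner.class);
job.setNumReduceTasks(4);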

**MapTask has a very important inner class, MapOutputBuffer, which implements most of the map-side shuffle machinery: the in-memory output buffer, sorting, spilling, and merging.**

2.2 The sort and spill phases

From earlier reading I knew that the sort uses quicksort by default.

In MapOutputBuffer.init() we can see that an IndexedSorter object is created via reflection, with QuickSort as the default:

sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
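
Because the sorter class is looked up from configuration, quicksort is only the default. A hedged example of swapping it (HeapSort is the other IndexedSorter implementation shipped with Hadoop, as far as I know):

// Swap the in-memory sorter; both QuickSort and HeapSort implement IndexedSorter.
job.getConfiguration().setClass("map.sort.class",
        org.apache.hadoop.util.HeapSort.class,
        org.apache.hadoop.util.IndexedSorter.class);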

So where does the sorting actually happen? Inside MapOutputBuffer, in sortAndSpill():

private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException {
    //approximate the length of the output file to be the length of the
    //buffer + header lengths for the partitions
    final long size = distanceTo(bufstart, bufend, bufvoid) + partitions * APPROX_HEADER_LENGTH;
    // output stream for the spill file
    FSDataOutputStream out = null;
    try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);

        final int mstart = kvend / NMETA;
        // kvend is a valid record
        final int mend = 1 +  (kvstart >= kvend ? kvstart : kvmeta.capacity() + kvstart) / NMETA;
        // this is where the buffered records are sorted, right before they are spilled
        sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
        int spindex = mstart;
        final IndexRecord rec = new IndexRecord();
        final InMemValBytes value = new InMemValBytes();
        // iterate over every partition
        for (int i = 0; i < partitions; ++i) {
            IFile.Writer<K, V> writer = null;
            try {
                long segmentStart = out.getPos();
                FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
                writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec, spilledRecordsCounter);
                // if no combiner is configured, spill the records directly
                if (combinerRunner == null) {
                    // spill directly
                    DataInputBuffer key = new DataInputBuffer();
                    while (spindex < mend && kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                        final int kvoff = offsetFor(spindex % maxRec);
                        int keystart = kvmeta.get(kvoff + KEYSTART);
                        int valstart = kvmeta.get(kvoff + VALSTART);
                        key.reset(kvbuffer, keystart, valstart - keystart);
                        getVBytesForOffset(kvoff, value);
                        writer.append(key, value);
                        ++spindex;
                    }
                } else {
                    // a combiner is configured: use it to merge the records before writing
                    int spstart = spindex;
                    while (spindex < mend && kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
                        ++spindex;
                    }
                    // Note: we would like to avoid the combiner if we've fewer
                    // than some threshold of records for a partition
                    if (spstart != spindex) {
                        combineCollector.setWriter(writer);
                        RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
                        combinerRunner.combine(kvIter, combineCollector);
                    }
                }

                // close the writer
                writer.close();

                // record offsets
                rec.startOffset = segmentStart;
                rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
                rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
                spillRec.putIndex(rec, i);

                writer = null;
            } finally {
                if (null != writer) writer.close();
            }
        }

        if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
            // create spill index file
            Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
            spillRec.writeToFile(indexFilename, job);
        } else {
            indexCacheList.add(spillRec);
            totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
    } finally {
        if (out != null) out.close();
    }
}
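
As the combinerRunner branch above shows, the combiner, if one is configured, runs once per partition during every spill. Wiring one up is the usual driver call; a sketch assuming the reduce logic is commutative and associative (so the stock IntSumReducer can double as a combiner):

// Reuse the reducer as a combiner so records are pre-merged during each spill.
// Only safe when the reduce logic is commutative and associative (e.g. summing counts).
job.setCombinerClass(IntSumReducer.class);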

2.3 The merge phase

MapOutputBuffer has a mergeParts() method, and as the name suggests, this is where the spill files are merged into a single sorted, partitioned output file.
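
How often sortAndSpill() runs, and how much work mergeParts() has to do afterwards, is controlled by a few standard buffer and merge settings. A hedged tuning sketch, with purely illustrative values:

Configuration conf = job.getConfiguration();
conf.setInt("mapreduce.task.io.sort.mb", 200);             // size of the in-memory sort buffer, in MB
conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);  // buffer usage that triggers sortAndSpill()
conf.setInt("mapreduce.task.io.sort.factor", 10);          // spill segments merged per pass in mergeParts()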

3. The Reduce Side

As on the map side, we analyze the ReduceTask source, looking at its run() method. Only the most important lines are listed here:

// the shuffle plugin to be used
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class<? extends ShuffleConsumerPlugin> clazz = job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);                  
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
// initialize the shuffle plugin
ShuffleConsumerPlugin.Context shuffleContext = 
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                  super.lDirAlloc, reporter, codec, 
                  combinerClass, combineCollector, 
                  spilledRecordsCounter, reduceCombineInputCounter,
                  shuffledMapsCounter,
                  reduceShuffleBytes, failedShuffleCounter,
                  mergedMapOutputsCounter,
                  taskStatus, copyPhase, sortPhase, this,
                  mapOutputFile, localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);
// run the shuffle plugin: fetch the map outputs over the network and merge them
rIter = shuffleConsumerPlugin.run();
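
Note that the plugin class itself comes from configuration, so Shuffle is only the default implementation. A sketch of overriding it (MyShufflePlugin is a placeholder class that would have to implement ShuffleConsumerPlugin):

// MRConfig.SHUFFLE_CONSUMER_PLUGIN resolves to the property below.
// MyShufflePlugin is a placeholder; it must implement ShuffleConsumerPlugin.
job.getConfiguration().setClass(
        "mapreduce.job.reduce.shuffle.consumer.plugin.class",
        MyShufflePlugin.class,
        ShuffleConsumerPlugin.class);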

Next, let's look at the core of ShuffleConsumerPlugin. This interface has only one implementation, Shuffle; in its run() method:

3.1 Fetching the map outputs

// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 : jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
// create LocalFetcher or Fetcher objects depending on whether the map outputs are local, then start fetching
if (isLocal) {
    fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
            merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
            localMapFiles);
    fetchers[0].start();
} else {
    for (int i=0; i < numFetchers; ++i) {
        fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
                reporter, metrics, this,
                reduceTask.getShuffleSecret());
        fetchers[i].start();
    }
}
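
The number of parallel fetchers comes from MRJobConfig.SHUFFLE_PARALLEL_COPIES, defaulting to 5 (and to a single LocalFetcher in the local case). A one-line tuning sketch, value for illustration only:

// Example only: raise the number of parallel copy threads for large jobs.
job.getConfiguration().setInt("mapreduce.reduce.shuffle.parallelcopies", 10);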

3.2 The reduce-side merge

When Shuffle is initialized, it creates a MergeManager object that manages the merge:

@Override
public void init(ShuffleConsumerPlugin.Context context) {
    this.context = context;
    // ......
    scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
            this, copyPhase, context.getShuffledMapsCounter(),
            context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
    // create the merge manager
    merger = createMergeManager(context);
}
protected MergeManager<K, V> createMergeManager(ShuffleConsumerPlugin.Context context) {
    return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(),
        context.getLocalDirAllocator(), reporter, context.getCodec(),
        context.getCombinerClass(), context.getCombineCollector(), 
        context.getSpilledRecordsCounter(),
        context.getReduceCombineInputCounter(),
        context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
        context.getMapOutputFile());
}

As the code above shows, what is actually created is a MergeManagerImpl. Stepping inside it, we can see:

// background thread that merges in-memory map outputs and spills them to disk
this.inMemoryMerger = createInMemoryMerger();
this.inMemoryMerger.start();
// background thread that merges the resulting on-disk segments
this.onDiskMerger = new OnDiskMerger(this);
this.onDiskMerger.start();
this.mergePhase = mergePhase;
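
So MergeManagerImpl starts two background mergers: inMemoryMerger spills fetched map outputs from memory to disk once the in-memory budget fills up, and onDiskMerger merges the resulting on-disk segments. The thresholds they work against are configurable; a hedged sketch with example values:

Configuration conf = job.getConfiguration();
// fraction of the reducer's heap used to hold fetched map outputs in memory
conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
// usage level of that buffer at which the in-memory merge kicks in
conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
// number of in-memory segments that also triggers a merge
conf.setInt("mapreduce.reduce.merge.inmem.threshold", 1000);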
