Master index: https://www.jianshu.com/p/e406a9bc93a9
Hadoop sub-index: https://www.jianshu.com/p/9428e443b7fd
The Map-Task source code splits into two parts: one is the input to map (including the instantiation of the mapper), and the other is the output that this input gives rise to.
Let's look at the first part, the input to map:
Instantiating the map

We start inside the org.apache.hadoop.mapreduce.Mapper class and go straight to its run method:
org.apache.hadoop.mapreduce.Mapper class
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
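The run method above drives everything: setup is called once, then map is called for every key-value pair the context supplies, then cleanup. For reference, a minimal user Mapper that this loop would drive could look like the following (a hypothetical word-count style example, not code from the Hadoop source):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical user Mapper: run() above feeds it one (offset, line) pair per iteration.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // key: byte offset of the line; value: the line itself (TextInputFormat)
    for (String token : value.toString().split("\\s+")) {
      if (!token.isEmpty()) {
        word.set(token);
        context.write(word, ONE); // handed to the output collector discussed in the second half
      }
    }
  }
}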
Next, open the org.apache.hadoop.mapred.MapTask class and find the method with the same name, run:
First layer
org.apache.hadoop.mapred.MapTask class
This method covers everything that happens on the input side.
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
There are two important passages in this method. The first is:
if (isMapTask()) { // is this a map task?
if (conf.getNumReduceTasks() == 0) { // are there any reduce tasks?
mapPhase = getProgress().addPhase("map", 1.0f); // no reducers: the map phase accounts for 100% of the attempt's progress
} else {
// with reducers: the map phase takes 67% of the progress and the sort phase 33%
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
This passage divides the attempt's work into its progress phases.
The second passage is:
if (useNewApi) {
// the familiar choice between running the new-API mapper and the old-API mapper
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
We follow the new-API path, runNewMapper.
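Whether useNewApi is true depends on how the job was submitted. A driver written against the new org.apache.hadoop.mapreduce.Job API ends up on the runNewMapper branch; a minimal sketch (class and path names are only illustrative, and WordCountMapper is the hypothetical mapper shown earlier):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCountMapper.class);       // what taskContext.getMapperClass() will return
    job.setInputFormatClass(TextInputFormat.class);  // what taskContext.getInputFormatClass() will return
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(1);                        // > 0, so both the map and sort phases exist
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // submitting through the new Job API is what makes job.getUseNewMapper() return true in MapTask.run
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}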
Second layer
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
input.initialize(split, mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
This stage again splits into two parts: the first prepares the details, the second does the actual work.
Part one:
Part one consists of five steps.
Step one:
org.apache.hadoop.mapred.MapTask class
// make a task context so we can get the classes
// i.e. build the task attempt context through which the configured classes are looked up
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, // our job configuration
getTaskID(),
reporter); // create the context
Step two:
org.apache.hadoop.mapred.MapTask class
// make a mapper
// i.e. create the mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // reflectively instantiate our custom Mapper class
Step three:
org.apache.hadoop.mapred.MapTask class
// make the input format
// i.e. create the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // reflectively instantiate the configured InputFormat class; analyzed further below
Step four:
org.apache.hadoop.mapred.MapTask class
// rebuild the input split
// i.e. reconstruct the input split for this task
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset()); // each split entry corresponds to one MapTask and carries four things: the owning file, the offset, the length and the location hints
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE> // analyzed further below
(split, inputFormat, reporter, taskContext); // the input format and split prepared above feed the record reader, which obtains the stream and reads the split as text, line by line
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
At the spot marked "analyzed further below" we can see that the map task news up an object, and it is the NewTrackingRecordReader class that actually does the work.
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader class
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
TaskReporter reporter,
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
throws InterruptedException, IOException {
this.reporter = reporter;
this.inputRecordCounter = reporter
.getCounter(TaskCounter.MAP_INPUT_RECORDS);
this.fileInputByteCounter = reporter
.getCounter(FileInputFormatCounter.BYTES_READ);
List <Statistics> matchedStats = null;
if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
.getPath(), taskContext.getConfiguration());
}
fsStats = matchedStats;
long bytesInPrev = getInputBytes(fsStats);
this.real = inputFormat.createRecordReader(split, taskContext); // calls createRecordReader on the InputFormat (here TextInputFormat), which wraps the FileSplit in a LineRecordReader
long bytesInCurr = getInputBytes(fsStats);
fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
Now let's look further down at how that wrapping is done.
org.apache.hadoop.mapreduce.lib.input.TextInputFormat class
@Override
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
String delimiter = context.getConfiguration().get(
"textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReader(recordDelimiterBytes); // a line reader is created here
}
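As the method shows, the record delimiter comes from the textinputformat.record.delimiter property; when it is unset, LineRecordReader falls back to newlines. A job could override it like this (a small sketch; the class name and delimiter value are only illustrations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CustomDelimiterDriver {
  public static Job newJob() throws Exception {
    Configuration conf = new Configuration();
    // make LineRecordReader split records on "###" instead of '\n'
    conf.set("textinputformat.record.delimiter", "###");
    return Job.getInstance(conf, "custom delimiter job");
  }
}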
Next, let's take a look at the details of this line reader.
org.apache.hadoop.mapreduce.lib.input.LineRecordReader class
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
We will devote a separate post to this line reader class later.
Step five:
org.apache.hadoop.mapred.MapTask class
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), // ties in with step four
input, output, // the mapContext wraps both input and output, so values are fetched through the context; the getCurrentKey call in Mapper's Context ultimately reads from the LineRecordReader built as the input above
committer,
reporter, split);
Again an object is newed up here, so let's keep following it.
org.apache.hadoop.mapreduce.task.MapContextImpl class
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
}
This nextKeyValue simply delegates to the RecordReader held by the context; it is how the context hands key-value pairs to the Mapper.
To sum up the five steps: the task instantiates the map class we wrote, instantiates the InputFormat we configured, rebuilds the split of the input file assigned to this task, wraps it in a record reader, and builds the context so that each split is consumed by its own map without getting mixed up. All of it is preparation for the part that follows:
org.apache.hadoop.mapred.MapTask class
try {
input.initialize(split, mapperContext); // initialize the input; input is the RecordReader, with the split and mapperContext as its arguments
mapper.run(mapperContext); // this invokes Mapper's run method
mapPhase.complete(); // mark the map phase as complete
setPhase(TaskStatus.Phase.SORT); // move the task into the sort phase
statusUpdate(umbilical); // report the updated status through the umbilical protocol
input.close();
input = null; // close and release the input
output.close(mapperContext);
output = null; // close and release the output
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext); // make sure input and output are closed even on failure
}
The core of this block lies in its first two statements.
The first statement:
input.initialize(split, mapperContext);
org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader class
@Override
public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
org.apache.hadoop.mapreduce.TaskAttemptContext context
) throws IOException, InterruptedException {
long bytesInPrev = getInputBytes(fsStats);
real.initialize(split, context); // the real reader's initialize is invoked through the real field; keep following initialize
long bytesInCurr = getInputBytes(fsStats);
fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
org.apache.hadoop.mapreduce.lib.input.LineRecordReader class
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
We are now back in the line reader, so let's go over its most important statements.
The starting offset:
start = split.getStart();
The end of the split:
end = start + split.getLength();
The file:
final Path file = split.getPath();
Together these determine the key quantity inside the line reader: the position in the file. For example, with 128 MB splits, the second split of a file starts at byte 134217728 and ends at that offset plus its length. If the split does not start at offset 0, the reader throws away the partial first line and advances start past it, because the previous split's reader reads one extra line beyond its own end:
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
The second statement:
mapper.run(mapperContext);
The first statement prepared the split and its offsets; the second is where execution really begins.
org.apache.hadoop.mapreduce.Mapper class
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) { // follow nextKeyValue: it asks the context whether another key-value pair is available
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
Following it, we find we are back where we started, which means we can now walk forward through the call chain.
org.apache.hadoop.mapreduce.lib.input.LineRecordReader class
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos); // the key is set to the current byte offset
if (value == null) {
value = new Text(); // the value will hold the content of the line
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) { // at the end of the split newSize is 0 and the loop ends; otherwise another record was read
key = null;
value = null;
return false;
} else {
return true;
}
}
So we are back in the line reader class once more, but this time not to set up the split and compute its offsets; this time we are pulling records and their offsets out of the split. For example, if a split holds the two lines "hello world" and "foo bar", the mapper is called first with (0, "hello world") and then with (12, "foo bar"), the key being the byte offset at which each line starts.
With that, the first major part of the Map-Task source is explained: it picks up our input file and our overridden map function, and the line reader turns each split of the file into offset/line records. Next, those records are fed into our overridden map, processing starts, and output is produced.
Producing the output

The output story starts with our call to the write method.
First layer:
// get an output object
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
We skip the branch where job.getNumReduceTasks() == 0 holds (there the NewDirectOutputCollector writes the map output straight through the job's OutputFormat) and look directly at the else branch.
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
Following this constructor:
org.apache.hadoop.mapred.MapTask$NewOutputCollector class
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = createSortingCollector(job, reporter); // build the collector (the buffer the map output is collected into)
partitions = jobContext.getNumReduceTasks(); // the number of partitions equals the number of reduce tasks
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}
@Override
public void write(K key, V value) throws IOException, InterruptedException { // <k,v> => <k,v,p>: the partition number is attached to each key-value pair as it is written into the circular buffer
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
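getPartition decides which reduce task each record is routed to. When more than one reducer is configured, the partitioner class comes from the job; by default that is HashPartitioner, whose logic amounts to what the sketch below does (shown as a custom partitioner you could register yourself, purely for illustration):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Does what the default HashPartitioner does: spread keys over reducers by hash code.
public class HashLikePartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // mask the sign bit so the partition number is never negative
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

In the driver it would be registered with job.setPartitionerClass(HashLikePartitioner.class), which is exactly what jobContext.getPartitionerClass() picks up above.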
In this constructor, building the collector is the most involved part, because it is tied to one of the heavyweights of MapReduce: the circular (spill) buffer.
We follow createSortingCollector to see how.
org.apache.hadoop.mapred.MapTask class
@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter)
throws IOException, ClassNotFoundException {
MapOutputCollector.Context context =
new MapOutputCollector.Context(this, job, reporter);
Class<?>[] collectorClasses = job.getClasses(
JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class); // MapOutputBuffer, the circular buffer, is the default collector
int remainingCollectors = collectorClasses.length;
Exception lastException = null;
for (Class clazz : collectorClasses) {
try {
if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
throw new IOException("Invalid output collector class: " + clazz.getName() +
" (does not implement MapOutputCollector)");
}
Class<? extends MapOutputCollector> subclazz =
clazz.asSubclass(MapOutputCollector.class);
LOG.debug("Trying map output collector class: " + subclazz.getName());
MapOutputCollector<KEY, VALUE> collector =
ReflectionUtils.newInstance(subclazz, job);
collector.init(context); // initialize the collector (the circular buffer)
LOG.info("Map output collector class = " + collector.getClass().getName());
return collector; // return the collector
} catch (Exception e) {
String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
if (--remainingCollectors > 0) {
msg += " (" + remainingCollectors + " more collector(s) to try)";
}
lastException = e;
LOG.warn(msg, e);
}
}
throw new IOException("Initialization of all the collectors failed. " +
"Error in last collector was :" + lastException.getMessage(), lastException);
}
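Note that the loop tries a list of collector classes and keeps the first one that initializes successfully, with MapOutputBuffer as the default. The list is configurable; assuming the property name behind JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR (mapreduce.job.map.output.collector.class in Hadoop 2.x), a sketch of stating it explicitly looks like this:

import org.apache.hadoop.conf.Configuration;

public class CollectorConfig {
  public static Configuration withExplicitCollector() {
    Configuration conf = new Configuration();
    // comma-separated list of MapOutputCollector implementations to try in order;
    // here we simply spell out the default MapOutputBuffer
    conf.set("mapreduce.job.map.output.collector.class",
        "org.apache.hadoop.mapred.MapTask$MapOutputBuffer");
    return conf;
  }
}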
What we want to look at is the init method that is called when the circular buffer is initialized.
org.apache.hadoop.mapred.MapTask$MapOutputBuffer class
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
job = context.getJobConf();
reporter = context.getReporter();
mapTask = context.getMapTask();
mapOutputFile = mapTask.getMapOutputFile();
sortPhase = mapTask.getSortPhase();
spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
//sanity checks
// i.e. validate the spill settings
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
// the spill threshold can be changed through MAP_SORT_SPILL_PERCENT; by default a spill is triggered when the buffer is 80% full
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
// the size of the in-memory buffer, in MB
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
}
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException(
"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
// the sorter; a custom sort implementation can be configured, but QuickSort is the default
// buffers and accounting
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
kvbuffer = new byte[maxMemUsage];
bufvoid = kvbuffer.length;
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();
setEquator(0);
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
maxRec = kvmeta.capacity() / NMETA;
softLimit = (int)(kvbuffer.length * spillper);
bufferRemaining = softLimit;
LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
LOG.info("soft limit at " + softLimit);
LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
// k/v serialization
// i.e. set up key/value serialization
comparator = job.getOutputKeyComparator();
// the (sort) comparator
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
// output counters
mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
mapOutputRecordCounter =
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputByteCounter = reporter
.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
// compression
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
} else {
codec = null;
}
// combiner
// i.e. the local reduce (combiner)
final Counters.Counter combineInputCounter =
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
spillInProgress = false;
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
try {
spillThread.start();
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
if (sortSpillException != null) {
throw new IOException("Spill thread failed to initialize",
sortSpillException);
}
}
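The two knobs read at the top of init can be tuned per job. Assuming the property names that JobContext.IO_SORT_MB and JobContext.MAP_SORT_SPILL_PERCENT map to in Hadoop 2.x (mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent), a sketch with illustrative values:

import org.apache.hadoop.conf.Configuration;

public class SortBufferTuning {
  public static Configuration tuned() {
    Configuration conf = new Configuration();
    // JobContext.IO_SORT_MB: size of the circular buffer in MB, default 100
    conf.setInt("mapreduce.task.io.sort.mb", 200);
    // JobContext.MAP_SORT_SPILL_PERCENT: spill once the buffer is this full, default 0.8
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f);
    return conf;
  }
}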
Next comes the sort comparator, which can be either user-defined or the default.
comparator = job.getOutputKeyComparator();
Following the getOutputKeyComparator method:
org.apache.hadoop.mapred.JobConf class
public RawComparator getOutputKeyComparator() {
Class<? extends RawComparator> theClass = getClass(
JobContext.KEY_COMPARATOR, null, RawComparator.class);
if (theClass != null)
return ReflectionUtils.newInstance(theClass, this); // a user-defined comparator was configured; use it
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); // otherwise fall back to the default comparator for the map output key class
}
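JobContext.KEY_COMPARATOR is the property that job.setSortComparatorClass fills in, so the first branch is taken whenever a custom comparator is registered. A sketch of such a comparator (descending order on Text keys, purely illustrative):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Illustrative custom sort comparator: orders Text keys in descending order.
// Registered in the driver with job.setSortComparatorClass(DescendingTextComparator.class).
public class DescendingTextComparator extends WritableComparator {
  public DescendingTextComparator() {
    super(Text.class, true); // true: create key instances so the object-level compare below works
  }

  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    return -super.compare(a, b); // invert the natural ordering
  }
}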
What follows next is the combiner, a local map-side reduce that shrinks the data as much as possible before it travels across the network.
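Registering a combiner is a one-line driver change, and CombinerRunner.create above is what picks it up and runs it during spills. A common sketch, assuming the reduce logic is associative and commutative (like summing counts), is to reuse a sum-style Reducer:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A sum reducer that is safe to run map-side as a combiner.
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

In the driver: job.setCombinerClass(IntSumReducer.class).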