3. 7. MapReduce的MapOutputBuffer内存环形缓冲区源码详解 0:13:00~2:44:00
入口: MapTask#runNewMapper()
▼
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
NewOutputCollector构造函数里面
collector = createSortingCollector(job, reporter);
▼
/**
* TODO 通过反射构建实例 一个 MapTask 对应一个 MapOutputCollector
* 当前这个实例:完成 当前 MapTask 的 context.write(key, value) 往后的所有事情
*/
MapOutputCollector<KEY, VALUE> collector = ReflectionUtils.newInstance(subclazz, job);
/**
* TODO 环形缓冲区的初始化
1、把 环形缓冲区的初始化好
2、启动一个专门做spill的线程
*/
collector.init(context);
↓ ********开始溢写准备,填充环形缓冲区 0:40:00
MapOoutputBuffer#init
▼
/**
* TODO 从 100M 中分出来一部分用来存储真实数据
*/
// buffers and accounting
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
kvbuffer = new byte[maxMemUsage]; //100M
image.png
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
/**
* TODO 当前 MpaOutputBuffer 这个环形缓冲区的管理类,事实上管理了两个重要的东西:
* 1、100M 大小的字节数组
* 2、SpillThread 负责 kvBuffer 中装满了的 80% 空间的数据的溢写
*/
spillLock.lock();
try {
spillThread.start();
↓
spillThread 类 run方法 (此方法并不会马上执行,要等init 过程结束后,在后续的 mapper.run 阶段才会 执行)
/*
* 4个 重要部件
* 1、spillthread() sortAndSpill() startSpill()
* 2、可重入锁 spillLocak
* 3、信号量 spillDone spillReady
*
* 2个重要流程
*
* 1、写数据线程
* 2、spill线程
*
* 重要机制
* 1、 kvbuffer写满80% 时被锁定
*
*/
try {
while (true) {
/**
* TODO 等信号
*/
spillDone.signal();
while (!spillInProgress) {
spillReady.await();
}
try {
spillLock.unlock();
/**
* TODO 这个方法就是溢写操作的实现 ,但是
*/
sortAndSpill();
环形缓冲区原理图
image.png image.png************ spill线程溢写环形缓冲区数据到磁盘逻辑*********** 1:20:00
接上 3. 7 入口: MapTask#runNewMapper()
初始化环形缓冲区
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
完成后
mapper.run(mapperContext); 方法
▼
// 调用业务逻辑处理一个key-value
map(context.getCurrentKey(), context.getCurrentValue(), context);
——》context.write()
↓
WrappedMapper.Context.write()
▼
mapContext.write(key, value);
↓
TaskInputOutputContextImpl.write(key, value);
▼
output.write(key, value);
↓
NewOutputCollector.write(key, value);
▼
/**
* TODO
*/
int partition_no = partitioner.getPartition(key, value, partitions);
/**
* collector.collect 写进入到了 MapOoutputBuffer
*/
collector.collect(key, value, partition_no);
↓
MapOoutputBuffer.collect()
▼
startSpill();
▼
spillReady.signal();
↓ 激活溢写信号
spillThread.run()
▼ 真正完成数据从mapper写出 环形缓冲区到磁盘
sortAndSpill()
▼
image.png
//溢出的数据要做排序 quicksort
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
combiner 阶段合并逻辑 1:36:00 ~2:10:00
1 接上 sortAndSpill方法 里面
if (combinerRunner == null)
这个 combinerRunner 在 3. 7 入口
环形缓冲区初始化 前
MapOoutputBuffer#init 方法
▼
CombinerRunner#create()方法
▼ 获取用户设置的 CombinerClass
Class<? extends org.apache.hadoop.mapreduce.Reducer<K, V, K, V>> newcls = (Class<? extends org.apache.hadoop.mapreduce.Reducer<K, V, K,
V>>) taskContext
.getCombinerClass();
if (newcls != null) {
return new NewCombinerRunner<K, V>(newcls, job, taskId, taskContext, inputCounter, reporter, committer);
}
combineCollector = new CombineOutputCollector<K, V>(combineOutputCounter, reporter, job);
2 接上 sortAndSpill方法 里面
combinerRunner.combine
▼
return new NewCombinerRunner<K, V>(newcls, job, taskId, taskContext, inputCounter, reporter, committer);
3.2 spill 和 merge 阶段 2:10:00~2:13:00
接上 3. 7 入口: MapTask#runNewMapper()
output.close(mapperContext);
↓
NewOutputCollector.close
▼
collector.flush();
↓
MapOoutputBuffer#flush
▼
sortAndSpill();//溢写
mergeParts();//合并
▼
3.3 reduceTask阶段
3.3 .1 reduceTask sort合并阶段 2:13:00~
入口方法: ReduceTask#run()
▼
/*
* TODO 这个地方初始化 最重要是初始化了一个output
*/
initialize(job, getJobID(), reporter, useNewApi);
/*
TODO reduce阶段负责拉取数据 和执行合并,都由shuffleConsumerPlugin组件来做
*/
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
/*
* 3.3.1.1 插件初始化,创建2个重要对象
* 1、ShuffleSchedulerImpl 2、createMergeManager
*/
shuffleConsumerPlugin.init(shuffleContext);
↓
———s———
Shuffle#init
▼
/*
拉取数据对象
*/
scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
/*
负责合并对象
*/
merger = createMergeManager(context);
———e———
/* 3.3.1.2 真正执行拉取合并操作
*
*/
rIter = shuffleConsumerPlugin.run();
↓
Shuffle#run
▼
———s———
▼
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
fetchers[i].start();
↓
Fetcher.run()
▼
// Shuffle
copyFromHost(host);
———e———
接上 Shuffle#run
kvIter = merger.close();
——》MergeManagerImpl.close();
▼
return finalMerge(jobConf, rfs, memory, disk);
▼ 合并结果写到磁盘文件
try {
Merger.writeFile(rIter, writer, reporter, job);
##### 3.3.2 reduceTask reduce阶段 2:36:00~ 3:06:00
接上 3.3.1.2 真正执行拉取合并操作 后
runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
▼
/*
* 自己写的 MR程序里面的 reduce类
*/
reducer.run(reducerContext);
▼
———s———
image.png
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
↓
ReduceContextImpl.nextKey() 方法
———e———
接上 reducer.run 方法
↓
TaskInputOutputContextImpl.write
↓
TextOutputFormat.LineRecordWriter. write
▼
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
网友评论