概述
基于Hadoop 2.x
Reduce阶段什么时候开始?
默认情况下,当Map任务完成个数达到Map任务总数的5%时,MRAppMaster开始向Yarn为Reduce申请Container,并启动Reduce进程。
此时Reduce启动后拉取每个Map的结果集中属于自己的那个分区的数据。注意这时候Reduce虽然启动了,但执行的操作只是拉取分区数据,也就是说必须所有的Map任务全部完成,所有的分区数据都已经拉取,Reduce阶段才真正开始处理数据。
这个阈值偏低,容易造成Reduce阶段早早的启动,占用了有限的资源,却没干多少活。可通过以下参数调整此阈值:
<property>
<name>mapreduce.job.reduce.slowstart.completedmaps</name>
<value>0.8</value>
</property>
Reduce阶段从哪里拉取Map阶段产生的数据?
通过HTTP请求从Shuffle HTTP Server拉取数据。
Shuffle HTTP Server是由NodeManager启动的。使用的是Yarn的AuxServices机制,NodeManager允许用户通过配置附属服务的方式扩展自己的功能,这使得每个节点可以定制一些特定框架需要的服务。附属服务需要在NodeManagger启动前配置好,并由NodeManagger统一启动和关闭。
Map阶段将结果数据提交到Shuffle HTTP Server后Map进程就退出了,后续Reduce进程是从Shuffle HTTP Server拉取数据的。
可以从yarn-site.xml找到如下配置:
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
Reduce阶段的流程是怎么样的?
Reduce阶段启动的进程也是YarnChild -> org.apache.hadoop.mapred.YarnChild#main。整个Reduce阶段可以分为三个阶段:①Map结果数据拉取;②合并排序;③执行Reduce处理逻辑。
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
1、在MRAppMaster进程中通过ContainerLauncher向NodeManager发送Container启动命令,启动YarnChild进程org.apache.hadoop.mapred.YarnChild#main,通过启动命令传入了以下几个参数:
①MRAppMaster进程中的TaskAttemptListener组件提供的TaskUmbilicalProtocol服务的host和port。
②当前任务的TaskAttemptID。
③当前任务的JVMId。
注意:TaskAttemptID很重要,它是Reduce任务的唯一标识,也是Reduce阶段拉取Map结果数据的凭证,它决定了拉取Map结果数据的哪个分区。
class YarnChild {
public static void main(String[] args) throws Throwable {
String host = args[0];
int port = Integer.parseInt(args[1]);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
long jvmIdLong = Long.parseLong(args[3]);
.....
}
}
2、初始阶段跟Map阶段一样,通过RPC协议TaskUmbilicalProtocol与MRAppMaster建立通讯。
final InetSocketAddress address =
NetUtils.createSocketAddrForHost(host, port);
final TaskUmbilicalProtocol umbilical =
taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
@Override
public TaskUmbilicalProtocol run() throws Exception {
return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID, address, job);
}
});
3、通过RPC协议TaskUmbilicalProtocol获取要处理的任务,Reduce阶段获取的任务实例为org.apache.hadoop.mapred.ReduceTask
,然后开始执行任务。
myTask = umbilical.getTask(context);
taskFinal.run(job, umbilical); // run the task
4、初始化Shuffle组件,开始后半段Shuffle(整个Shuffle的前半段在发生在Map阶段)。
# org.apache.hadoop.mapred.ReduceTask#run
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
// 加载Shuffle组件
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
// 创建Shuffle上下文信息
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);
// 运行Shuffle
rIter = shuffleConsumerPlugin.run();
①启动EventFetcher
线程,不断从MRAppMaster
获取Map任务完成的事件。以此来感知哪些Map任务完成了,也就是哪些Map阶段结果数据可以拉取了。
// Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher =
new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
protected int getMapCompletionEvents()
throws IOException, InterruptedException {
int numNewMaps = 0;
TaskCompletionEvent events[] = null;
do {
// 通过RPC请求MRAppMaster获取Map完成事件。
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(
(org.apache.hadoop.mapred.JobID)reduce.getJobID(),
fromEventIdx,
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID)reduce);
events = update.getMapTaskCompletionEvents();
LOG.debug("Got " + events.length + " map completion events from " +
fromEventIdx);
assert !update.shouldReset() : "Unexpected legacy state";
fromEventIdx += events.length;
// 将获取到的TaskCompletionEvent事件交给ShuffleScheduler来处理
for (TaskCompletionEvent event : events) {
scheduler.resolve(event);
if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {
++numNewMaps;
}
}
} while (events.length == maxEventsToFetch);
return numNewMaps;
}
将获取到的Map任务完成事件交给ShuffleScheduler
处理,ShuffleScheduler
对应的实例为ShuffleSchedulerImpl
。状态为SUCCESSED
的Map任务被封装成MapHost对象,放入Set<MapHost> pendingHosts
中,以备后续处理。
public void resolve(TaskCompletionEvent event) {
switch (event.getTaskStatus()) {
// 事件状态是成功的,代表Map任务顺利完成,将其封装成MapHost对象放入Set<MapHost> pendingHosts = new HashSet<MapHost>()中,以备后续处理。
case SUCCEEDED:
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
break;
case FAILED:
case KILLED:
case OBSOLETE:
obsoleteMapOutput(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'");
break;
case TIPFAILED:
tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'");
break;
}
}
②启动一批Fetcher
线程(可配置,默认5个),从ShuffleScheduler
获取Map数据的URL,并发拉取Map结果数据,拉取的方式是HTTP请求。
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
metrics.threadBusy();
// Shuffle
copyFromHost(host);
} finally {
if (host != null) {
scheduler.freeHost(host);
metrics.threadFree();
}
}
}
} catch (InterruptedException ie) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
}
}
将拉取过来的数据封装成MapOutput
对象,MapOutput
有两个实现,分别为InMemoryMapOutput
& OnDiskMapOutput
,从名字可以看出来,拉取到的数据可以放到内存里,也可以放到磁盘里。mapreduce.reduce.shuffle.memory.limit.percent
这个参数控制了,有多少内存可以用来存放Map结果数据,默认0.25,即25%堆内存。超过25%以后,拉取的Map结果数据一律放在磁盘上。
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
long requestedSize,
int fetcher
) throws IOException {
if (requestedSize > maxSingleShuffleLimit) {
LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
" is greater than maxSingleShuffleLimit (" +
maxSingleShuffleLimit + ")");
return new OnDiskMapOutput<K,V>(mapId, this, requestedSize, jobConf,
fetcher, true, FileSystem.getLocal(jobConf).getRaw(),
mapOutputFile.getInputFileForWrite(mapId.getTaskID(), requestedSize));
}
if (usedMemory > memoryLimit) {
LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+ ") is greater than memoryLimit (" + memoryLimit + ")." +
" CommitMemory is (" + commitMemory + ")");
return null;
}
// Allow the in-memory shuffle to progress
LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+ usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
+ "CommitMemory is (" + commitMemory + ")");
return unconditionalReserve(mapId, requestedSize, true);
}
③在拉取Map结果数据的同时,会有两个线程同时执行合并操作,InMemoryMerger
线程负责合并内存中的Map结果数据,OnDiskMerger
线程负责合并磁盘上的Map结果数据。生成的MapOutput会交给InMemoryMerger
和 OnDiskMerger
线程进行合并处理。
④当所有的Map结果数据拉取完毕、内存中的Map结果数据全部合并排序完毕、磁盘中的Map结果数据全部合并排序完毕时会进行最终合并,即将内存和磁盘中的Map结果数据合并。
// stop the scheduler
scheduler.close();
copyPhase.complete();
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
public RawKeyValueIterator close() throws Throwable {
// Wait for on-going merges to complete
if (memToMemMerger != null) {
memToMemMerger.close();
}
// 等待内存中的Map结果数据全部合并完毕
inMemoryMerger.close();
// 等待磁盘中的Map结果数据全部合并完毕
onDiskMerger.close();
List<InMemoryMapOutput<K, V>> memory =
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear();
memory.addAll(inMemoryMapOutputs);
inMemoryMapOutputs.clear();
List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
onDiskMapOutputs.clear();
// 内存和磁盘中的Map结果数据合并
return finalMerge(jobConf, rfs, memory, disk);
}
如何进行排序的呢?主要依靠org.apache.hadoop.mapred.Merger.MergeQueue类
,MergeQueue
继承了优先队列,实现了迭代器接口。MergeQueue
中的元素是以Segment
为单位,Segment
是内部已经排序完成的数据段。Segment
中的第一个元素的大小代表这个Segment的大小,以此来进行优先队列的调整。当通过迭代方法从MergeQueue获取元素是,必定是当前最小(最大)的元素。这样就实现了排序的效果。
private static class MergeQueue<K extends Object, V extends Object>
extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator
至此,Shuffle后半段结束。
5、开始执行Reduce逻辑。Reduce结果直接输出到磁盘
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
6、完成以上操作后,Reduce进程向MRAppMaster进程汇报任务结束,然后退出进程。
网友评论