概述
基于Hadoop 2.x
Map阶段启动的大致流程是怎样的?
1、在MRAppMaster进程中通过ContainerLauncher向NodeManager发送Container启动命令,启动YarnChild进程org.apache.hadoop.mapred.YarnChild#main
,通过启动命令传入了以下几个参数:
①MRAppMaster进程中的TaskAttemptListener组件提供的TaskUmbilicalProtocol服务的host和port。
②当前任务的TaskAttemptID。
③当前任务的JVMId。
class YarnChild {
public static void main(String[] args) throws Throwable {
String host = args[0];
int port = Integer.parseInt(args[1]);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
long jvmIdLong = Long.parseLong(args[3]);
.....
}
}
2、通过RPC协议TaskUmbilicalProtocol与MRAppMaster建立通讯。
final InetSocketAddress address =
NetUtils.createSocketAddrForHost(host, port);
final TaskUmbilicalProtocol umbilical =
taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
@Override
public TaskUmbilicalProtocol run() throws Exception {
return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID, address, job);
}
});
3、通过RPC协议TaskUmbilicalProtocol获取要处理的任务,Map阶段获取的任务实例为org.apache.hadoop.mapred.MapTask
,然后开始执行任务。
myTask = umbilical.getTask(context);
taskFinal.run(job, umbilical); // run the task
4、在MapTask任务真正执行之前,先准备运行的环境。
①初始化TaskReporter,随时向MRAppMaster进程汇报Map任务的进度。
TaskReporter reporter = startReporter(umbilical);
②初始化Job上下文信息、Task上下文信息、Map阶段结果提交器等。
initialize(job, getJobID(), reporter, useNewApi);
// 初始化Job上下文信息
jobContext = new JobContextImpl(job, id, reporter);
// 初始化Task上下文信息
taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
// 初始化Map阶段结果提交器
outputFormat =
ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
committer = outputFormat.getOutputCommitter(taskContext);
5、初始化数据源、数据输入器、数据处理器、数据输出器。
runNewMapper(job, splitMetaInfo, umbilical, reporter);
// 初始化数据源,也就是数据分片
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
// 初始化数据输入器,通过配置的InputFormat获取
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
// 初始化数据处理器,也就是我们自定义的Mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// 初始化数据输出器,当Mapper处理完数据后将数据交给output输出
org.apache.hadoop.mapreduce.RecordWriter output = null;
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
6、开始数据处理。到这里就很熟悉了,这里就是我们自己编写的Mapper,循环处理所有数据。
mapper.run(mapperContext);
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
7、Map处理结果输出
在我们自定义的Mapper都会有这样一行代码context.write(key, value)
,进行处理结果的收集和输出操作。
数据输出器的具体实现类为:org.apache.hadoop.mapred.MapTask.NewOutputCollector
private class NewOutputCollector<K,V>
extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
private final MapOutputCollector<K,V> collector;
private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
private final int partitions;
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = createSortingCollector(job, reporter);
// 数据分区数为Reduce的数量
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}
...
}
从上面的NewOutputCollector
的构造方法可以看出,数据分区数为Reduce的数量,假设有n个Reduce,那么每个Map产生的结果集数据要分成n份,每个Reduce对应一份。
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
从上面的write(K key, V value)
方法可以看出,每条数据都会计算它归属的分区。然后写入MapOutputCollector<K,V> collector
中。
具体实现为:org.apache.hadoop.mapred.MapTask.MapOutputBuffer
MapOutputBuffer内部是一个数组,在逻辑上是一个环形数组,这个就是MapReduce的环形缓冲区。
每条数据分为两部分写入MapOutputBuffer:
①真实的数据(Key-Value值)
// 写入key值
keySerializer.serialize(key);
// 写入value值
valSerializer.serialize(value);
②数据的元信息:属于哪个分区、key值在buffer中的起始位置、value值在buffer中的起始位置、value值的长度。这部分元信息起到索引的作用。
// 写入分区信息
kvmeta.put(kvindex + PARTITION, partition);
// 写入key值在buffer中的起始位置
kvmeta.put(kvindex + KEYSTART, keystart);
// 写入value值在buffer中的起始位置
kvmeta.put(kvindex + VALSTART, valstart);
// 写入value值的长度
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));


环形缓存区的大小是有限的,当环形缓存区使用量达到阈值(默认80%)时,会将环形缓存区中的数据溢写到磁盘里(不是HDFS)。
org.apache.hadoop.mapred.MapTask.MapOutputBuffer#sortAndSpill
每次溢写都会生成类似的文件:sipll0.out ,文件里的数据是按照partition划分好的,每个partition中的数据都是已排序的。

当Map阶段数据处理全部完成,因为还有数据保存在环形缓冲区中,所有在结束的时候必定会产生一次溢写,将数据写入磁盘。
output.close(mapperContext);
@Override
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush();
} catch (ClassNotFoundException cnf) {
throw new IOException("can't find class ", cnf);
}
collector.close();
}
由于最终的溢写文件>=1个,所以最后还需要把所有的溢写文件合并成一个。
// org.apache.hadoop.mapred.MapTask.MapOutputBuffer#flush
mergeParts();

最终Map阶段会生成一个按分区划分,分区内已排序的数据文件,然后将数据文件交给Shuffle HTTP Server。
8、完成以上操作后,Map进程向MRAppMaster进程汇报任务结束,然后退出进程。
Map阶段生成的数据文件交给了谁管理?
交给了Shuffle HTTP Server服务来管理。这个服务是由NodeManager启动的。使用的是Yarn的AuxServices机制,NodeManager允许用户通过配置附属服务的方式扩展自己的功能,这使得每个节点可以定制一些特定框架需要的服务。附属服务需要在NM启动前配置好,并由NM统一启动和关闭。典型的应用是MapReduce框架中用到的Shuffle HTTP Server,其通过封装成一个附属服务由各个NodeManager启动。后续Reduce阶段也是从Shuffle HTTP Server来拉取数据的。
网友评论