目标
该文档作为一份个人指导全面性得描述了所有用户使用Hadoop Mapreduce框架时遇到的方方面面。
准备条件
确保Hadoop安装、配置和运行。更多细节:
初次使用用户配置单节点。
配置大型、分布式集群。
综述
Hadoop Mapreduce是一个易于编程并且能在大型集群(上千节点)快速地并行得处理大量数据的软件框架,以可靠,容错的方式部署在商用机器上。
MapReduce Job通常将独立大块数据切片以完全并行的方式在map任务中处理。该框架对maps输出的做为reduce输入的数据进行排序,Job的输入输出都是存储在文件系统中。该框架调度任务、监控任务和重启失效的任务。
一般来说计算节点和存储节点都是同样的设置,MapReduce框架和HDFS运行在同组节点。这样的设定使得MapReduce框架能够以更高的带宽来执行任务,当数据已经在节点上时。
MapReduce 框架包含一个主ResourceManager,每个集群节点都有一个从NodeManager和每个应用都有一个MRAppMaster。
应用最少必须指定输入和输出的路径并且通过实现合适的接口或者抽象类来提供map和reduce功能。前面这部分内容和其他Job参数构成了Job的配置。
Hadoop 客户端提交Job和配置信息给ResourceManger,它将负责把配置信息分配给从属节点,调度任务并且监控它们,把状态信息和诊断信息传输给客户端。
尽管 MapReduce 框架是用Java实现的,但是 MapReduce 应用却不一定要用Java编写。
Hadoop Streaming 是一个工具允许用户创建和运行任何可执行文件。
Hadoop Pipes 是兼容SWIG用来实现 MapReduce 应用的C++ API(不是基于JNI).
输入和输出
MapReduce 框架只操作键值对,MapReduce 将job的不同类型输入当做键值对来处理并且生成一组键值对作为输出。
Key和Value类必须通过实现Writable接口来实现序列化。此外,Key类必须实现WritableComparable 来使得排序更简单。
MapRedeuce job 的输入输出类型:
(input) ->map-> ->combine-> ->reduce-> (output)
MapReduce - 用户接口
这部分将展示 MapReduce 中面向用户方面的尽可能多的细节。这将会帮助用户更小粒度地实现、配置和调试它们的Job。然而,请在 Javadoc 中查看每个类和接口的综合用法,这里仅仅是作为一份指导。
让我们首先来看看Mapper和Reducer接口。应用通常只实现它们提供的map和reduce方法。
我们将会讨论其他接口包括Job、Partitioner、InputFormat和其他的。
最后,我们会讨论一些有用的特性像分布式缓存、隔离运行等。
有效负载
应用通常实现Mapper和Reducer接口提供map和reduce方法。这是Job的核心代码。
Mapper
Mappers将输入的键值对转换成中间键值对。
Maps是多个单独执行的任务将输入转换成中间记录。那些被转换的中间记录不一定要和输入的记录为相同类型。输入键值对可以在map后输出0或者更多的键值对。
MapReduce 会根据 InputFormat 切分成的各个 InputSplit 都创建一个map任务
总的来说,通过 job.setMapperClass(Class)来给Job设置Mapper实现类,并且将InputSplit输入到map方法进行处理。应用可复写cleanup方法来执行任何需要回收清除的操作。
输出键值对不一定要和输入键值对为相同的类型。一个键值对输入可以输出0至多个不等的键值对。输出键值对将通过context.write(WritableComparable,Writable)方法进行缓存。
应用可以通过Counter进行统计。
所有的中间值都会按照Key进行排序,然后传输给一个特定的Reducer做最后确定的输出。用户可以通过Job.setGroupingComparatorClass(Class)来控制分组规则。
Mapper输出会被排序并且分区到每一个Reducer。分区数和Reduce的数目是一致的。用户可以通过实现一个自定义的Partitioner来控制哪个key对应哪个Reducer。
用户可以随意指定一个combiner,Job.setCombinerClass(Class),来执行局部输出数据的整合,将有效地降低Mapper和Reducer之间的数据传输量。
那些经过排序的中间记录通常会以(key-len, key, value-len, value)的简单格式储存。应用可以通过配置来决定是否需要和怎样压缩数据和选择压缩方式。
How Many Maps?
maps的数据通常依赖于输入数据的总长度,也就是,输入文档的总block数。
每个节点map的正常并行度应该在10-100之间,尽管每个cpu已经设置的上限值为300。任务的配置会花费一些时间,最少需要花费一分钟来启动运行。
因此,如果你有10TB的数据输入和定义blocksize为128M,那么你将需要82000 maps,除非通过Configuration.set(MRJobConfig.NUM_MAPS, int)(设置一个默认值通知框架)来设置更高的值。
下面是原文
Purpose
This document comprehensively describes all user-facing facets of the Hadoop MapReduce framework and serves as a tutorial.
Prerequisites
Ensure that Hadoop is installed, configured and is running. More details:
Single Node Setupfor first-time users.
Cluster Setupfor large, distributed clusters.
Overview
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
A MapReducejobusually splits the input data-set into independent chunks which are processed by themap tasksin a completely parallel manner.The framework sorts the outputs of the maps, which are then input to thereducetasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.
Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the HadoopDistributed File System (seeHDFSArchitecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very highaggregate bandwidthacross the cluster.
The MapReduce framework consists of a single master ResourceManager, one slave NodeManager per cluster-node, and MRAppMaster per application (seeYARNArchitecture Guide).
Minimally, applications specify the input/output locations and supplymapandreducefunctions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters,comprise thejobconfiguration.
The Hadoopjob clientthen submits the job(jar/executable etc.) and configuration to the ResourceManager which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.
Although the Hadoop framework is implemented in Java, MapReduce applications need not be written in Java.
Hadoop Streamingis a utility which allows users to create and run jobs with any executables (e.g. shell utilities) as the mapper and/or the reducer.
Hadoop Pipesis aSWIG-compatible C++ API to implement MapReduce applications (non JNI based).
Inputs and Outputs
The MapReduce framework operates exclusively on value pairs, that is, the framework views the input to the job as a set of pairs and produces a set of pairs as the output of the job,conceivably of different types.
The key and value classes have to be serializable by the framework and hence need to implement theWritableinterface. Additionally, the key classes have to implement theWritableComparableinterface to facilitate sorting by the framework.
Input and Output types of a MapReduce job:
(input) v1> ->map-> ->combine-> ->reduce-> (output)
MapReduce - User Interfaces
This section provides a reasonable amount of detail on every user-facing aspect of the MapReduce framework. This should help users implement, configure and tune their jobs in a fine-grained manner. However, please note that the java doc for each class/interface remains the most comprehensive documentation available; this is only meant to be a tutorial.
Let us first take the Mapper and Reducer interfaces. Applications typically implement them to provide the map and reduce methods.
We will then discuss other core interfaces including Job,Partitioner,InputFormat,OutputFormat, and others.
Finally, we will wrap up by discussing some useful features of the framework such as
the Distributed Cache,Isolation Runner etc.
Payload
Applications typically implement the Mapper and Reducer interfaces to provide the map and reduce methods. These form the core of the job.
Mapper
Mappermaps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks that transform input records into intermediate records.The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.
The Hadoop MapReduce framework spawns one map task for each InputSplit generated by the InputFormat for the job.
Overall,Mapper implementations are passed the Job for the job via theJob.setMapperClass(Class)method.
The framework then callsmap(WritableComparable,Writable, Context)for each key/value pair in
the InputSplit for that task. Applications can then override the cleanup(Context)method to perform any required cleanup.
Output pairs do not need to be of the same types as input pairs. A given input pair may map to zero or many output pairs. Output pairs are collected with calls to context.write(WritableComparable, Writable).
Applications can use the Counter to report its statistics.
All intermediate values associated with a given output key are subsequently(随后)grouped by the framework, and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator viaJob.setGroupingComparatorClass(Class).
The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
Users can optionally specify a combiner, viaJob.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len,value) format. Applications can control if, and how, the intermediate outputs are to be compressed and theCompression Codecto be used via the Configuration.
How Many Maps?
The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.
The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.
Thus,if you expect 10TB of input data and have a blocksize of 128 MB, you'll end up with 82,000 maps,unlessConfiguration.set(MRJobConfig.NUM_MAPS, int)(which only provides a hint to the framework) is used to set it even higher.
由于翻译能力不足所出现的错误,请多多指出和包涵
网友评论