MapReduce
1. Why MapReduce?
- Processing a large dataset on a single machine takes too long.
- MapReduce takes a divide-and-conquer approach.
- To speed up the processing, we need to run parts of the program in parallel.
2. Map and Reduce
- MapReduce works by breaking the processing into two phases: the map phase and the reduce phase.
- Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer.
- The programmer also specifies two functions: the map function and the reduce function. (The general form of the two functions is shown below.)
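In general terms (K1, V1, etc. are just placeholder names for the programmer-chosen types), the data flows through the two functions like this:

    map:    (K1, V1)        -> list(K2, V2)
    reduce: (K2, list(V2))  -> list(K3, V3)

In the weather example that follows, K2 is the year (Text) and V2 is the temperature reading (IntWritable).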
Map and reduce example
- Task: find the highest recorded temperature for each year in the NCDC weather dataset.
- Analysis:
- The input to our map phase is the raw NCDC data.
- Our map function is simple: it merely extracts the year and the air temperature from each record.
- The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key.
- All the reduce function has to do now is iterate through the list and pick up the maximum reading. (A worked example of the key-value pairs follows below.)
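To make this concrete, here is roughly what the key-value pairs look like at each stage for two sample years (the temperature values are made up for illustration):

    (1950, 0), (1950, 22), (1950, -11), (1949, 111), (1949, 78)    map output: (year, temperature)
    (1949, [111, 78]), (1950, [0, 22, -11])                        reduce input: sorted and grouped by key
    (1949, 111), (1950, 22)                                        reduce output: maximum per year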
Code
Example: Mapper for the maximum temperature example
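The listing here was an image in the original notes; below is a sketch of what such a Mapper looks like using the newer org.apache.hadoop.mapreduce API. The NCDC field offsets (year at columns 15-19, temperature at 87-92) are illustrative assumptions, not a definitive parsing of the record format.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static final int MISSING = 9999;   // sentinel used for missing readings

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);               // assumed offset of the year field
        int airTemperature;
        if (line.charAt(87) == '+') {                       // parseInt doesn't like leading plus signs
          airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
          airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
          context.write(new Text(year), new IntWritable(airTemperature));
        }
      }
    }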
- Rather than using built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization.
- These are found in the org.apache.hadoop.io package, and each corresponds one-to-one with a Java type.
- Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).
Example: Reducer for the maximum temperature example
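Again the listing was an image; a minimal sketch of the corresponding Reducer, matching the (Text, IntWritable) map output types:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

      @Override
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
          maxValue = Math.max(maxValue, value.get());   // keep the largest reading seen for this year
        }
        context.write(key, new IntWritable(maxValue));
      }
    }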
- The input types of the reduce function must match the output types of the map function: Text and IntWritable.
Example: Application to find the maximum temperature in the weather dataset
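The application (driver) listing was also an image; the following sketch shows the usual shape of such a driver and exercises the methods discussed in the bullets below (setJarByClass(), setOutputPath(), setOutputKeyClass()/setOutputValueClass(), waitForCompletion()). The class names reuse the Mapper and Reducer sketches above.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MaxTemperature {
      public static void main(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.println("Usage: MaxTemperature <input path> <output path>");
          System.exit(-1);
        }

        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class);        // locate the JAR by the class it contains
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory must not already exist

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);              // must match what the reducer emits
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);  // true = print progress to the console
      }
    }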
- When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster). Rather than explicitly specifying the name of the JAR file, we can pass a class in the Job's setJarByClass() method, which Hadoop uses to locate the relevant JAR file by looking for the JAR containing this class.
- The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reduce function are written. The directory shouldn't exist before running the job, because Hadoop will complain and not run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with that of another).
- The setOutputKeyClass() and setOutputValueClass() methods control the output types for the reduce function, and must match what the Reducer class produces.
- The waitForCompletion() method on Job submits the job and waits for it to finish. The single argument to the method is a flag indicating whether verbose output is generated. When true, the job writes information about its progress to the console.
Data Flow (important)
- A MapReduce job is a unit of work that the client wants to be performed: it consists of the input data, the MapReduce program, and configuration information. (In other words, the job itself is just user-defined code plus configuration.)
- Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.
- The tasks are scheduled using YARN and run on nodes in the cluster. If a task fails, it will be automatically rescheduled to run on a different node. (Internally, YARN handles resource scheduling; a failed task is simply rerun elsewhere.)
- Input splits: Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits.
- Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split.
- Hadoop does its best to run the map task on a node where the input data resides in HDFS, because that doesn't use valuable cluster bandwidth. This is called the data locality optimization. (In other words, Hadoop performs best when the map task runs on the same node that stores its input data.)
- It should now be clear why the optimal split size is the same as the block size: it is the largest size of input that can be guaranteed to be stored on a single node.
- If the split spanned two blocks, it would be unlikely that any HDFS node stored both blocks, so some of the split would have to be transferred across the network to the node running the map task, which is clearly less efficient than running the whole map task using local data. (Keeping the split size equal to the HDFS block size guarantees each split can live entirely on one node; a split covering two blocks would usually force a network transfer. See the sketch below for how the split size is computed.)
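As a rough sketch of how this shows up in code (property names and defaults are from Hadoop 2.x and should be treated as assumptions), FileInputFormat arrives at the split size roughly like this:

    // Sketch: how FileInputFormat computes the split size.
    // minSize   <- mapreduce.input.fileinputformat.split.minsize
    // maxSize   <- mapreduce.input.fileinputformat.split.maxsize
    // blockSize <- the HDFS block size of the input file (dfs.blocksize)
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
    // With the default min/max settings this reduces to splitSize == blockSize.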
- Map tasks write their output to the local disk, not to HDFS.
- Why is this? Map output is intermediate output: it's processed by reduce tasks to produce the final output, and once the job is complete, the map output can be thrown away.
- If the node running the map task fails before the map output has been consumed by the reduce task, then Hadoop will automatically rerun the map task on another node to re-create the map output.
- The input to a single reduce task is normally the output from all mappers.
- The output of the reduce is normally stored in HDFS for reliability.
- For each HDFS block of the reduce output, the first replica is stored on the local node, with the other replicas being stored on off-rack nodes for reliability.
- When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition. (The default partitioning is by a hash of the key; see the sketch below.)
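A minimal sketch of that hash-based partitioning logic (essentially what Hadoop's default HashPartitioner does), just to show why all records with the same key land in the same partition:

    import org.apache.hadoop.mapreduce.Partitioner;

    // Every record with the same key hashes to the same partition number,
    // so it is always routed to the same reduce task.
    public class HashStylePartitioner<K, V> extends Partitioner<K, V> {
      @Override
      public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }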
- It's also possible to have zero reduce tasks. This can be appropriate when you don't need the shuffle because the processing can be carried out entirely in parallel. (A one-line sketch of this configuration follows.)
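Configuring such a map-only job is a single call on the driver's Job object (sketch):

    job.setNumReduceTasks(0);   // no shuffle/sort; map output is written directly by the output format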
Combiner Function
- Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks.
- Hadoop allows the user to specify a combiner function to be run on the map output, and the combiner function's output forms the input to the reduce function.
- This works for the maximum-temperature job because max is commutative and associative: taking the maximum of partial maximums gives the same answer as taking the maximum of all the values at once. Not all functions possess this property. For example, if we were calculating mean temperatures, we couldn't use the mean as our combiner function: mean(0, 20, 10, 25, 15) = 14, but mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15.
- The combiner function doesn't replace the reduce function. (How could it? The reduce function is still needed to process records with the same key from different maps.) But it can help cut down the amount of data shuffled between the mappers and the reducers.
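Wiring in a combiner is one extra line in the driver. Because max is commutative and associative, the reducer class from the sketch above can double as the combiner:

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);   // pre-aggregate map output before the shuffle
    job.setReducerClass(MaxTemperatureReducer.class);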
The org.apache.hadoop.mapred.JobConf class
- JobConf typically specifies the Mapper, combiner (if any), Partitioner, Reducer, InputFormat, and OutputFormat implementations to be used, among other things.
- Here is an example of how to configure a job via JobConf (note this is the older mapred API, in contrast to the mapreduce API used above):
    // Create a new JobConf
    JobConf job = new JobConf(new Configuration(), MyJob.class);

    // Specify various job-specific parameters
    job.setJobName("myjob");

    FileInputFormat.setInputPaths(job, new Path("in"));
    FileOutputFormat.setOutputPath(job, new Path("out"));

    job.setMapperClass(MyJob.MyMapper.class);
    job.setCombinerClass(MyJob.MyReducer.class);
    job.setReducerClass(MyJob.MyReducer.class);

    job.setInputFormat(SequenceFileInputFormat.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);