2.1 Analyzing Data with Hadoop
2.1.1 Map and Reduce
A MapReduce job is divided into a map phase and a reduce phase; the input and output of each phase are key-value pairs.
The following example computes the maximum temperature for each month.
1. Input
Raw temperature data in input.txt, one record per line in the format date,...,temperature:
2018-01-02,...,12
2018-01-03,...,8
2018-01-04,...,
2018-01-05,...,30
2018-02-01,...,8
2018-02-02,...,15
2018-02-03,...,12
2018-02-04,...,24
2. Map
package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each value is one input line, e.g. "2018-01-02,...,12".
        String line = value.toString();
        String[] values = line.split(",");
        // Skip records with a missing temperature field.
        if (values.length > 2) {
            int temperature = Integer.parseInt(values[2]);
            // Emit (year-month, temperature), e.g. ("2018-01", 12).
            context.write(new Text(values[0].substring(0, 7)), new IntWritable(temperature));
        }
    }
}
- Map input: key = the offset of the line within the file (not a line number), value = the line contents, e.g. value=2018-01-02,...,12
- Map output: key = year-month, value = temperature, e.g. key=2018-01, value=12. Note that the 2018-01-04 record has an empty temperature field, so the split yields only two fields and the record is skipped; this is why the log below reports 8 map input records but only 7 map output records.
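The mapper can be exercised locally with a small unit test. The following is only a sketch, assuming the MRUnit test library (org.apache.mrunit:mrunit, hadoop2 classifier) and JUnit are on the classpath; neither is part of the example itself:

package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

import java.io.IOException;

public class MaxTemperatureMapperTest {

    @Test
    public void parsesValidRecord() throws IOException {
        // Feed one input line to the mapper and assert the expected (year-month, temperature) output.
        MapDriver.newMapDriver(new MaxTemperatureMapper())
                .withInput(new LongWritable(0), new Text("2018-01-02,...,12"))
                .withOutput(new Text("2018-01"), new IntWritable(12))
                .runTest();
    }
}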
3. Reduce
package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Find the maximum of all temperatures recorded for this year-month key.
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        // Emit (year-month, maximum temperature), e.g. ("2018-01", 30).
        context.write(key, new IntWritable(maxValue));
    }
}
- Reduce input: key = year-month, value = the list of temperatures for that year-month, e.g. key=2018-01, value=[12, 8, 30]
- Reduce output: key = year-month, value = the maximum temperature, e.g. key=2018-01, value=30
4. Job driver class
package com.zyf.study;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MaxTemperature {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Job job = Job.getInstance();
        job.setJobName("MaxTemperature");
        job.setJarByClass(MaxTemperature.class);

        // Input and output paths are taken from the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        // The reducer doubles as the combiner, since taking a maximum is safe to pre-aggregate.
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
- FileInputFormat.addInputPath specifies the job's input, which may be a single file, a directory, or a set of files matching a file pattern; it can be called more than once to add several inputs, and the related FileInputFormat.addInputPaths accepts a comma-separated list of paths.
- FileOutputFormat.setOutputPath specifies the job's output directory; if the directory already exists the job fails, which guards against accidentally overwriting data.
- setMapperClass and setReducerClass specify the map and reduce classes. setCombinerClass specifies a combiner class, usually the same as the reducer; it pre-aggregates the map output on the map node to reduce the amount of data transferred over the network.
- setOutputKeyClass and setOutputValueClass specify the data types of the reduce output. The map output types usually match the reduce input types and need not be set; if they differ, declare them with setMapOutputKeyClass and setMapOutputValueClass, as in the sketch after this list.
- job.waitForCompletion(verbose): the argument controls whether detailed progress is printed; the return value indicates whether the job succeeded (true) or failed (false).
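A minimal sketch of that last case, using hypothetical types rather than the MaxTemperature job above: assume a job whose mapper emits (Text, Text) pairs while the reducer emits (Text, IntWritable).

package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

// Hypothetical driver fragment: the map output types differ from the final
// output types, so they have to be declared separately.
public class MapOutputTypesExample {
    public static void main(String[] args) throws IOException {
        Job job = Job.getInstance();

        // Types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Types of the final (reduce) output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    }
}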
5. Execution log
hadoop jar hadoop-first-1.0-SNAPSHOT.jar D:\temp\hadoop-testdata D:\temp\hadoop-testdata\output
Excerpts from the log:
- Job ID
19/04/12 20:07:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local281339793_0001
19/04/12 20:07:58 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
19/04/12 20:07:58 INFO mapreduce.Job: Running job: job_local281339793_0001
- Map task log (excerpt)
19/04/12 20:07:59 INFO mapred.LocalJobRunner: Waiting for map tasks
19/04/12 20:07:59 INFO mapred.LocalJobRunner: Starting task: attempt_local281339793_0001_m_000000_0
19/04/12 20:07:59 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/12 20:07:59 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
19/04/12 20:07:59 INFO mapred.Task: Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@6a58318a
19/04/12 20:07:59 INFO mapred.MapTask: Processing split: file:/D:/temp/hadoop-testdata/input.txt:0+146
.
.
19/04/12 20:07:59 INFO mapred.Task: Task:attempt_local281339793_0001_m_000000_0 is done. And is in the process of committing
19/04/12 20:07:59 INFO mapred.LocalJobRunner: map
19/04/12 20:07:59 INFO mapred.Task: Task 'attempt_local281339793_0001_m_000000_0' done.
19/04/12 20:07:59 INFO mapred.Task: Final Counters for attempt_local281339793_0001_m_000000_0: Counters: 18
File System Counters
FILE: Number of bytes read=6900
FILE: Number of bytes written=305142
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=8
Map output records=7
Map output bytes=84
Map output materialized bytes=34
Input split bytes=104
Combine input records=7
Combine output records=2
Spilled Records=2
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=0
Total committed heap usage (bytes)=257425408
File Input Format Counters
Bytes Read=146
19/04/12 20:07:59 INFO mapred.LocalJobRunner: Finishing task: attempt_local281339793_0001_m_000000_0
- Reduce task log (excerpt)
19/04/12 20:08:00 INFO mapred.Task: Task 'attempt_local281339793_0001_r_000000_0' done.
19/04/12 20:08:00 INFO mapred.Task: Final Counters for attempt_local281339793_0001_r_000000_0: Counters: 24
File System Counters
FILE: Number of bytes read=7000
FILE: Number of bytes written=305210
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=34
Reduce input records=2
Reduce output records=2
Spilled Records=2
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
Total committed heap usage (bytes)=257425408
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Output Format Counters
Bytes Written=34
19/04/12 20:08:00 INFO mapred.LocalJobRunner: Finishing task: attempt_local281339793_0001_r_000000_0
The log also shows that the combiner collapsed the original 7 map output records into 2 (Combine input records=7, Combine output records=2).
6. Results
The result files are written to the output directory:
._SUCCESS.crc
.part-r-00000.crc
_SUCCESS
part-r-00000
The content of part-r-00000 is:
2018-01 30
2018-02 24
2.2 Scaling Out
To scale out, the data has to be stored in a distributed filesystem, and YARN distributes the MapReduce computation to the nodes where the data is stored.
2.2.1 Data Flow
- Job: a MapReduce job consists of the input data, the MapReduce program, and configuration information. Hadoop divides it into map tasks and reduce tasks, which YARN schedules to run across the cluster; a task that fails is automatically rescheduled on a different node.
- Input
- Hadoop divides the input into fixed-size splits and creates one map task per split. Finer-grained splits improve load balancing: faster or idle machines can take on proportionally more splits, and when a map task fails, less data has to be reprocessed. If the splits are too fine, however, the overhead of managing splits and creating tasks starts to dominate, so a reasonable split size tends toward the HDFS block size, 128 MB by default. A split spanning two blocks would be inefficient, because a single HDFS node is unlikely to store both blocks, so part of the split would have to travel over the network.
- Data locality optimization: a map task is scheduled on a node that holds its input data, so no network transfer is needed and performance is best. If every node holding a replica of the data is busy, the task is scheduled on a free map slot on another node in the same rack; only in rare cases is it scheduled onto a different rack, which requires inter-rack network transfer.
- Map output
(Figure: data-local, rack-local, and off-rack map tasks)
- A map task writes its output to local disk, not to HDFS: the output is intermediate data consumed by the reduce tasks and is deleted once the job completes, so replicating it in HDFS would add needless network overhead. If a map output is lost before it has been consumed by the reduce side, another node reruns the map task to recreate the intermediate result.
- Reduce input and output
(Figure: MapReduce data flow with a single reduce task)
- The input to a single reduce task is the output of all map tasks; the sorted map outputs are transferred across the network to the reduce node, so reduce tasks do not benefit from data locality.
- The outputs of the map tasks are merged on the reduce node before being passed to the reduce function.
- The reduce output is stored in HDFS for reliability: the first replica is written to the local node, and the other replicas to another node in the same rack and to a node in a different rack.
(Figures: MapReduce data flow with multiple reduce tasks; MapReduce data flow with no reduce tasks)
- The number of reduce tasks is specified explicitly; it is not determined by the size of the input.
- With multiple reduce tasks, the data flow is as follows: each map task creates one partition per reduce task and assigns each output record to a partition using a partitioning function; the default partitioner hashes the key, and a custom partitioner can also be supplied (see the sketch after this list).
- The data transfer between map and reduce is known as the shuffle; tuning the shuffle parameters can have a large effect on total job execution time.
- When the processing can be carried out entirely in parallel (i.e. no shuffle is needed), there may be no reduce tasks at all, and each map task writes its output directly to HDFS.
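As a minimal sketch of a custom partitioner (hypothetical, not part of the MaxTemperature example): a MonthPartitioner could route records to reduce tasks by calendar month instead of by the default key hash.

package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical custom partitioner: keys are "yyyy-MM" strings, so records are
// assigned to reduce tasks by calendar month rather than by hash of the key.
public class MonthPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // "2018-01" -> month 1 -> partition (1 - 1) % numPartitions
        int month = Integer.parseInt(key.toString().substring(5, 7));
        return (month - 1) % numPartitions;
    }
}

It would be enabled in the driver with job.setPartitionerClass(MonthPartitioner.class) together with an explicit reduce task count such as job.setNumReduceTasks(12).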
2.2.2 Combiner Functions
- The bandwidth available on the cluster limits many MapReduce jobs, so it pays to minimize the amount of data transferred between the map and reduce tasks.
- A combiner function can be specified to run on each map task's output. Hadoop makes no guarantee about how many times the combiner is called for a given map output record; however many times it runs, the reducer must produce the same result.
- It is suitable for operations such as taking a maximum or minimum, but not directly for operations such as a mean, as the example below shows.
1. Output of the first map task:
(2018-01, 12)
(2018-01, 8)
(2018-01, 30)
Output of the second map task:
(2018-01, 16)
2. Without a combiner, the reduce node receives all 4 records above, and the reduce function is called with the merged input:
(2018-01, [12, 8, 30,16])
3. If a combiner with the same logic as the reducer runs on each map task's output, the reduce node receives only 2 records:
(2018-01, [30])
(2018-01, [16])
and the reduce function is called with the merged input:
(2018-01, [30,16])
If the goal were the mean temperature instead, using the reducer as the combiner would change the result:
mean(12, 8, 30, 16) = 16.5
mean(mean(12, 8, 30), 16) = 16.3
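One common way to keep a combiner usable for a mean (a sketch with assumed class names, not part of the original example) is to have the map and combine stages emit partial (sum, count) pairs, here encoded in a Text value such as "12,1", and to divide only in the reducer:

package com.zyf.study;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Hypothetical combiner (its own file): the mapper is assumed to emit Text
// values of the form "temperature,1"; the combiner folds them into one partial
// "sum,count" pair, so running it zero, one, or many times is harmless.
public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text v : values) {
            String[] parts = v.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(key, new Text(sum + "," + count));
    }
}

// Hypothetical reducer (its own file, same package and imports): sums the
// partial pairs and divides only at the very end.
class AvgTemperatureReducer extends Reducer<Text, Text, Text, DoubleWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text v : values) {
            String[] parts = v.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(key, new DoubleWritable((double) sum / count));
    }
}

With this scheme the reducer for 2018-01 sees either the raw pairs or the combined partials ("50,3" and "16,1"), and in both cases the final result is 66 / 4 = 16.5.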
2.2.3 Running a Distributed MapReduce Job
A MapReduce program can be developed and tested locally, then run unchanged on a cluster, and the cluster can be scaled out horizontally as the volume of data grows.
The above are study notes on Hadoop: The Definitive Guide.