MapReduce原理与特性
源自于Google的MapReduce论文
发表与于2014年12月
Hadoop MapReduce是Goole MapReduce的克隆版
是一个批处理计算框架
一个MapRecduce程序分为Map阶段和Reduce 阶段
MapReduce特性:
- 易于编程
- 良好的扩展性
- 高容错
- 适合海量数据离线处理
MapReduce 常用应用场景
- 数据统计,比如网站的PV、UV统计
- 搜索引擎索引
- 海量数据查找
- 复杂数据分析算法实现
聚类算法
分类算法
推荐算法
不擅长场景
- 实时计算
能够在毫秒或者秒级内返回结果 - 流式计算
输入数据集是静态的,不能动态变化
自身设计特点决定了数据源必须是静态的 - DAG计算
多个作业之间存在依赖关系,后一个应用程序输入为前一个应用程序输出
MapReduce 编程模型
MapReduce将作业的整个运行过程分为两个阶段Map阶 段和Reduce阶段
Map阶段由一定数量的Map Task组成
- 输入数据格式解析:InputFormat (文件切分,分片,整合为key-value)
- 输入数据处理: Mapper(对数据进行过滤,切分等)
- 数据分组:Partitioner(hash(key) 取余 ReduceTask,然后由指定的ReduceTask处理)
Reduce阶段由一定数量的Reduce Task组成
- 数据远程拷贝
- 数据按照key排序
- 数据处理:Reducer
- 数据输出格式:OutputFormat
注意: 粗体是可以编程实现修改的.
image.png
InputFormat
- 文件分片(inputSplit) 方法
处理跨行问题, - 将分片数据解析成key/value对
默认实现时textInputFormat
-TextInputFormat
Key是行在文件中的偏移量,Value是行内容
如果一行被截断,则读取下一个block的前几个字符
Split与Block
- Block
HDFS中最小数据处理单元
默认128MB - Split
MapReduce中最小的计算单元
默认与Block一一对应 - Block与Split
Split与Block的对应关系是任意的,可由用户控制
Partitioner
Partitioner决定了Map Task输出的每条数据交给哪个
Reduce Task处理
默认实现:HashPartitioner
- Hash(key) mod R
- R是Reduce Task数目
- 分区计算结果产生的分区号等于Reduce Task号
- 允许自定义分区
编程接口
Java编程接口组成
• 旧API:所在java包:org.apache.hadoop.mapred
• 新API:所在java包:org.apache.hadoop.mapreduce
新API具有更好的扩展性
两种编程接口只是暴露给用户的形式不同,内部执行
引擎是一样的
创建maven工程,导入pom.xml,编写wordcount
wordcount
package class3;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到一行数据,将输入的序列化数据转换成字符串
String line = value.toString();
//将一行数据按照分隔符拆分
String[] words = line.split("\t");
//遍历单词数据,输出单词<k,1>
for (String word : words) {
//需要序列化写出
context.write(new Text(word), new IntWritable(1));
}
}
}
static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
//reduce方法是针对输入的一组数据,一个key和它的所有value组成一组(k:v1,v2,v3)
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//定义一个计数器
int count = 0;
//遍历一组数据,将key出现次数累加到count
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String jobName = args[0];
String inputPath = args[1];
String outputPath = args[2];
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置作业名称
job.setJobName(jobName);
//设置主类
job.setJarByClass(WordCount.class);
//设置作业中使用的Mapper和Reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//设置Mapper阶段的输出key类型和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reducer阶段的输出key类型和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置job的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hadooptest</groupId>
<artifactId>hadooptest</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<executions>
<execution>
<id>default-compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
打包生成 jar包
上传到服务器,然后编写input file--->test.txt,上传到hdfs.
aaa asdf
asdf
asfe
adsf
asdfaf
sadf sdf asf asd
helo hello hello hello
[hadoop@hadoop0 ~]$ hadoop fs -ls /wordcount/input/
Found 1 items
-rw-r--r-- 3 hadoop supergroup 72 2018-01-23 21:07 /wordcount/input/test.txt
执行jar
hadoop jar hadooptest-1.0-SNAPSHOT.jar class3.WordCount WordCount /wordcount/input/ /wordcount/output
(包名+类名,类名,输入路径,输出路径)
执行完成!查看output
[hadoop@hadoop0 ~]$ hadoop fs -ls /wordcount/output
Found 2 items
-rw-r--r-- 3 hadoop supergroup 0 2018-01-23 21:09 /wordcount/output/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 76 2018-01-23 21:09 /wordcount/output/part-r-00000
hadoop fs -cat /wordcount/output/part-r-00000
aaa 1
adsf 1
asd 1
asdf 2
asdfaf 1
asf 1
asfe 1
hello 3
helo 1
sadf 1
sdf 1
注意:输出路径在提交之前要保证不存在,MapReduce运行完会 自动创建,如果存在输出路径在提交的时候会报文件夹存在的错 误提示
用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运 行mr程序的客户端)
- Mapper的输入数据是KV对的形式
- Mapper的输出数据是KV对的形式
- Mapper中的业务逻辑写在map()方法中
- map()方法对每一个<K,V>调用一次
- Reducer的输入数据类型对应Mapper的输出数据类型,也是KV对
- Reducer的业务逻辑写在reduce()方法中
- Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
- 用户自定义的Mapper和Reducer都要继承各自的父类
- 整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必
要信息的job对象
网友评论