MapReduce定义
MapReduce是一个分布式运算程序的编程框架,是用户开发”基于hadoop的数据分析应用“的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
MapReduce优缺点
优点:
-
易于编程:用户只关心业务逻辑,实现框架的接口
-
良好扩展性:可以动态增加服务器,解决计算资源不够问题。
-
高容错性:任何一台机器挂掉,可以将任务转移到其他节点。
-
适合海量数据计算(TB、PB)几千台服务器共同计算。
缺点:
-
不擅长实时计算:MR擅长处理分钟、小时级别任务
-
不擅长流式计算:Sparkstreaming、flink(擅长流式计算)
-
不擅长DAG有向无环图(spark擅长)
MapReduce编程思想

1)MapReduce运行程序一般需要分成2个阶段:map阶段和reduce阶段。
2)Map阶段的并发Map Task,完全并行运行,互不相干。
3)Reduce结算的并发ReduceTask,完全互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行
MapReduce进程
一个完整的MapReduce程序在分布式运行时有三个实例进程:
1)MrAppMaster:负责整个程序的过程调度及状态协调。
2)MapTask:负责Map阶段的整个数据处理流程。
3)ReduceTask:负责Reduce阶段的整个数据处理流程。
常用数据序列化类型
Java类型 | Hadoop Writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Integer | IntWritable |
Float | FloatWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | MapWritable |
Null | NullWritable |
MapReduce编程规范
用户编写的程序分为三个部分:Mapper、Reducer和Driver。
Map阶段
-
用户自定义的Mapper要继承自己的父类
-
Mapper的输入数据是KV对的形式(KV的类型可自定义)
-
Mapper中的业务逻辑写在map()方法中
-
Mapper的输出数据是KV对的形式(KV的类型可自定义)
-
map()方法(MapTask进程)对每一个<K,V>调用一次
Reducer阶段
-
用户自定义的Reducer要继承自己的父类
-
Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
-
Reducer的业务逻辑写在reduce()方法中
-
ReduceTask进程对每一组相同Key的<k,v>组调用一次reducer()方法
Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的Job对象
实战:统计文件中单词出现的次数(WordCount)

Mapper
-
将MapTask传的文本内容先转换为String
-
根据分隔符(如空格)将这一行切分成单词
-
将单词输出为<单词,1>
Reducer
-
汇总各个key的个数
-
输出该key的总次数
Driver
-
获取配置信息,获取job对象实例
-
指定本程序的jar包所在的本地路径
-
关联Mapper/Reducer业务类
-
指定Mapper输出数据的KV类型
-
指定最终输出的数据的KV类型
-
指定job的输入原始文件所在目录
-
指定job的输出结果所在目录,目录不能提前存在
-
提交作业
实战:编码
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>mapReduceDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
</project>
log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
注意:导包的包前缀《org.apache.hadoop》
WordCountMapper.java
package com.magw.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* @Description mapper
* @Author tuzki
* @Date 2021/11/20 11:20 下午
* @Version 1.0
* KEYIN, map阶段输入的key的类型:LongWritable
* VALUEIN, map阶段输入value类型:text
* KEYOUT, map阶段输出的key的类型:Text
* VALUEOUT map阶段输出value类型:IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outV = new IntWritable(1);
@Override
public void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// 1 获取一行
String lineStr = value.toString();
// 2 切割
String[] words = lineStr.split(" ");
// 3 循环写出
for (String word : words) {
// 封装outKey
outKey.set(word);
// 写出
context.write(outKey, outV);
}
}
}
WordCountReducer.java
package com.magw.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* @Description reducer
* @Author tuzki
* @Date 2021/11/20 11:20 下午
* @Version 1.0
*
* * KEYIN, reduce阶段输入的key的类型:Text
* * VALUEIN, reduce阶段输入value类型:IntWritable
* * KEYOUT, reduce阶段输出的key的类型:Text
* * VALUEOUT reduce阶段输出value类型:IntWritable
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
// 累加
for (IntWritable value : values) {
sum += value.get();
}
IntWritable outV = new IntWritable(sum);
// 写出
context.write(key,outV);
}
}
WordCountDriver
package com.magw.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @Description Driver
* @Author tuzki
* @Date 2021/11/20 11:20 下午
* @Version 1.0
*/
public class WordCountDriver {
public static void main(String[] args) throws Exception {
//1 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 设置jar包路径
job.setJarByClass(WordCountDriver.class);
//3 关联mapper、reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4 设置mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5 设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("~/Documents/md/a.txt"));
FileOutputFormat.setOutputPath(job, new Path("~/Documents/md/wordCount"));
//7 提交job
Boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
线上运行wordCount
hadoop jar zzz.jar com.xxx.WordCounterDriver /input /output
注意先上运行时,代码中关于路径的设置需要修改,另外保证先上输出历经/output不存在。
public class WordCountDriver {
public static void main(String[] args) throws Exception {
//1 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 设置jar包路径
job.setJarByClass(WordCountDriver.class);
//3 关联mapper、reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4 设置mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5 设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir")+"/input/wordcount"));
FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir")+"/output/wordcount"));
//7 提交job
Boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
小结:
本节介绍了MapReduce,了解mapreduce的编程思想,最后通过实战的方式完成了一个wordCount实例。
网友评论