目录
- MapReduce概述
- MapReduce编程模型
2.1 案例分析:WordCount => 词频统计
2.2 执行步骤
- MapReduce编程模型
- MapReduce架构
3.1 MapReduce1.x的架构
3.2 MapReduce2.x架构
- MapReduce架构
- MapReduce编程
4.1 核心概念
4.2 编程实现WordCount
4.2.1 WC开发的流程
4.2.2 在Hadoop上运行自己编写的WC代码
4.2.3 存在的问题
4.2.4 解决办法
4.3 MapReduce编程之Combiner
4.4 MapReduce编程之Partitioner
- MapReduce编程
- JobHistory配置与启动停止
5.1 文件配置
5.2 启动与停止
- JobHistory配置与启动停止
- 基础代码
1. MapReduce概述
- 源于Google的MapReduce论文,论文发表于2004年12月
- Hadoop MapReduce是Google MapReduce的克隆版本
- MapReduce优点:适合海量数据离线处理&易开发&易运行(但与Spark相比,MapReduce并没有 易开发和易运行的优点)
- MapReduce缺点:不能进行实时流式计算
- 官网的概述:Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
2. MapReduce编程模型
2.1 案例分析:WordCount => 词频统计
(统计文件中每个单词出现的次数,工作场景中的很多开发都是在WordCount的基础上进行改造的,因此深入理解该案例很重要.)
- MapReduce主要的计算过程是采用 分而治之 的思想,将一个作业分为Map阶段和Reduce阶段。
WordCount在MapReduce上的处理流程 在Map和Reduce阶段: - 将一个作业拆分成Map阶段和Reduce阶段
- Map阶段:对应多个Map Tasks
- Reduce阶段:对应多个Reduce Tasks
2.2 执行步骤
-
官网的描述:The MapReduce framework operates exclusively on <key, value>pairs, that is, the framework views the input to the job as a set of<key, value>pairs and produces a set of <key, value>pairs as the output of the job, conceivably of different types.
The key and valueclasses have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework. -
Input and Output types of a MapReduce job:
(input) <k1, v1> -->map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output) -
整个MapReduce过程的数据格式都是键值对(<k1, v1>),针对WordCount案例:
<k1, v1> :k1就是文件的偏移量,v1就是行的字符串内容;
<k2, v2>:K2就是每个单词,v2是该单词k2出现的次数;
<k3, v3>:k3和k2一样,v3是同一个k2对应的多个v2累加的结果。 -
作业处理过程:
MapReduce作业处理过程
3. MapReduce架构
3.1 MapReduce1.x的架构
图片.png 1) JobTracker:JT (管理者)- 作业的管理者;将作业分解成一堆的任务:Task(MapTask和ReduceTask);
- 将作业分派给TaskTracker运行;
- 作业的监控、容错处理(task作业挂了,重启task机制);
- 在一定的时间间隔内,JT没有收到TT的心跳信息,TT可能是挂了,TT运行的任务会被指派到其他TT上去执行.
2)TaskTracker:TT (执行者) - 任务的执行者 ;
- 在TT上执行Task(MapTask和ReduceTask);
- 与JT进行交互:执行启动停止作业,发送心跳信息给JT
3)MapTask:MT - 将自己开发的map任务交由Task处理;
- 解析每条记录的数据,交给自己的map方法处理;
- 将map的输出结果写到本地磁盘(有些作业仅有map没有reduce==>HDFS).
4) ReduceTask: RT - 将Map Task输出的数据进行读取;
- 安装数据进行分析传给自己编写的reduce方法处理;
- 输出结果到HDFS.
3.2 MapReduce2.x架构
MapReduce2.x架构在 《十小时入门大数据》学习笔记之Hadoop核心组件YARN有详细的介绍,这里就不再赘述。
4. MapReduce编程
4.1 核心概念
- Split:交由MapReduce作业来处理的数据块,是MapReduce中最小的计算单元
- blocksize : 是HDFS的最小的存储单元, 为128M
默认情况下:Split和blocksize的大小是一一对应的,也可以手工设置他们之间的关系(但不建议这么做). - InputFormat:将输入的数据进行分片(split);
- TextInputFormat:处理文本格式的数据;
4.2 编程实现WordCount
- 环境:Ubuntu16.04+IDEA2017.3+Maven3.6
- hadoop版本:hadoop-2.6.0-cdh5.7.0
- 具体的环境搭建可参考另一篇笔记《十小时入门大数据》学习笔记之HDFS中的4.HDFS环境搭建 和 6.Java API 操作HDFS
4.2.1 WC开发的流程
一共分为三步:
1)Map:读取输入的文件
2)Reduce:进行归并
3)Main:定义Driver,封装了MapReduce作业的所有信息
4.2.2 在Hadoop上运行自己编写的WC代码
1)利用maven进行编译打包:进入到项目的路径下,在终端输入下面代码
mvn clean package -DskipTests
出现HUILD SUCCES表示编译成功
图片.png
2)上传到服务器:同样在终端输入下面命令
scp target/hadoop-train-1.0.jar hadoop@dell:~/data
3)运行:在服务器终端输入下面命令,其中hdfs://localhost:8020/hello.txt是文件的输入路径,hdfs://localhost:8020/output/wc是输出路径。
hadoop jar /home/hadoop/data/hadoop-train-1.0.jar com.lyh.hadoop.mapreduce.WordCountApp hdfs://localhost:8020/hello.txt hdfs://localhost:8020/output/wc
打开浏览器:http://localhost:8088 ,能够看到有一个我们的wordcount任务正在运行
图片.png 运行结束后,查看hdfs的/output/wc/目录,我们能够看到多出两个文件 图片.png 查看part-r-00000文件,可以看到我们的计算结果 图片.png4.2.3 存在的问题
-
相同的代码和脚本再次执行,会报错
图片.png -
在MR中,输出文件是不能事先存在的
4.2.4 解决办法
- 1)先手工通过shell的方式将输出文件先删除,创建一个shell脚本文件: hadoop@Dell:~/data$ vim wc_mr.sh
把下面的代码输入到wc_mr.sh文件中
hadoop fs -rm -r /output/wc
hadoop jar /home/hadoop/data/hadoop-train-1.0.jar com.lyh.hadoop.mapreduce.WordCountApp hdfs://localhost:8020/hello.txt hdfs://localhost:8020/output/wc
图片.png
保存,给wc_mr.sh添加执行权限:chmod u+x wc_mr.sh
- 2)在代码中完成自动删除功能(推荐使用这种方法)
在main中添加以下代码即可
// 准备清理已存在的输出目录
Path outputPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if(fileSystem.exists(outputPath)){
fileSystem.delete(outputPath, true);
System.out.println("output file exists, but is has deleted");
}
图片.png
4.3 MapReduce编程之Combiner
- 相当于本地的reducer
-
减少Map Task输出的数据量及数据网络传输量
图片.png - 开发案例
对于WC案例,只需要在原来的基础上添加一句代码即可
//通过job设置combiner处理类,其实逻辑上和我们的reduce是一模一样的
job.setCombinerClass(MyReducer.class);
图片.png
适用场景:求和 次数等
不适用的场景:求平均数
4.4 MapReduce编程之Partitioner
- Partitioner决定Map Task 输出的数据交由哪个Reduce Task处理
- 默认实现:分发的key的hash值对Reduce Task个数取模
- 开发案例
需求:有一份手机销售量数据,如下:
xiaomi 100
huawei 600
iphone 200
xiaomi 700
xiaomi 200
huawei 800
要求统计各类手机的总销售量,分别存储在不同的文件中
- 实现代码
略(见代码中的ParititonerApp类)
5 JobHistory配置与启动停止
- 记录已经运行完的MapReduce信息到指定的HDFS目录下
- 默认没有开启的
5.1 文件配置
1)在yarn-site.xml中添加
<!-- 开启日志聚合 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
2)在mapred-site.xml中添加
<!-- 设置jobhistoryserver 没有配置的话 history入口不可用 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>localhost:10020</value>
</property>
<!-- 配置web端口 -->
<property> <name>mapreduce.jobhistory.webapp.address</name>
<value>localhost:19888</value>
</property>
<!-- 配置正在运行中的日志在hdfs上的存放路径 -->
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/history/done_intermediate</value>
</property>
<!-- 配置运行过的日志存放在hdfs上的存放路径 -->
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>/history/done</value>
</property>
5.2 启动与停止
1)启动: 在hadoop/sbin/目录下执行
./mr-jobhistory-daemon.sh start historyserver
2)停止:在hadoop/sbin/目录下执行
./mr-jobhistory-daemon.sh stop historyserver
6.基础代码
完整的项目见:https://download.csdn.net/download/qq_29557137/10862010
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 使用MapReduce开发WordCount应用程序
*/
public class WordCountApp {
/**
* Map:读取输入的文件,注意这里的Mapper<LongWritable, Text, Text, LongWritable>类型需要自己定义,因为我们的第一个<k1,v1>是文件的偏移量和字符串,因此k1的类型设置为LongWritable,Text,输出的是单词和单词出现的次数,因此<k2,v2>的类型设置为Text 和 LongWritable
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
LongWritable one = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 接收到的每一行数据
String line = value.toString();
//按照指定分隔符进行拆分
String[] words = line.split(" ");
for(String word : words) {
// 通过上下文把map的处理结果输出
context.write(new Text(word), one);
}
}
}
/**
* Reduce:归并操作
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for(LongWritable value : values) {
// 求key出现的次数总和
sum += value.get();
}
// 最终统计结果的输出
context.write(key, new LongWritable(sum));
}
}
/**
* 定义Driver:封装了MapReduce作业的所有信息
*/
public static void main(String[] args) throws Exception{
//创建Configuration
Configuration configuration = new Configuration();
//创建Job
Job job = Job.getInstance(configuration, "wordcount");
//设置job的处理类
job.setJarByClass(WordCountApp.class);
//设置作业处理的输入路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//设置map相关参数
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//设置reduce相关参数
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置作业处理的输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
网友评论