MapReduce
一、概述
1). 特点
- 优点:海量数据离线处理&易开发&易运行
- 不适用场景:实时流计算
2). 编程模型案例简单介绍
分布式计算框架MapReduce.png
以上是一个词频统计案例,我们需要实现的就是Mapping和Reducing阶段
3). MapReduce的过程
- 将作业拆分成Map阶段和Reduce阶段
- Map阶段:Map Tasks
- Reduce阶段:Reduce Tasks
4). MapReduce编程模型的执行步骤
- 准备map处理输入数据
- Mapper处理
- Shuffle
- Reduce处理
- 结果输出
MapReduce执行步骤.png
InputFormat类 需要好好研究
4). MapReduce编程模型核心概念
- Split
- InputFormat
- OutputFormat
- Combiner
- Partitioner
二、单词统计案例(实战一)
统计一个文件中以空格或者其他分割的不区分大小写单词的个数
1). 实现Mapper接口
-
注意实现Mapper的包包名
-
抽象类中的泛型
- KEYIN: Map任务读数据的key类型,offset,是每行数据起始位置的偏移量,一般为Long
- VALUEIN: Map任务读数据的value类型,其实就是一行行的字符串,例如String
- KEYOUT: map方法自定义实现输出的key的类型,例如String
- VALUEOUT: map方法自定义实现输出的value的类型,例如Integer
-
Hadoop分布式文件系统,数据交互经过网络,必然涉及到【序列化】【反序列化】,它有自己的数据类型支持处理这些要求(实现对应的接口org.apache.hadoop.io.Writable),框架会调用接口中的write方法进行序列化对象写入、readFields方法进行对象读取(要保持写入和读取的顺序一致)
-
案例:词频统计:相同单词的次数(word, 1) 注意,这里不需要计算!!只是中间转换!!
-
Long,String,Integer是Java里面的数据类型,Hadoop中可以对应使用自带的LongWritable,Text,IntWritable(当然可以自己实现对应的接口)
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author Liucheng
* @since 2019-11-12
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/** 【模板设计模式】
* 重写这个方法,进行中间转换并放入【上下文环境】
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 把value对应的行数据按照指定的分隔符拆开
final String[] words = value.toString().split("[^a-zA-Z0-9]+");
for (String word : words) {
// (hello, 1) (word, 1)
// 不区分大小写!
// 注意,这里只是转换,不计算;
context.write(new Text(word.toLowerCase()), new IntWritable(1));
}
}
}
2). 实现Reducer接口
注意:map中的values迭代器每次迭代的对象是同一个,每次迭代此对象封装了下一个数据,所以读取出来的数据必须再次封装然后交给其他逻辑再处理,否则最后拿到的内容为迭代器迭代的最后一个值!
- 同样注意包名
- 实现的泛型
- KEYIN: 对应Mapper或者Combiner输出的key
- VALUEIN: 对应Mapper或者Combiner输出的value
- KEYOUT: 处理结果key
- VALUEOUT: 处理结果value
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
* @author Liucheng
* @since 2019-11-12
*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
/** 【模板设计模式】
* map的输出到reduce端,是按照相同的key分发到一个reduce上去执行
* 此时的数据关系类似以下的举例 key values
* (hello,1)(hello,1)(hello,1) ==> reduce1: (hello, <1,1,1>)
* (world,1)(world,1)(world,1) ==> reduce2: (world, <1,1,1>)
* (welcome,1) ==> reduce3: (welcome, <1>)
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
// 不管是迭代器还是forEach,如果读取的为封装的对象,必须再次封装然后使用,应为迭代器迭代的对象永远只有一个,只是内容变化了而已!
Iterator<IntWritable> iterator = values.iterator();
// <1,1,1>
while (iterator.hasNext()) {
IntWritable value = iterator.next();
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
3). 客户端测试
- 本地文件系统测试
import com.hahadasheng.bigdata.hadooplearning.utils.FileUtilsLocal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 本地任务测试:
* 【强烈推荐】在本地测试,方便调试代码,代码没得问题后再打包丢到服务器进行调优!
* @author Liucheng
* @since 2019-11-14
*/
public class WordCountLocalApp {
public static void main(String[] args) throws Exception {
// 系统配置
Configuration configuration = new Configuration();
// 创建一个Job
Job job = Job.getInstance(configuration);
// 设置Job对应的参数:主类
job.setJarByClass(WordCountLocalApp.class);
// 设置Job对应的参数:设置自定义的Mapper和Reducer处理类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置Job对应参数:Mapper输出的key和value类型(泛型中后面两个的类型)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置Job对应的参数:Reduce输出key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
String pathIn = "E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\localtest\\wc.txt";
String pathOut = "E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\localtest\\count";
// 本地递归删除文件夹(代码略)
FileUtilsLocal.removeFileRecursion(pathOut);
// 设置Job对应的参数:作业输入和输出的路径
FileInputFormat.setInputPaths(job, new Path(pathIn));
FileOutputFormat.setOutputPath(job, new Path(pathOut));
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : -1);
}
}
- 本地程序,hdfs文件系统测试
package com.hahadasheng.bigdata.hadooplearning.mapreducerlearning.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
/**
* 使用MR统计HDFS上的文件对应的词频
* Driver: 配置Mapper, Reducer的相关属性
* 提交到本地运行:开发过程使用
* Win环境下的注意事项:(针对2.6.0-cdh5.15.1版本); 否则启动会报错
* java.lang.UnsatisfiedLinkError:
* org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String;I)V
* 或者其他错误!!!!
*
* 1. 在 https://github.com/steveloughran/winutils 下的 hadoop-3.0.0 / bin中
* 将hadoop.dll和winutils.exe下来放入本地,例如 D:\hadoop\bin
* 2. 配置系统环境变量
* HADOOP_HOME = D:\hadoop
* Path 添加 %HADOOP_HOME%\bin
*
* @author Liucheng
* @since 2019-11-13
*/
public class WordCountApp {
public static void main(String[] args) throws Exception {
// 设置系统“环境变量”; 用于hadoop程序读取配置
System.setProperty("HADOOP_USER_NAME", "hadoop");
// 系统配置
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.10.188:8020");
// 创建一个Job
Job job = Job.getInstance(configuration);
// 设置Job对应的参数:主类
job.setJarByClass(WordCountApp.class);
// 设置Job对应的参数:设置自定义的Mapper和Reducer处理类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置Job对应参数:Mapper输出的key和value类型(泛型中后面两个的类型)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置Job对应的参数:Reduce输出key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 获取hdfs句柄:如果目录已经存在,则先删除,否则会报错!
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.10.188:8020"), configuration, "hadoop");
Path outputPath = new Path("/wordcount/output");
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}
// 设置Job对应的参数:作业输入和输出的路径
FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
FileOutputFormat.setOutputPath(job, outputPath);
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : -1);
}
}
4). Combiner操作
Combiner操作.png
- Combiner肯定是用了反射的技术
- Combiner操作之前应该也执行了类似shuffle的操作,拿到转换成key以及对应的value迭代器,处理后交给上下文,然后再shuffle,转换成key以及对应的value迭代器丢给下一个Reducer处理
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
/**
* @author Liucheng
* @since 2019-11-14
*/
public class WordCountCombinerApp {
public static void main(String[] args) throws Exception {
// 设置系统“环境变量”; 用于hadoop程序读取配置
System.setProperty("HADOOP_USER_NAME", "hadoop");
// 系统配置
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.10.188:8020");
// 创建一个Job
Job job = Job.getInstance(configuration);
// 设置Job对应的参数:主类
job.setJarByClass(WordCountCombinerApp.class);
// 添加Combiner的设置@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@2
job.setCombinerClass(WordCountReducer.class);
// 设置Job对应的参数:设置自定义的Mapper和Reducer处理类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置Job对应参数:Mapper输出的key和value类型(泛型中后面两个的类型)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置Job对应的参数:Reduce输出key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 获取hdfs句柄:如果目录已经存在,则先删除,否则会报错!
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.10.188:8020"), configuration, "hadoop");
Path outputPath = new Path("/wordcount/output");
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}
// 设置Job对应的参数:作业输入和输出的路径
FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
FileOutputFormat.setOutputPath(job, outputPath);
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : -1);
}
}
网友评论