一、MapReduce概述
MapReduce概述二、wordCount入门MapReduce
wordcount入门MapReduce2.1 MapReduce编程模型之Map与Reduce阶段
Map与Reduce阶段MapReduce执行步骤
2.2 官网关于MapReduce执行步骤的描述
key:指的是起始位置的偏移量,value指的是对应行的值。key和value必须实现以下两个接口
执行过程
三、MapReduce核心概念
3.1、Split:交由MapReduce作业来处理的数据块,是MapReduce中最小的计算单元
HDFS:blocksize 是HDFS中最小的存储单元 128M
默认情况下:他们两是一一对应的,当然我们也可以手工设置他们之间的关系(不建议)
3.2、InputFormat:
将我们的输入数据进行分片(split): InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
TextInputFormat: 处理文本格式的数据
几个reduceTask就有几个输出文件
四、MapReduce架构之1/2.x
1.x总结1
总结2
2.x
五、Java版本的wordCount功能实现
package com.imooc.hadoop.mapreduce;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Iterator;
/**
* Created by zhouzhouseela on 2018/4/2.
*/
//这里的Map/Reduce的每一个键值对都要定义好
public class WordCountApp {
// map是读取输入的文件,投入的的是每一行文本的偏移量:每一行文本的一个键值对,产出一个String:long的键值对,比如
// hadoop:1,linux:1这样的键值对。
public class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
//这里的key表示偏移量,value表示每一行字符串
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for(String word:words){
context.write(new Text(word),new LongWritable(1));
}
}
}
//reduce投入的是map的产出,然后产出的是hadoop:2这样的汇总结果(归并操作)
public class MyReduce extends Reducer<Text,LongWritable,Text,LongWritable>{
// 因为输入的都是hadoop:1,hadoop1,linux:1这样的,把单词对应的values加起来就得到其频次。
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
Iterator<LongWritable> valueIt = values.iterator();
long sum =0;
while(valueIt.hasNext()){
sum+=valueIt.next().get();
}
context.write(key,new LongWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration configuration =new Configuration();
Job job =Job.getInstance(configuration,"wordCount");
job.setJarByClass(WordCountApp.class);
//设置作业处理的输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
//设置map相关的参数
job.setMapOutputValueClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 设置reduce相关的参数
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置作业处理的输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
maven实现wc
解决MR过程中的一个bug
FileSystem fileSystem = FileSystem.get(configuration);
if(fileSystem.exists(new Path(args[1]))){
fileSystem.delete(new Path(args[1]),true);
}
六、MapReduce编程之Combiner:相当于在本地进行reduce。
Combiner6.1 使用场景
- 适合求和、次数等
- 对求平均的场景不适合
七、MapReduce编程之Partitioner
Partitioner编程分发的数据按照不同的分类来处理并且输出到不同的文件
一个栗子
- 假设有如下的文件,要求将相同类型的收入放在一个reduce上处理
[图片上传失败...(image-eca267-1522660460404)] - 代码如下
···
package com.imooc.hadoop.mapreduce;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Iterator;
//这里的Map/Reduce的每一个键值对都要定义好
public class PartitionerDemo {
// map是读取输入的文件,投入的的是每一行文本的偏移量:每一行文本的一个键值对,产出一个String:long的键值对,比如
// hadoop:1,linux:1这样的键值对。
public class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
//这里的key表示偏移量,value表示每一行字符串
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
// 对于如上的格式,每一行中的第一个是名称,第二个是价格
for(String word:words){
context.write(new Text(words[0]),new LongWritable(Long.parseLong(words[1])));
}
}
}
//reduce投入的是map的产出,然后产出的是hadoop:2这样的汇总结果(归并操作)
public class MyReduce extends Reducer<Text,LongWritable,Text,LongWritable>{
// 因为输入的都是hadoop:1,hadoop1,linux:1这样的,把单词对应的values加起来就得到其频次。
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
Iterator<LongWritable> valueIt = values.iterator();
long sum =0;
while(valueIt.hasNext()){
sum+=valueIt.next().get();
}
context.write(key,new LongWritable(sum));
}
}
// 定义Partition的Class
public class MyPartitioner extends Partitioner<Text,LongWritable>{
public int getPartition(Text key, LongWritable value, int i) {
if(key.equals("xiaomi")){
return 0;
}
if(key.equals("hauwei")){
return 1;
}
if(key.equals("iphone7")){
return 2;
}
return 3;
}
}
public static void main(String[] args) throws Exception {
Configuration configuration =new Configuration();
FileSystem fileSystem = FileSystem.get(configuration);
if(fileSystem.exists(new Path(args[1]))){
fileSystem.delete(new Path(args[1]),true);
}
Job job =Job.getInstance(configuration,"wordCount");
job.setJarByClass(WordCountApp.class);
//设置作业处理的输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
//设置map相关的参数
job.setMapOutputValueClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 设置reduce相关的参数
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置job的partition
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(4);
//设置作业处理的输出路径
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
···
网友评论