一、MapReduce编程指导思想
MapReduce的开发一共有八个步骤其中map阶段分为2个步骤,shuffle阶段4个步骤,reduce阶段分为2个步骤。
1. Map阶段2个步骤
-
第一步:设置inputFormat类,将数据切分成key,value对,输入到第二步
-
第二步:自定义map逻辑,处理我们第一步的输入kv对数据,然后转换成新的key,value对进行输出
2. shuffle阶段4个步骤
-
第三步:对上一步输出的key,value对进行分区。(相同key的kv对属于同一分区)
-
第四步:对每个分区的数据按照key进行排序
-
第五步:对分区中的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)
-
第六步:对排序后的kv对数据进行分组;分组的过程中,key相同的kv对为一组;将同一组的kv对的所有value放到一个集合当中(每组数据调用一次reduce方法)
3. reduce阶段2个步骤
-
第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出
-
第八步:设置将输出的key,value对数据保存到文件中
二、MapReduce编程实现
1. 源数据准备
hello,hello
world,world
hadoop,hadoop
hello,world
hello,flume
hadoop,hive
hive,kafka
flume,storm
hive,oozie
2. 代码实现
创建maven工程导入jar包
<properties>
<hadoop.version>3.1.4</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
定义WordCountMain主类
固定写法,实现mapreduce8个步骤:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCountMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//定义MapReduce的8个步骤 然后通过Job组装
//继承父类 Configured 获取conf
Configuration conf = super.getConf();
Job job = Job.getInstance(conf, "WordCount");
//如果需要集群运行,需要设置程序运行主类;若本地运行,则不需要该设置
job.setJarByClass(WordCountMain.class);
//通过Job组装MpaReduce的8个步骤
//第一步:读取文件,解析成为key,value对(k1,v1)
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\input\\数据文件"));
//第二步:自定义map逻辑,接受(k1,v1) 转换成为新的 (k2,v2)
job.setMapperClass(MyMapper.class);
//设置map的输出的key,value对象(k2,v2)的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
/**
* 第三步:分区
* 第四步:排序
* 第五步:规约
* 第六步:分组
*/
//第七步:自定义reduce逻辑
job.setReducerClass(MyReduce.class);
//设置reduce的输出key,value对象(k3,v3)的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//第八步:
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\output"));
//提交整个job任务,等到整个任务结束之后,如果任务执行成功,返回true;如果执行失败,返回false
boolean b = job.waitForCompletion(true);
//使用三余运算符
return b?0:1;
}
/**
* MapReduce进行WordCount的入口类
*/
public static void main(String[] args) throws Exception {
//获取配置文件
Configuration configuration = new Configuration();
//通过ToolRunner.run 执行程序的入口
int run = ToolRunner.run(configuration, new WordCountMain(), args);
//整个系统的退出
System.exit(run);
}
}
定义Mapper类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 使用TextInputFormat读取文件,一行一行进行读取
* key:行偏移量 一般为LongWritable
* value: 一行文本记录为 Text
* 每个单词出现一次,记做一次
* key:单词 Text
* value: 1 IntWritable
*/
public class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
//准备好输出的: <Text,IntWritable>
Text text = new Text();
IntWritable intWritable = new IntWritable(1);
/**
*
* @param key
* @param value 一行文本记录
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Text value: 对文件一行记录进行处理
String line = value.toString();
String[] split = line.split(",");
for (String word : split){
//context 为 <Text,IntWritable>
text.set(word);
context.write(text,intWritable);
}
}
}
定义Reducer类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyReduce extends Reducer<Text, IntWritable, Text,IntWritable> {
/**
*
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int result = 0;
for(IntWritable value:values){
int i = value.get();
result += i;
}
IntWritable intWritable = new IntWritable(result);
context.write(key,intWritable);
}
}
注意: 切记导入正确的jar包,开发中由于导入了错误的Text包,导致返回code1,也没有报错,切记切记。
网友评论