编写MR脚本的步骤
1. 添加MR脚本的依赖包
编写MR程序前,需要添加一些依赖的Jar包,主要分为四块:
- common
- hdfs
- mapreduce
- yarn
如果是maven项目,则需要在pom文件中添加如下依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn</artifactId>
<version>${hadoop.version}</version>
</dependency>
2. 重写Mapper类
Mapper类的参数包括四个泛型:
- KEYIN,输入key的泛型,如果类型是LongWritable,表示截止目前为止从文件读取字节数,如果不关注该键,可以用Object类型;
- VALUEIN,输入值的类型,其类型是Text表示从文本读入一行记录;
- KEYOUT,输出的key的类型;
- VALUEOUT,输出的value的类型。
序列化和反序列化
mapreduce处理数据的时候,必然经过持久化磁盘或者网络输出,所以需要对Java类型的数据进行序列化和反序列化。
Hadoop自己设计了一套轻便的序列化和反序列化接口Writable接口,Java对应的>8种类型都有对应的Writable类:
int --> IntWritable
long --> LongWritable
double -->DoubleWritable
String --> Text
null --> NullWritable,NullWritable是单例模式设计的,需要NullWritable.get()获得对象。
编写Mapper类中,我们主要做的事情就是重写map方法,文本每行内容都将执行一次该方法:
package com.wenhuan.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WCMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
final LongWritable one = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split("\t");
for (String w:words) {
context.write(new Text(w),one);
}
}
}
context是上下文对象,用于传输数据,写到hdfs。
map任务输出的中间结果一般是直接写入本地硬盘,这些本地文件在作业结束就可以删除。
3. 重写Reducer类
reduce处理的是map的结果,reduce的输入是map的输出。
Reduce类的参数包括四个泛型:
- KEYIN,Reducer输入的key类型,即Mapper对应的KEYOUT类型;
- VALUEIN,Reducer输入的value类型,即Mapper对应的VALUEOUT类型;
- KEYOUT,Reducer统计结果的key类型;
- VALUEOUT,Reducer统计结果的value类型;
package com.wenhuan.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
int valueout = 0;
for(LongWritable value:values) {
valueout += value.get();
}
context.write(key, new LongWritable(valueout));
}
}
4. 编写Driver驱动类
驱动类用于将Mapper和Reducer组装起来,并提供整个MR程序执行的程序入口。
编写一个MR程序,以wordcount为例:
package com.wenhuan.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
public class Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//加载配置文件
Configuration conf = new Configuration();
//启动一个Job,封装maper和reducer
Job job = Job.getInstance(conf);
//设置计算程序的主驱动类,运行的时候打成jar包运行。
job.setJarByClass(Driver.class);
//设置Maper和Reduer类
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
//设置mapper的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//设置reducer的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置输入路径和输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交,需要打印日志
job.waitForCompletion(true);
}
调用FileInputFormat类的静态方法addInputPath()来定义输入数据的路径,这个路径可以是单个文件、一个目录(目录下所有文件作为输入)或者一系列文件(通过多次调用addInputPath()来实现)。与此相应的,FileOutputFormat类中的静态方法setOutputPath()来指定输出路径,该路径必须不存在,否则程序会报错。
通过setMapperClass()和setReducerClass()来指定map类型和reduce类型。
setOutputKeyClass()和setOutputValueClass()来指定map和reduce的输入和输出类型。一般情况下map和reduce的输入和输出类型是一样的,如果不一样时,则使用setMapOutputKeyClass()和setMapOutputValueClass()类设置map函数的输出类型。
5. MR脚本打包上传**
以上mapreduce程序打包后,可以下命令执行:
hadoop jar /Users/wenhuan/hadoop-2.7.7/lib/hadoop-train-1.0.jar<jar包的路径>
com.imooc.hadoop.mapreduce.wordCountApp<MR类>
hdfs://wenhuan.local:8020/test_122.txt<输入文件在HDFS上的路径>
hdfs://wenhuan.local:8020/output<输出结果在HDFS上的路径>
MR优化
Partitioner分区和reducetask并行度
reducetask的数量由numReduceTasks决定,如果不人为设置,该值默认为1;
也可以在MapReduce程序的driver中人为设置reduce的数量:
//设置reducetask个数
job.setNumReduceTasks(10) ;
Partitioner类决定了MapTask输出的数据交由哪个ReduceTask处理。
默认的Partitioner的实现类是HashPartitioner类,这个类将获取上述设置的reducetask任务开启的个数numReduceTasks,分发的key的hash值对numReduceTasks个数取模:
public int getPartition(K key,V value,int numReduceTasks){
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
}
如果默认的分区算法不能满足我们的需求,则我们可以自定义一个Partitioner的实现类:
package com.wenhuan.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartition extends Partitioner<Text,LongWritable> {
@Override
public int getPartition(Text key, LongWritable value, int numReduceTasks) {
if (key.toString().equals("a")){
return 0;
}
if (key.toString().equals("b")){
return 1;
}
if (key.toString().equals("c")){
return 2;
}
return 3;
}
}
...
public static void main(String[] args) throws Exception {
...
//设置job的partition
job.setPartitionerClass(MyPartitioner.class);
//设置4个reducer,每个分区一个
job.setNumReduceTasks(4);
...
}
...
自定义分区时,输出的分区数必须小于或等于numReduceTasks,否则会报错。
分区return 返回的数字n,对应的结果文件part-r-00000n。
Combiner组件
Combiner组件可以减少从MapTasks输出的数据量以及shuffle过程的数量,从而提高性能。我们可以理解为在每个maptask结束后,在该节点又进行了一次reducer,之后再进行shuffle。
由于Combiner组件是承接Mapper和Reducer和,所以其前两个泛型参数与Mapper输出key\value类型一致,后两个泛型参数与Reducer输入key\value类型一致。默认情况下是没有Combiner组件的,我们可以自定义一个Combiner组件:
package com.wenhuan.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MyCombiner extends Reducer<Text,LongWritable,Text,LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
int sum = 0;
for(LongWritable v:values) {
sum += v.get();
}
context.write(key, new LongWritable(sum));
}
}
</pre>
另外在Driver中需要配置:
<pre class="brush:java;toolbar:false" style="margin: 0.5em 0px; padding: 0.4em 0.6em; border-radius: 8px; background: rgb(248, 248, 248);">...
//通过job设置combiner处理类,其实逻辑上和我们的reduce是一样的
job.setCombinerClass(MyCombiner.class);
...
在大部分情况下,Combiner的业务逻辑和Reduce是一样的,所以一般我们也可以直接在Driver中配置,复用Reduce的组件:
...
public static void main(String[] args) throws Exception {
...
//通过job设置combiner处理类,其实逻辑上和我们的reduce是一样的
job.setCombinerClass(MyReducer.class);
...
}
...
但是只适合于一些类似求和、次数之类的使用场景。
Jobhistory
hadoop/etc/hadoop/mapred-site.xml中配置以下内容,存储job执行记录:
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>wenhuan.localhost:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>wenhuan.localhost:19888</value>
</property>
<property>
<name>mapreduce.jobhistory.done</name>
<value>/history/done</value>
</property>
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/history/done_intermediate</value>
</property>
</configuration>
之后需要重启yarn,之后在/sbin目录下启动如下命令:
mr-jobhistory-daemon.sh start historyserver
网友评论