美文网首页眼君的大数据之路
MapReduce开发笔记(一、基本流程和优化)

MapReduce开发笔记(一、基本流程和优化)

作者: 眼君 | 来源:发表于2020-09-01 17:18 被阅读0次

编写MR脚本的步骤

1. 添加MR脚本的依赖包

编写MR程序前,需要添加一些依赖的Jar包,主要分为四块:

  1. common
  2. hdfs
  3. mapreduce
  4. yarn

如果是maven项目,则需要在pom文件中添加如下依赖:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-yarn</artifactId>
  <version>${hadoop.version}</version>
</dependency>

2. 重写Mapper类

Mapper类的参数包括四个泛型:

  1. KEYIN,输入key的泛型,如果类型是LongWritable,表示截止目前为止从文件读取字节数,如果不关注该键,可以用Object类型;
  2. VALUEIN,输入值的类型,其类型是Text表示从文本读入一行记录;
  3. KEYOUT,输出的key的类型;
  4. VALUEOUT,输出的value的类型。
序列化和反序列化

mapreduce处理数据的时候,必然经过持久化磁盘或者网络输出,所以需要对Java类型的数据进行序列化和反序列化。
Hadoop自己设计了一套轻便的序列化和反序列化接口Writable接口,Java对应的>8种类型都有对应的Writable类:

int --> IntWritable
long --> LongWritable
double -->DoubleWritable
String --> Text
null --> NullWritable,NullWritable是单例模式设计的,需要NullWritable.get()获得对象。

编写Mapper类中,我们主要做的事情就是重写map方法,文本每行内容都将执行一次该方法:

package com.wenhuan.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
    final LongWritable one = new LongWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\t");
        for (String w:words) {
            context.write(new Text(w),one);
        }   
    }
}

context是上下文对象,用于传输数据,写到hdfs。
map任务输出的中间结果一般是直接写入本地硬盘,这些本地文件在作业结束就可以删除。

3. 重写Reducer类

reduce处理的是map的结果,reduce的输入是map的输出。

Reduce类的参数包括四个泛型:

  1. KEYIN,Reducer输入的key类型,即Mapper对应的KEYOUT类型;
  2. VALUEIN,Reducer输入的value类型,即Mapper对应的VALUEOUT类型;
  3. KEYOUT,Reducer统计结果的key类型;
  4. VALUEOUT,Reducer统计结果的value类型;
package com.wenhuan.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text,LongWritable,Text,LongWritable>{

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
        int valueout = 0;
        for(LongWritable value:values) {
            valueout += value.get();
        }
        context.write(key, new LongWritable(valueout));
    }
}

4. 编写Driver驱动类

驱动类用于将Mapper和Reducer组装起来,并提供整个MR程序执行的程序入口。
编写一个MR程序,以wordcount为例:

package com.wenhuan.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //加载配置文件
        Configuration conf = new Configuration();
        //启动一个Job,封装maper和reducer
        Job job = Job.getInstance(conf);
        //设置计算程序的主驱动类,运行的时候打成jar包运行。
        job.setJarByClass(Driver.class);
        //设置Maper和Reduer类
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        //设置mapper的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //设置reducer的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //设置输入路径和输出路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //提交,需要打印日志
        job.waitForCompletion(true);
    }

调用FileInputFormat类的静态方法addInputPath()来定义输入数据的路径,这个路径可以是单个文件、一个目录(目录下所有文件作为输入)或者一系列文件(通过多次调用addInputPath()来实现)。与此相应的,FileOutputFormat类中的静态方法setOutputPath()来指定输出路径,该路径必须不存在,否则程序会报错。

通过setMapperClass()和setReducerClass()来指定map类型和reduce类型。

setOutputKeyClass()和setOutputValueClass()来指定map和reduce的输入和输出类型。一般情况下map和reduce的输入和输出类型是一样的,如果不一样时,则使用setMapOutputKeyClass()和setMapOutputValueClass()类设置map函数的输出类型。

5. MR脚本打包上传**

以上mapreduce程序打包后,可以下命令执行:

hadoop jar /Users/wenhuan/hadoop-2.7.7/lib/hadoop-train-1.0.jar<jar包的路径>
com.imooc.hadoop.mapreduce.wordCountApp<MR类> 
hdfs://wenhuan.local:8020/test_122.txt<输入文件在HDFS上的路径>
hdfs://wenhuan.local:8020/output<输出结果在HDFS上的路径>

MR优化

Partitioner分区和reducetask并行度

reducetask的数量由numReduceTasks决定,如果不人为设置,该值默认为1;
也可以在MapReduce程序的driver中人为设置reduce的数量:

//设置reducetask个数
job.setNumReduceTasks(10) ;

Partitioner类决定了MapTask输出的数据交由哪个ReduceTask处理。

默认的Partitioner的实现类是HashPartitioner类,这个类将获取上述设置的reducetask任务开启的个数numReduceTasks,分发的key的hash值对numReduceTasks个数取模:

public int getPartition(K key,V value,int numReduceTasks){
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
}

如果默认的分区算法不能满足我们的需求,则我们可以自定义一个Partitioner的实现类:

package com.wenhuan.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartition extends Partitioner<Text,LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numReduceTasks) {
        if (key.toString().equals("a")){
            return 0;
        }
        if (key.toString().equals("b")){
            return 1;
        }
        if (key.toString().equals("c")){
            return 2;
        }
        return 3;
    }
}
...
public static void main(String[] args) throws Exception {
    ...
    //设置job的partition
    job.setPartitionerClass(MyPartitioner.class);
    //设置4个reducer,每个分区一个
    job.setNumReduceTasks(4);
    ...
}
...

自定义分区时,输出的分区数必须小于或等于numReduceTasks,否则会报错。

分区return 返回的数字n,对应的结果文件part-r-00000n。

Combiner组件

Combiner组件可以减少从MapTasks输出的数据量以及shuffle过程的数量,从而提高性能。我们可以理解为在每个maptask结束后,在该节点又进行了一次reducer,之后再进行shuffle。

由于Combiner组件是承接Mapper和Reducer和,所以其前两个泛型参数与Mapper输出key\value类型一致,后两个泛型参数与Reducer输入key\value类型一致。默认情况下是没有Combiner组件的,我们可以自定义一个Combiner组件:

package com.wenhuan.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyCombiner extends Reducer<Text,LongWritable,Text,LongWritable>{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(LongWritable v:values) {
            sum += v.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
</pre>

另外在Driver中需要配置:

<pre class="brush:java;toolbar:false" style="margin: 0.5em 0px; padding: 0.4em 0.6em; border-radius: 8px; background: rgb(248, 248, 248);">...
//通过job设置combiner处理类,其实逻辑上和我们的reduce是一样的
job.setCombinerClass(MyCombiner.class);
...

在大部分情况下,Combiner的业务逻辑和Reduce是一样的,所以一般我们也可以直接在Driver中配置,复用Reduce的组件:

...
public static void main(String[] args) throws Exception {
    ...
    //通过job设置combiner处理类,其实逻辑上和我们的reduce是一样的
    job.setCombinerClass(MyReducer.class);
    ...
}
...

但是只适合于一些类似求和、次数之类的使用场景。

Jobhistory

hadoop/etc/hadoop/mapred-site.xml中配置以下内容,存储job执行记录:

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>

    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>wenhuan.localhost:10020</value>
    </property>

    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>wenhuan.localhost:19888</value>
    </property>

    <property>
        <name>mapreduce.jobhistory.done</name>
        <value>/history/done</value>
    </property>

    <property>
        <name>mapreduce.jobhistory.intermediate-done-dir</name>
        <value>/history/done_intermediate</value>
    </property>
</configuration>

之后需要重启yarn,之后在/sbin目录下启动如下命令:

mr-jobhistory-daemon.sh start historyserver

相关文章

网友评论

    本文标题:MapReduce开发笔记(一、基本流程和优化)

    本文链接:https://www.haomeiwen.com/subject/jzvvsktx.html