美文网首页眼君的大数据之路
MapReduce开发笔记(一、基本流程和优化)

MapReduce开发笔记(一、基本流程和优化)

作者: 眼君 | 来源:发表于2020-09-01 17:18 被阅读0次

    编写MR脚本的步骤

    1. 添加MR脚本的依赖包

    编写MR程序前,需要添加一些依赖的Jar包,主要分为四块:

    1. common
    2. hdfs
    3. mapreduce
    4. yarn

    如果是maven项目,则需要在pom文件中添加如下依赖:

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    

    2. 重写Mapper类

    Mapper类的参数包括四个泛型:

    1. KEYIN,输入key的泛型,如果类型是LongWritable,表示截止目前为止从文件读取字节数,如果不关注该键,可以用Object类型;
    2. VALUEIN,输入值的类型,其类型是Text表示从文本读入一行记录;
    3. KEYOUT,输出的key的类型;
    4. VALUEOUT,输出的value的类型。
    序列化和反序列化

    mapreduce处理数据的时候,必然经过持久化磁盘或者网络输出,所以需要对Java类型的数据进行序列化和反序列化。
    Hadoop自己设计了一套轻便的序列化和反序列化接口Writable接口,Java对应的>8种类型都有对应的Writable类:

    int --> IntWritable
    long --> LongWritable
    double -->DoubleWritable
    String --> Text
    null --> NullWritable,NullWritable是单例模式设计的,需要NullWritable.get()获得对象。

    编写Mapper类中,我们主要做的事情就是重写map方法,文本每行内容都将执行一次该方法:

    package com.wenhuan.wordcount;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WCMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        final LongWritable one = new LongWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split("\t");
            for (String w:words) {
                context.write(new Text(w),one);
            }   
        }
    }
    

    context是上下文对象,用于传输数据,写到hdfs。
    map任务输出的中间结果一般是直接写入本地硬盘,这些本地文件在作业结束就可以删除。

    3. 重写Reducer类

    reduce处理的是map的结果,reduce的输入是map的输出。

    Reduce类的参数包括四个泛型:

    1. KEYIN,Reducer输入的key类型,即Mapper对应的KEYOUT类型;
    2. VALUEIN,Reducer输入的value类型,即Mapper对应的VALUEOUT类型;
    3. KEYOUT,Reducer统计结果的key类型;
    4. VALUEOUT,Reducer统计结果的value类型;
    package com.wenhuan.wordcount;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WCReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
    
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
            int valueout = 0;
            for(LongWritable value:values) {
                valueout += value.get();
            }
            context.write(key, new LongWritable(valueout));
        }
    }
    

    4. 编写Driver驱动类

    驱动类用于将Mapper和Reducer组装起来,并提供整个MR程序执行的程序入口。
    编写一个MR程序,以wordcount为例:

    package com.wenhuan.wordcount;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    
    public class Driver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //加载配置文件
            Configuration conf = new Configuration();
            //启动一个Job,封装maper和reducer
            Job job = Job.getInstance(conf);
            //设置计算程序的主驱动类,运行的时候打成jar包运行。
            job.setJarByClass(Driver.class);
            //设置Maper和Reduer类
            job.setMapperClass(WCMapper.class);
            job.setReducerClass(WCReducer.class);
            //设置mapper的输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            //设置reducer的输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            //设置输入路径和输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            //提交,需要打印日志
            job.waitForCompletion(true);
        }
    

    调用FileInputFormat类的静态方法addInputPath()来定义输入数据的路径,这个路径可以是单个文件、一个目录(目录下所有文件作为输入)或者一系列文件(通过多次调用addInputPath()来实现)。与此相应的,FileOutputFormat类中的静态方法setOutputPath()来指定输出路径,该路径必须不存在,否则程序会报错。

    通过setMapperClass()和setReducerClass()来指定map类型和reduce类型。

    setOutputKeyClass()和setOutputValueClass()来指定map和reduce的输入和输出类型。一般情况下map和reduce的输入和输出类型是一样的,如果不一样时,则使用setMapOutputKeyClass()和setMapOutputValueClass()类设置map函数的输出类型。

    5. MR脚本打包上传**

    以上mapreduce程序打包后,可以下命令执行:

    hadoop jar /Users/wenhuan/hadoop-2.7.7/lib/hadoop-train-1.0.jar<jar包的路径>
    com.imooc.hadoop.mapreduce.wordCountApp<MR类> 
    hdfs://wenhuan.local:8020/test_122.txt<输入文件在HDFS上的路径>
    hdfs://wenhuan.local:8020/output<输出结果在HDFS上的路径>
    

    MR优化

    Partitioner分区和reducetask并行度

    reducetask的数量由numReduceTasks决定,如果不人为设置,该值默认为1;
    也可以在MapReduce程序的driver中人为设置reduce的数量:

    //设置reducetask个数
    job.setNumReduceTasks(10) ;
    

    Partitioner类决定了MapTask输出的数据交由哪个ReduceTask处理。

    默认的Partitioner的实现类是HashPartitioner类,这个类将获取上述设置的reducetask任务开启的个数numReduceTasks,分发的key的hash值对numReduceTasks个数取模:

    public int getPartition(K key,V value,int numReduceTasks){
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
    }
    

    如果默认的分区算法不能满足我们的需求,则我们可以自定义一个Partitioner的实现类:

    package com.wenhuan.wordcount;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class MyPartition extends Partitioner<Text,LongWritable> {
        @Override
        public int getPartition(Text key, LongWritable value, int numReduceTasks) {
            if (key.toString().equals("a")){
                return 0;
            }
            if (key.toString().equals("b")){
                return 1;
            }
            if (key.toString().equals("c")){
                return 2;
            }
            return 3;
        }
    }
    ...
    public static void main(String[] args) throws Exception {
        ...
        //设置job的partition
        job.setPartitionerClass(MyPartitioner.class);
        //设置4个reducer,每个分区一个
        job.setNumReduceTasks(4);
        ...
    }
    ...
    

    自定义分区时,输出的分区数必须小于或等于numReduceTasks,否则会报错。

    分区return 返回的数字n,对应的结果文件part-r-00000n。

    Combiner组件

    Combiner组件可以减少从MapTasks输出的数据量以及shuffle过程的数量,从而提高性能。我们可以理解为在每个maptask结束后,在该节点又进行了一次reducer,之后再进行shuffle。

    由于Combiner组件是承接Mapper和Reducer和,所以其前两个泛型参数与Mapper输出key\value类型一致,后两个泛型参数与Reducer输入key\value类型一致。默认情况下是没有Combiner组件的,我们可以自定义一个Combiner组件:

    package com.wenhuan.wordcount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MyCombiner extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
            int sum = 0;
            for(LongWritable v:values) {
                sum += v.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
    </pre>
    
    另外在Driver中需要配置:
    
    <pre class="brush:java;toolbar:false" style="margin: 0.5em 0px; padding: 0.4em 0.6em; border-radius: 8px; background: rgb(248, 248, 248);">...
    //通过job设置combiner处理类,其实逻辑上和我们的reduce是一样的
    job.setCombinerClass(MyCombiner.class);
    ...
    

    在大部分情况下,Combiner的业务逻辑和Reduce是一样的,所以一般我们也可以直接在Driver中配置,复用Reduce的组件:

    ...
    public static void main(String[] args) throws Exception {
        ...
        //通过job设置combiner处理类,其实逻辑上和我们的reduce是一样的
        job.setCombinerClass(MyReducer.class);
        ...
    }
    ...
    

    但是只适合于一些类似求和、次数之类的使用场景。

    Jobhistory

    hadoop/etc/hadoop/mapred-site.xml中配置以下内容,存储job执行记录:

    <configuration>
        <property>
            <name>mapreduce.framework.name</name>
            <value>yarn</value>
        </property>
    
        <property>
            <name>mapreduce.jobhistory.address</name>
            <value>wenhuan.localhost:10020</value>
        </property>
    
        <property>
            <name>mapreduce.jobhistory.webapp.address</name>
            <value>wenhuan.localhost:19888</value>
        </property>
    
        <property>
            <name>mapreduce.jobhistory.done</name>
            <value>/history/done</value>
        </property>
    
        <property>
            <name>mapreduce.jobhistory.intermediate-done-dir</name>
            <value>/history/done_intermediate</value>
        </property>
    </configuration>
    

    之后需要重启yarn,之后在/sbin目录下启动如下命令:

    mr-jobhistory-daemon.sh start historyserver
    

    相关文章

      网友评论

        本文标题:MapReduce开发笔记(一、基本流程和优化)

        本文链接:https://www.haomeiwen.com/subject/jzvvsktx.html