美文网首页
MapReduce 基础 (五)规约

MapReduce 基础 (五)规约

作者: 做个合格的大厂程序员 | 来源:发表于2020-06-17 11:11 被阅读0次

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次 合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一

  • combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
  • combiner 组件的父类就是 Reducer
  • combiner 和 reducer 的区别在于运行的位置
    • Combiner 是在每一个 maptask 所在的节点运行
    • Reducer 是接收全局所有 Mapper 的输出结果
  • ombiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量

实现步骤

  1. 自定义一个 combiner 继承 Reducer,重写 reduce 方法
  2. 在 job 中设置 job.setCombinerClass(CustomCombiner.class)

combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来

combiner其实就是对Reducer的提前重写,以我们数字统计的案例为例,我们需要新建一个MyCombiner类,在类中继承Reducer类,进行重写

MyCombiner

package cn.leon.combiner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyCombiner extends Reducer<Text, LongWritable, Text,LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        //1.遍历集合,将集合中的数字相加,得到V3
        for(LongWritable value : values){
            count+=value.get();
        }
        //2.将K3和V3写入上下文
        context.write(key,new LongWritable(count));
    }
}

然后在主类中加上这个combiner类,即可

package cn.leon.combiner;

import cn.leon.mapReduce.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        //1 . 创建job对象
        Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName());
        //打包到集群上面运行时候,必须要添加以下配置,指定程序的main函数
        job.setJarByClass(JobMain.class);
        //2. 配置job任务(8个步骤)
            //第一步:设置输入类和输入路径
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/wordcount"));

            //第二步:设置mapper类
            job.setMapperClass(WordCountMapper.class);
            //设置我们map阶段完成之后的输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);

            //第三步,第四步,第六步,省略
            //第五步:设置combner
            job.setCombinerClass(MyCombiner.class);

            //第七步:设置我们的reduce类
            job.setReducerClass(WordCountReducer.class);
            //设置我们reduce阶段完成之后的输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            //第八步:设置输出类以及输出路径
            Path path = new Path("hdfs://node01:8020/output/wordcount_out");

            //判断目录是否存在
            FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020/output/wordcount_cout"),new Configuration());
            if (fileSystem.exists(path)){
                //删除目标目录
                fileSystem.delete(path,true);
            }

            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job,path);
            boolean b = job.waitForCompletion(true);

        //3. 等待任务结束
        return b?0:1;
    }

    public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();

        Tool tool = new JobMain();
        //启动Job任务
        int run = ToolRunner.run(configuration, tool, args);

        System.exit(run);
    }
}

我们对比一下运行结果

首先看没有加上Combiner,Reducer的执行结果

Reduce input groups=19
Reduce shuffle bytes=16937
Reduce input records=1032
Reduce output records=19

再看加上了Combiner之后Reducer的执行结果

Reduce input groups=19
Reduce shuffle bytes=528
Reduce input records=31
Reduce output records=19

对比上面的结果,我们可以看到,在运行了Combiner之后,Reducer的执行效果大大增加,减少了很多的次数。所以还是推荐用Combiner的,这样可以提高Reducer的运行效率和增加网络传输速度。

相关文章

网友评论

      本文标题:MapReduce 基础 (五)规约

      本文链接:https://www.haomeiwen.com/subject/vdwjxktx.html