每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次 合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一
- combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件
- combiner 组件的父类就是 Reducer
- combiner 和 reducer 的区别在于运行的位置
- Combiner 是在每一个 maptask 所在的节点运行
- Reducer 是接收全局所有 Mapper 的输出结果
- ombiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量
实现步骤
- 自定义一个 combiner 继承 Reducer,重写 reduce 方法
- 在 job 中设置 job.setCombinerClass(CustomCombiner.class)
combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来
combiner其实就是对Reducer的提前重写,以我们数字统计的案例为例,我们需要新建一个MyCombiner类,在类中继承Reducer类,进行重写
MyCombiner
package cn.leon.combiner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyCombiner extends Reducer<Text, LongWritable, Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
//1.遍历集合,将集合中的数字相加,得到V3
for(LongWritable value : values){
count+=value.get();
}
//2.将K3和V3写入上下文
context.write(key,new LongWritable(count));
}
}
然后在主类中加上这个combiner类,即可
package cn.leon.combiner;
import cn.leon.mapReduce.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
//1 . 创建job对象
Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName());
//打包到集群上面运行时候,必须要添加以下配置,指定程序的main函数
job.setJarByClass(JobMain.class);
//2. 配置job任务(8个步骤)
//第一步:设置输入类和输入路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/wordcount"));
//第二步:设置mapper类
job.setMapperClass(WordCountMapper.class);
//设置我们map阶段完成之后的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//第三步,第四步,第六步,省略
//第五步:设置combner
job.setCombinerClass(MyCombiner.class);
//第七步:设置我们的reduce类
job.setReducerClass(WordCountReducer.class);
//设置我们reduce阶段完成之后的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//第八步:设置输出类以及输出路径
Path path = new Path("hdfs://node01:8020/output/wordcount_out");
//判断目录是否存在
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020/output/wordcount_cout"),new Configuration());
if (fileSystem.exists(path)){
//删除目标目录
fileSystem.delete(path,true);
}
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,path);
boolean b = job.waitForCompletion(true);
//3. 等待任务结束
return b?0:1;
}
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
Tool tool = new JobMain();
//启动Job任务
int run = ToolRunner.run(configuration, tool, args);
System.exit(run);
}
}
我们对比一下运行结果
首先看没有加上Combiner,Reducer的执行结果
Reduce input groups=19
Reduce shuffle bytes=16937
Reduce input records=1032
Reduce output records=19
再看加上了Combiner之后Reducer的执行结果
Reduce input groups=19
Reduce shuffle bytes=528
Reduce input records=31
Reduce output records=19
对比上面的结果,我们可以看到,在运行了Combiner之后,Reducer的执行效果大大增加,减少了很多的次数。所以还是推荐用Combiner的,这样可以提高Reducer的运行效率和增加网络传输速度。
网友评论