Requirement:
Merge File1 and File2 into a single file, removing the duplicate lines.
File1:
2017-3-1 a
2017-3-2 b
2017-3-3 c
2017-3-4 d
2017-3-5 a
2017-3-6 b
2017-3-7 c
2017-3-3 c
File2:
2017-3-1 b
2017-3-2 a
2017-3-3 b
2017-3-4 d
2017-3-5 a
2017-3-6 c
2017-3-7 d
2017-3-3 c
Result:
2017-3-1 a
2017-3-1 b
2017-3-2 a
2017-3-2 b
2017-3-3 b
2017-3-3 c
2017-3-4 d
2017-3-5 a
2017-3-6 b
2017-3-6 c
2017-3-7 c
2017-3-7 d
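Before turning to MapReduce, the expected output above can be reproduced locally with a quick sanity-check sketch: a sorted set models "merge, de-duplicate, sort" directly. The class name and file paths below are illustrative only, not part of the original post.
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.TreeSet;

public class LocalDistinctCheck {
    public static void main(String[] args) throws Exception {
        TreeSet<String> lines = new TreeSet<>();              // sorted and duplicate-free
        lines.addAll(Files.readAllLines(Paths.get("File1")));
        lines.addAll(Files.readAllLines(Paths.get("File2")));
        lines.forEach(System.out::println);                   // prints the 12 distinct lines listed above
    }
}
The lexicographic ordering of a TreeSet matches the ordering MapReduce applies to Text keys during the shuffle, which is why the job below produces the same listing.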
DistinctMapper code:
package com.neusoft;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class DistinctMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit the whole input line as the key with an empty value;
        // the shuffle phase groups identical lines under one key, which removes the duplicates.
        context.write(new Text(value), new Text(""));
    }
}
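A common variation (not part of the original post) emits NullWritable instead of an empty Text as the value, so the shuffled records carry no payload at all. A minimal sketch under that assumption, with a hypothetical class name:
package com.neusoft;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Hypothetical variant of DistinctMapper: the line is still the key, the value is NullWritable.
public class DistinctNullValueMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(new Text(value), NullWritable.get());
    }
}
If this variant were used, the Reducer's value type and the job's setMapOutputValueClass/setOutputValueClass calls would have to be changed to NullWritable as well.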
DistinctReducer:
By the time data reaches the reducer, the keys are already unique: all duplicate lines have been grouped under a single key, so each key only needs to be written out once.
package com.neusoft;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class DistinctReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Each key arrives exactly once, so writing it out once yields the de-duplicated result.
        context.write(key, new Text(""));
    }
}
DistinctDriver:
package com.neusoft;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistinctDriver {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
        // Two arguments are required: the input directory and the output directory.
        if (args == null || args.length < 2) {
            return;
        }
        // Project helper that removes the output directory if it already exists.
        FileUtil.deleteDir(args[1]);
        // This object reads the Hadoop configuration from the environment by default;
        // individual settings can also be overridden via set().
        Configuration conf = new Configuration();
        // A Job is the abstraction of a task in YARN.
        Job job = Job.getInstance(conf);
        /*job.setJar("/home/hadoop/wc.jar");*/
        // Locate this program's jar via the driver class.
        job.setJarByClass(DistinctDriver.class);
        // Specify the Mapper and Reducer classes used by this job.
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReducer.class);
        // Key/value types of the mapper output; they must match the Mapper's generic types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Key/value types of the final output, i.e. the Reducer's output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Optionally set a Combiner; this Reducer could also serve as the Combiner.
        // job.setCombinerClass(DistinctReducer.class);
        // Input directory containing the job's raw files.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Output directory for the job's results.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job configuration and the jar containing the job's classes to YARN;
        // true means wait until the map and reduce tasks have finished.
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
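The Driver calls FileUtil.deleteDir, whose source is not shown in the post. A minimal sketch, assuming it is a helper in the same com.neusoft package that recursively deletes the output path (everything here except the deleteDir name is this sketch's assumption):
package com.neusoft;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

// Sketch of the helper assumed by DistinctDriver: MapReduce refuses to run if the
// output directory already exists, so delete it up front.
public class FileUtil {
    public static void deleteDir(String dir) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(dir);
        if (fs.exists(path)) {
            fs.delete(path, true); // true = recursive delete
        }
    }
}
With everything in place, the job can be launched with something like hadoop jar distinct.jar com.neusoft.DistinctDriver /input /output (the jar name and paths are placeholders); the part-r-00000 file in the output directory should then contain the merged, de-duplicated, sorted lines shown in the result above.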