
Data Deduplication

Author: 小月半会飞 | Published 2019-01-02 14:23

    Requirement:

    Merge File1 and File2 into a single file, removing the duplicate records.
    File1:

    2017-3-1 a
    2017-3-2 b
    2017-3-3 c
    2017-3-4 d
    2017-3-5 a
    2017-3-6 b
    2017-3-7 c
    2017-3-3 c
    

    File2:

    2017-3-1 b
    2017-3-2 a
    2017-3-3 b
    2017-3-4 d
    2017-3-5 a
    2017-3-6 c
    2017-3-7 d
    2017-3-3 c
    

    Result:

    2017-3-1 a  
    2017-3-1 b  
    2017-3-2 a  
    2017-3-2 b  
    2017-3-3 b  
    2017-3-3 c  
    2017-3-4 d  
    2017-3-5 a  
    2017-3-6 b  
    2017-3-6 c  
    2017-3-7 c  
    2017-3-7 d  
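
    Before turning to the Hadoop job, the expected behaviour is easy to check locally: read every line from both files into a sorted set, which removes duplicates and yields exactly the ordering shown above. A minimal plain-Java sketch (the file names are placeholders, not from the original post):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.TreeSet;

    public class LocalDistinct {
        public static void main(String[] args) throws IOException {
            // A TreeSet keeps each line exactly once, in sorted order,
            // which mirrors the grouping and sorting the MapReduce shuffle performs on keys.
            TreeSet<String> lines = new TreeSet<>();
            lines.addAll(Files.readAllLines(Paths.get("file1.txt")));
            lines.addAll(Files.readAllLines(Paths.get("file2.txt")));
            lines.forEach(System.out::println);
        }
    }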
    

    DistinctMapper code:

    package com.neusoft;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class DistinctMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit the whole input line as the key and an empty Text as the value.
            // Identical lines produce identical keys, so the shuffle groups duplicates together.
            context.write(new Text(value), new Text(""));
        }
    }
    
    

    DistinctReducer:

    By the time the data reaches the reducer, each key is already unique: the shuffle phase groups all identical lines under a single key.

    package com.neusoft;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class DistinctReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Each distinct line arrives here exactly once as a key, so writing the key
            // (and ignoring the values) is all that is needed to deduplicate.
            context.write(key, new Text(""));
        }
    }
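
    A small design note: the empty Text value carries no information, and Hadoop provides NullWritable for exactly this case. The sketch below shows the reducer written that way; it is an alternative, not the code used in this post, and the class name is made up for illustration (the mapper and driver value types would have to switch to NullWritable as well).

    package com.neusoft;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    // Variant of DistinctReducer that emits NullWritable instead of an empty Text.
    public class DistinctNullReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }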
    
    

    DistinctDriver:

    package com.neusoft;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class DistinctDriver {

        public static void main(String[] args) throws Exception {
            System.setProperty("HADOOP_USER_NAME", "root");
            System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
            // The driver needs both an input and an output path.
            if (args == null || args.length < 2) {
                System.err.println("Usage: DistinctDriver <input path> <output path>");
                return;
            }
            // Delete any existing output directory first (FileUtil is the project's own helper class).
            FileUtil.deleteDir(args[1]);

            // Reads the Hadoop configuration from the environment by default;
            // individual settings can still be overridden via set().
            Configuration conf = new Configuration();

            // A Job is the abstraction of a task submitted to YARN.
            Job job = Job.getInstance(conf);

            /*job.setJar("/home/hadoop/wc.jar");*/
            // Locate the jar containing this program by its driver class.
            job.setJarByClass(DistinctDriver.class);

            // The Mapper/Reducer classes this job should use.
            job.setMapperClass(DistinctMapper.class);
            job.setReducerClass(DistinctReducer.class);

            // Key/value types of the mapper output; must match the generic types declared in the Mapper.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // Key/value types of the final output, i.e. the Reducer's output types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // A Combiner could be set here; for this job the Reducer itself can serve as one.
    //        job.setCombinerClass(DistinctReducer.class);

            // Directory containing the job's input files.
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            // Directory for the job's output (must not already exist).
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Submit the job's configuration and jar to YARN;
            // true means wait until the map and reduce tasks have completed.
            /*job.submit();*/
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }
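
    The FileUtil.deleteDir helper called at the top of the driver is not shown in the original post. A plausible minimal version, sketched here as an assumption rather than the author's actual code, uses Hadoop's FileSystem API to remove the output directory so the job can be rerun:

    package com.neusoft;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FileUtil {
        // Delete the given directory (recursively) if it exists,
        // so FileOutputFormat does not fail on an existing output path.
        public static void deleteDir(String dir) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            Path path = new Path(dir);
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
        }
    }

    Once packaged into a jar, the job would typically be submitted with something like hadoop jar distinct.jar com.neusoft.DistinctDriver /input /output, where the jar name and HDFS paths are placeholders.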
    
    

          Title: Data Deduplication

          Link: https://www.haomeiwen.com/subject/ozjzlqtx.html