美文网首页
2020-05-24

2020-05-24

作者: 我的小猫不见了 | 来源:发表于2020-05-24 11:20 被阅读0次
    package TopN;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;
    
    /**
     * MapReduce job that selects the records carrying the N largest numeric values of a text input.
     *
     * <p>Each input line is either a bare integer ({@code 12}) or a comma-separated record whose
     * last field is the integer ({@code aa,10}). Every mapper keeps only the top N records of its
     * own split and emits them once the split is exhausted; a single reducer merges those
     * candidates into the global top N. N defaults to {@value #DEFAULT_TOP_N} and can be overridden
     * with the {@value #TOP_N_KEY} job configuration property.
     *
     * <p>Records are written in ascending order of their value. Records with equal values are all
     * retained (up to N); when a tie straddles the cut-off, the records seen first win.
     */
    public class TopnRunner {

        /** Job configuration property holding how many records to keep. */
        public static final String TOP_N_KEY = "topn.n";

        /** Number of records kept when {@link #TOP_N_KEY} is not set. */
        public static final int DEFAULT_TOP_N = 5;

        private static final String COUNTER_GROUP = "TopN";
        private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

        /**
         * Reads the configured N.
         *
         * @param conf job configuration
         * @return the configured limit, or {@link #DEFAULT_TOP_N} when unset
         * @throws IllegalArgumentException if the configured limit is smaller than 1
         */
        private static int readLimit(Configuration conf) {
            int limit = conf.getInt(TOP_N_KEY, DEFAULT_TOP_N);
            if (limit < 1) {
                throw new IllegalArgumentException(TOP_N_KEY + " must be at least 1, got " + limit);
            }
            return limit;
        }

        /**
         * Extracts the numeric value of a record: the text after the last comma, or the whole
         * line when there is no comma, with surrounding whitespace ignored.
         *
         * @param record one input line
         * @return the parsed value
         * @throws NumberFormatException if that text is not a valid {@code long}
         */
        private static long parseScore(String record) {
            // lastIndexOf returns -1 when there is no comma, so substring(0) yields the whole line.
            String field = record.substring(record.lastIndexOf(',') + 1).trim();
            return Long.parseLong(field);
        }

        /**
         * Bounded collection of the highest-valued records seen so far.
         *
         * <p>Records are bucketed by value so that two records sharing a value do not overwrite
         * each other. Not thread-safe; each task uses its own instance.
         */
        private static final class TopRecords {
            private final int limit;
            private final SortedMap<Long, List<String>> byScore = new TreeMap<Long, List<String>>();
            private int size;

            TopRecords(int limit) {
                this.limit = limit;
            }

            /** Adds the record if it belongs to the current top {@code limit}, evicting the lowest one if needed. */
            void offer(long score, String record) {
                if (size >= limit && score <= byScore.firstKey()) {
                    // Already full and this record does not beat the current minimum.
                    return;
                }
                List<String> bucket = byScore.get(score);
                if (bucket == null) {
                    bucket = new ArrayList<String>();
                    byScore.put(score, bucket);
                }
                bucket.add(record);
                size++;

                if (size > limit) {
                    // Always evict from the lowest-valued bucket, newest entry first.
                    Long lowest = byScore.firstKey();
                    List<String> lowestBucket = byScore.get(lowest);
                    lowestBucket.remove(lowestBucket.size() - 1);
                    if (lowestBucket.isEmpty()) {
                        byScore.remove(lowest);
                    }
                    size--;
                }
            }

            /** Returns the retained records ordered by ascending value. */
            List<String> ascending() {
                List<String> records = new ArrayList<String>(size);
                for (List<String> bucket : byScore.values()) {
                    records.addAll(bucket);
                }
                return records;
            }
        }

        /**
         * Collects the top N records of one input split and emits them under a single
         * {@link NullWritable} key so that they all reach the same reduce call.
         */
        public static class TopnMapper extends
                Mapper<LongWritable, Text, NullWritable, Text> {

            // Per-split candidates; only a handful of records are held in memory at any time.
            private TopRecords topN;

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                topN = new TopRecords(readLimit(context.getConfiguration()));
            }

            /**
             * Offers one input line to the per-split top N. Nothing is written here: the result
             * is only known once the whole split has been seen, so output happens in
             * {@link #cleanup}. Lines whose value cannot be parsed are counted and skipped
             * instead of failing the task.
             */
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String record = value.toString();
                long score;
                try {
                    score = parseScore(record);
                } catch (NumberFormatException ignored) {
                    // Blank or malformed line: surface it through a job counter and move on.
                    context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                    return;
                }
                topN.offer(score, record);
            }

            /** Emits the split's top N records, in ascending order of value. */
            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                Text mapValue = new Text();
                for (String record : topN.ascending()) {
                    mapValue.set(record);
                    context.write(NullWritable.get(), mapValue);
                }
            }
        }

        /**
         * Merges the per-split candidates into the global top N and writes each surviving
         * record as an output key.
         */
        public static class TopnReducer extends
                Reducer<NullWritable, Text, Text, NullWritable> {

            private final Text reduceKey = new Text();

            /**
             * Selects the top N among all candidate records and writes them in ascending order
             * of value.
             *
             * @throws NumberFormatException if a candidate's value cannot be parsed; the mapper
             *         only emits parseable records, so this indicates a mismatched mapper
             */
            @Override
            protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                // Local state so nothing carries over between reduce invocations.
                TopRecords topN = new TopRecords(readLimit(context.getConfiguration()));
                for (Text value : values) {
                    String record = value.toString();
                    topN.offer(parseScore(record), record);
                }

                for (String record : topN.ascending()) {
                    reduceKey.set(record);
                    context.write(reduceKey, NullWritable.get());
                }
            }
        }

        /**
         * Configures and runs the job, exiting with status 1 if it fails.
         *
         * @param args optional overrides: {@code args[0]} is the input path (default
         *        {@code /topn.txt}) and {@code args[1]} the output path (default {@code /AASD})
         * @throws IOException if the job cannot be set up or submitted
         * @throws InterruptedException if waiting for the job is interrupted
         * @throws ClassNotFoundException if a job class cannot be resolved
         */
        public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
            // Local-development default; an externally supplied -Dhadoop.home.dir takes precedence.
            if (System.getProperty("hadoop.home.dir") == null) {
                System.setProperty("hadoop.home.dir",
                        "E:\\hadoop-2.6.0-cdh5.15.0");
            }

            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://node1:8020");

            Job job = Job.getInstance(conf);
            job.setJobName("优化后的topn");
            job.setJarByClass(TopnRunner.class);

            Path inputPath = new Path(args.length > 0 ? args[0] : "/topn.txt");
            FileInputFormat.addInputPath(job, inputPath);

            job.setMapperClass(TopnMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);

            job.setReducerClass(TopnReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // A global top N is only correct when every candidate meets in one reducer.
            job.setNumReduceTasks(1);

            Path outputPath = new Path(args.length > 1 ? args[1] : "/AASD");
            FileOutputFormat.setOutputPath(job, outputPath);

            // Propagate job failure to the caller's shell instead of always exiting 0.
            if (!job.waitForCompletion(true)) {
                System.exit(1);
            }
        }
    }
    
    
    
    

    相关文章

      网友评论

          本文标题:2020-05-24

          本文链接:https://www.haomeiwen.com/subject/cpvdahtx.html