美文网首页
TopN案例

TopN案例

作者: bullion | 来源:发表于2019-03-09 17:10 被阅读0次

需求

需求分析

实现

    1)根据序列化实操案例实现

    2)修改FlowBean类

    @Override

    public int compareTo(FlowBean bean) {

        int result;

        if (this.sumFlow > bean.getSumFlow()) {

            result = -1;

        }else if (this.sumFlow < bean.getSumFlow()) {

            result = 1;

        }else {

            result = 0;

        }

        return result;

    }

    3)修改TopNMapper类

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)

    private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

@Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        ......

        // 4 向TreeMap中添加数据

        flowMap.put(kBean, v);

        // 5 限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据

        if (flowMap.size() > 10) {

            flowMap.remove(flowMap.lastKey());

        }

    }

@Override

    protected void cleanup(Context context) throws IOException, InterruptedException {

        // 6 遍历treeMap集合,输出数据

        Iterator<FlowBean> bean = flowMap.keySet().iterator();

        while (bean.hasNext()) {

            FlowBean k = bean.next();

            context.write(k, flowMap.get(k));

        }

    }

    4)修改TopNReducer类

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)

    TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

@Override

    protected void reduce(FlowBean key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

        for (Text value : values) {

            FlowBean bean = new FlowBean();

            bean.set(key.getDownFlow(), key.getUpFlow());

            // 1 向treeMap集合中添加数据

            flowMap.put(bean, new Text(value));

            // 2 限制TreeMap数据量,超过10条就删除掉流量最小的一条数据

            if (flowMap.size() > 10) {

                // flowMap.remove(flowMap.firstKey());

                flowMap.remove(flowMap.lastKey());

            }

        }

    }

@Override

    protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

        // 3 遍历集合,输出数据

        Iterator<FlowBean> it = flowMap.keySet().iterator();

        while (it.hasNext()) {

            FlowBean v = it.next();

            context.write(new Text(flowMap.get(v)), v);

        }

    }

实际代码

FlowBean

public class FlowBean implements WritableComparable<FlowBean>{

    private long upFlow;

    private long downFlow;

    private long sumFlow;

    public FlowBean() {

        super();

    }

    public FlowBean(long upFlow, long downFlow) {

        super();

        this.upFlow = upFlow;

        this.downFlow = downFlow;

    }

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeLong(upFlow);

        out.writeLong(downFlow);

        out.writeLong(sumFlow);

    }

    @Override

    public void readFields(DataInput in) throws IOException {

        upFlow = in.readLong();

        downFlow = in.readLong();

        sumFlow = in.readLong();

    }

    public long getUpFlow() {

        return upFlow;

    }

    public void setUpFlow(long upFlow) {

        this.upFlow = upFlow;

    }

    public long getDownFlow() {

        return downFlow;

    }

    public void setDownFlow(long downFlow) {

        this.downFlow = downFlow;

    }

    public long getSumFlow() {

        return sumFlow;

    }

    public void setSumFlow(long sumFlow) {

        this.sumFlow = sumFlow;

    }

    @Override

    public String toString() {

        return upFlow + "\t" + downFlow + "\t" + sumFlow;

    }

    public void set(long downFlow2, long upFlow2) {

        downFlow = downFlow2;

        upFlow = upFlow2;

        sumFlow = downFlow2 + upFlow2;

    }

    @Override

    public int compareTo(FlowBean bean) {

        int result;

        if (this.sumFlow > bean.getSumFlow()) {

            result = -1;

        }else if (this.sumFlow < bean.getSumFlow()) {

            result = 1;

        }else {

            result = 0;

        }

        return result;

    }

}

TopNMapper

public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)

    private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    private FlowBean kBean;

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        kBean = new FlowBean();

        Text v = new Text();

        // 1 获取一行

        String line = value.toString();

        // 2 切割

        String[] fields = line.split("\t");

        // 3 封装数据

        String phoneNum = fields[0];

        long upFlow = Long.parseLong(fields[1]);

        long downFlow = Long.parseLong(fields[2]);

        long sumFlow = Long.parseLong(fields[3]);

        kBean.setDownFlow(downFlow);

        kBean.setUpFlow(upFlow);

        kBean.setSumFlow(sumFlow);

        v.set(phoneNum);

        // 4 向TreeMap中添加数据

        flowMap.put(kBean, v);

        // 5 限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据

        if (flowMap.size() > 10) {

// flowMap.remove(flowMap.firstKey());

            flowMap.remove(flowMap.lastKey());

        }

    }

    @Override

    protected void cleanup(Context context) throws IOException, InterruptedException {

        // 6 遍历treeMap集合,输出数据

        Iterator<FlowBean> bean = flowMap.keySet().iterator();

        while (bean.hasNext()) {

            FlowBean k = bean.next();

            context.write(k, flowMap.get(k));

        }

    }

}

TopNReducer

public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)

    TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    @Override

    protected void reduce(FlowBean key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

        for (Text value : values) {

            FlowBean bean = new FlowBean();

            bean.set(key.getDownFlow(), key.getUpFlow());

            // 1 向treeMap集合中添加数据

            flowMap.put(bean, new Text(value));

            // 2 限制TreeMap数据量,超过10条就删除掉流量最小的一条数据

            if (flowMap.size() > 10) {

                // flowMap.remove(flowMap.firstKey());

                flowMap.remove(flowMap.lastKey());

            }

        }

    }

    @Override

    protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

        // 3 遍历集合,输出数据

        Iterator<FlowBean> it = flowMap.keySet().iterator();

        while (it.hasNext()) {

            FlowBean v = it.next();

            context.write(new Text(flowMap.get(v)), v);

        }

    }

}

TopNDriver

public class TopNDriver {

    public static void main(String[] args) throws Exception {

        args  = new String[]{"e:/output1","e:/output3"};

        // 1 获取配置信息,或者job对象实例

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路径

        job.setJarByClass(TopNDriver.class);

        // 2 指定本业务job要使用的mapper/Reducer业务类

        job.setMapperClass(TopNMapper.class);

        job.setReducerClass(TopNReducer.class);

        // 3 指定mapper输出数据的kv类型

        job.setMapOutputKeyClass(FlowBean.class);

        job.setMapOutputValueClass(Text.class);

        // 4 指定最终输出的数据的kv类型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(FlowBean.class);

        // 5 指定job的输入原始文件所在目录

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

相关文章

  • TopN案例

    需求 需求分析 实现 1)根据序列化实操案例实现 2)修改FlowBean类 @Override publi...

  • Spark系列 - 实时数仓之top3热门广告实战(二)

      在之前的文章中我们使用 Flink 也实现过 topn 的案例;这里,为了温习 Spark 如何访问 kafk...

  • 2020-12-16-Spark-24(复习提纲)

    1.使用工具类时的多线程问题,多线程问题2.topN案例 组内排序案例的性能分析3.开窗案例再写一遍4.spark...

  • 2020-12.4--Spark-12(Spark-Core)

    distinct算子 数据写入mysql topN案例的性能分析 1.spark基本概念的复习 RDD:是一个...

  • 推荐系统常用评价指标

    评分预测 TopN推荐

  • 2020-11-27-Spark-6(Spark-Core)

    spark练习题处理数据上的分组和业务需求上的分组 1.案例topN(要点使用模式匹配重新分组) 2.基础练习题(...

  • 堆排序和topN算法

    堆排序和topN算法:topN算法,第一次调用topN,然后把海量数据一次和小顶堆第一个比较,如果>第一个元素,就...

  • 换个思路丨环比断点

    2019年的年末,白茶曾经写过一期《TOPN函数丨环比断点》的文章,主要是利用TOPN函数来解决实际销售挂蛋(“当...

  • TopN问题

    从1000个数中选择前50个大的数 TopN问题:堆排序思想:先从数组中选出前n个元素组成小根堆。然后遍历后续元素...

  • TopN问题

    TopN问题是Android面试中经常遇到的问题,集在一个很大的集合中找到前N大的树,如给出10亿个无序的整数集合...

网友评论

      本文标题:TopN案例

      本文链接:https://www.haomeiwen.com/subject/owqepqtx.html