美文网首页
TopN案例

TopN案例

作者: bullion | 来源:发表于2019-03-09 17:10 被阅读0次

    需求

    需求分析

    实现

        1)根据序列化实操案例实现

        2)修改FlowBean类

        @Override

        public int compareTo(FlowBean bean) {

            int result;

            if (this.sumFlow > bean.getSumFlow()) {

                result = -1;

            }else if (this.sumFlow < bean.getSumFlow()) {

                result = 1;

            }else {

                result = 0;

            }

            return result;

        }

        3)修改TopNMapper类

        // 定义一个TreeMap作为存储数据的容器(天然按key排序)

        private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            ......

            // 4 向TreeMap中添加数据

            flowMap.put(kBean, v);

            // 5 限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据

            if (flowMap.size() > 10) {

                flowMap.remove(flowMap.lastKey());

            }

        }

    @Override

        protected void cleanup(Context context) throws IOException, InterruptedException {

            // 6 遍历treeMap集合,输出数据

            Iterator<FlowBean> bean = flowMap.keySet().iterator();

            while (bean.hasNext()) {

                FlowBean k = bean.next();

                context.write(k, flowMap.get(k));

            }

        }

        4)修改TopNReducer类

        // 定义一个TreeMap作为存储数据的容器(天然按key排序)

        TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    @Override

        protected void reduce(FlowBean key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

            for (Text value : values) {

                FlowBean bean = new FlowBean();

                bean.set(key.getDownFlow(), key.getUpFlow());

                // 1 向treeMap集合中添加数据

                flowMap.put(bean, new Text(value));

                // 2 限制TreeMap数据量,超过10条就删除掉流量最小的一条数据

                if (flowMap.size() > 10) {

                    // flowMap.remove(flowMap.firstKey());

                    flowMap.remove(flowMap.lastKey());

                }

            }

        }

    @Override

        protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

            // 3 遍历集合,输出数据

            Iterator<FlowBean> it = flowMap.keySet().iterator();

            while (it.hasNext()) {

                FlowBean v = it.next();

                context.write(new Text(flowMap.get(v)), v);

            }

        }

    实际代码

    FlowBean

    public class FlowBean implements WritableComparable<FlowBean>{

        private long upFlow;

        private long downFlow;

        private long sumFlow;

        public FlowBean() {

            super();

        }

        public FlowBean(long upFlow, long downFlow) {

            super();

            this.upFlow = upFlow;

            this.downFlow = downFlow;

        }

        @Override

        public void write(DataOutput out) throws IOException {

            out.writeLong(upFlow);

            out.writeLong(downFlow);

            out.writeLong(sumFlow);

        }

        @Override

        public void readFields(DataInput in) throws IOException {

            upFlow = in.readLong();

            downFlow = in.readLong();

            sumFlow = in.readLong();

        }

        public long getUpFlow() {

            return upFlow;

        }

        public void setUpFlow(long upFlow) {

            this.upFlow = upFlow;

        }

        public long getDownFlow() {

            return downFlow;

        }

        public void setDownFlow(long downFlow) {

            this.downFlow = downFlow;

        }

        public long getSumFlow() {

            return sumFlow;

        }

        public void setSumFlow(long sumFlow) {

            this.sumFlow = sumFlow;

        }

        @Override

        public String toString() {

            return upFlow + "\t" + downFlow + "\t" + sumFlow;

        }

        public void set(long downFlow2, long upFlow2) {

            downFlow = downFlow2;

            upFlow = upFlow2;

            sumFlow = downFlow2 + upFlow2;

        }

        @Override

        public int compareTo(FlowBean bean) {

            int result;

            if (this.sumFlow > bean.getSumFlow()) {

                result = -1;

            }else if (this.sumFlow < bean.getSumFlow()) {

                result = 1;

            }else {

                result = 0;

            }

            return result;

        }

    }

    TopNMapper

    public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        // 定义一个TreeMap作为存储数据的容器(天然按key排序)

        private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

        private FlowBean kBean;

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            kBean = new FlowBean();

            Text v = new Text();

            // 1 获取一行

            String line = value.toString();

            // 2 切割

            String[] fields = line.split("\t");

            // 3 封装数据

            String phoneNum = fields[0];

            long upFlow = Long.parseLong(fields[1]);

            long downFlow = Long.parseLong(fields[2]);

            long sumFlow = Long.parseLong(fields[3]);

            kBean.setDownFlow(downFlow);

            kBean.setUpFlow(upFlow);

            kBean.setSumFlow(sumFlow);

            v.set(phoneNum);

            // 4 向TreeMap中添加数据

            flowMap.put(kBean, v);

            // 5 限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据

            if (flowMap.size() > 10) {

    // flowMap.remove(flowMap.firstKey());

                flowMap.remove(flowMap.lastKey());

            }

        }

        @Override

        protected void cleanup(Context context) throws IOException, InterruptedException {

            // 6 遍历treeMap集合,输出数据

            Iterator<FlowBean> bean = flowMap.keySet().iterator();

            while (bean.hasNext()) {

                FlowBean k = bean.next();

                context.write(k, flowMap.get(k));

            }

        }

    }

    TopNReducer

    public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        // 定义一个TreeMap作为存储数据的容器(天然按key排序)

        TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

        @Override

        protected void reduce(FlowBean key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

            for (Text value : values) {

                FlowBean bean = new FlowBean();

                bean.set(key.getDownFlow(), key.getUpFlow());

                // 1 向treeMap集合中添加数据

                flowMap.put(bean, new Text(value));

                // 2 限制TreeMap数据量,超过10条就删除掉流量最小的一条数据

                if (flowMap.size() > 10) {

                    // flowMap.remove(flowMap.firstKey());

                    flowMap.remove(flowMap.lastKey());

                }

            }

        }

        @Override

        protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

            // 3 遍历集合,输出数据

            Iterator<FlowBean> it = flowMap.keySet().iterator();

            while (it.hasNext()) {

                FlowBean v = it.next();

                context.write(new Text(flowMap.get(v)), v);

            }

        }

    }

    TopNDriver

    public class TopNDriver {

        public static void main(String[] args) throws Exception {

            args  = new String[]{"e:/output1","e:/output3"};

            // 1 获取配置信息,或者job对象实例

            Configuration configuration = new Configuration();

            Job job = Job.getInstance(configuration);

            // 6 指定本程序的jar包所在的本地路径

            job.setJarByClass(TopNDriver.class);

            // 2 指定本业务job要使用的mapper/Reducer业务类

            job.setMapperClass(TopNMapper.class);

            job.setReducerClass(TopNReducer.class);

            // 3 指定mapper输出数据的kv类型

            job.setMapOutputKeyClass(FlowBean.class);

            job.setMapOutputValueClass(Text.class);

            // 4 指定最终输出的数据的kv类型

            job.setOutputKeyClass(Text.class);

            job.setOutputValueClass(FlowBean.class);

            // 5 指定job的输入原始文件所在目录

            FileInputFormat.setInputPaths(job, new Path(args[0]));

            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行

            boolean result = job.waitForCompletion(true);

            System.exit(result ? 0 : 1);

        }

    }

    相关文章

      网友评论

          本文标题:TopN案例

          本文链接:https://www.haomeiwen.com/subject/owqepqtx.html