美文网首页
实现流量汇总排序

实现流量汇总排序

作者: 小月半会飞 | 来源:发表于2019-01-02 14:17 被阅读0次

    要求:将手机流量汇总,并且按照总流量的使用从大到小排序

    第一步,将手机号以及流量使用情况汇总:

    FlowBean代码:
    package com.neusoft;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * Writable bean carrying the upstream, downstream and total flow for one
     * phone number. Implements {@link WritableComparable} so it can be used as
     * a MapReduce key and is sorted by total flow in DESCENDING order.
     */
    public class FlowBean implements WritableComparable<FlowBean> {
        private long upFlow;
        private long downFlow;
        private long totalFlow;

        /** No-arg constructor required by Hadoop serialization (readFields). */
        public FlowBean() {
        }

        public FlowBean(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.totalFlow = upFlow + downFlow;
        }

        @Override
        public String toString() {
            // Tab-separated so TextOutputFormat emits "up<TAB>down<TAB>total".
            return upFlow +
                    "\t" + downFlow +
                    "\t" + totalFlow;
        }

        public long getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }

        public long getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }

        public long getTotalFlow() {
            return totalFlow;
        }

        public void setTotalFlow(long totalFlow) {
            this.totalFlow = totalFlow;
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            // Field order must match readFields exactly.
            dataOutput.writeLong(totalFlow);
            dataOutput.writeLong(upFlow);
            dataOutput.writeLong(downFlow);
        }

        @Override
        public void readFields(DataInput dataInput) throws IOException {
            totalFlow = dataInput.readLong();
            upFlow = dataInput.readLong();
            downFlow = dataInput.readLong();
        }

        /**
         * Orders beans by total flow, largest first (negative means "this"
         * sorts before "o").
         * BUG FIX: the original returned {@code (int)(o.totalFlow - this.totalFlow)};
         * subtracting longs and casting to int can overflow and report the
         * wrong ordering for large values. Long.compare is overflow-safe.
         */
        @Override
        public int compareTo(FlowBean o) {
            return Long.compare(o.totalFlow, this.totalFlow);
        }
    }
    
    
    FlowMapper代码:
    package com.neusoft;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Parses one line of the raw flow log and emits &lt;phoneNo, FlowBean&gt;.
     * Expected layout: the phone number is the 2nd whitespace-separated field;
     * the up-flow and down-flow counts are the 3rd- and 2nd-from-last fields.
     */
    public class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean> {

        // Reused across map() calls to avoid allocating one Text per record
        // (standard Hadoop mapper idiom).
        private final Text phone = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split("\\s+");
            // Skip blank or malformed lines instead of failing the whole task
            // with an ArrayIndexOutOfBoundsException.
            if (words.length < 4) {
                return;
            }
            long upFlow = Long.parseLong(words[words.length - 3]);
            long downFlow = Long.parseLong(words[words.length - 2]);
            phone.set(words[1]);
            context.write(phone, new FlowBean(upFlow, downFlow));
        }
    }
    
    FlowReducer代码:
    package com.neusoft;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Sums the up/down flow of all records belonging to one phone number.
     * Input:  &lt;phoneNo, [FlowBean, FlowBean, ...]&gt;
     * Output: &lt;phoneNo, aggregated FlowBean&gt;
     */
    public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean>
    {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            for (FlowBean flowBean : values)
            {
                sumUpFlow += flowBean.getUpFlow();
                sumDownFlow += flowBean.getDownFlow();
            }

            // BUG FIX: the constructor is FlowBean(upFlow, downFlow); the
            // original passed (sumDownFlow, sumUpFlow), swapping the up/down
            // columns in every output row.
            FlowBean finalFlowBean = new FlowBean(sumUpFlow, sumDownFlow);
            context.write(key, finalFlowBean);
        }
    }
    
    FlowDriver代码:
    package com.neusoft;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Client-side driver for the flow-summation job: bundles the MR job's
     * configuration (jar, mapper/reducer, key-value types, input/output
     * paths) and submits it to YARN.
     *
     * Usage: FlowDriver &lt;input path&gt; &lt;output path&gt;
     */
    public class FlowDriver {

        public static void main(String[] args) throws Exception {
            System.setProperty("HADOOP_USER_NAME", "root");
            System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
            // BUG FIX: both args[0] (input) and args[1] (output) are used below;
            // the original guard only rejected an empty array, so running with a
            // single argument crashed with ArrayIndexOutOfBoundsException.
            if (args == null || args.length < 2) {
                System.err.println("Usage: FlowDriver <input path> <output path>");
                return;
            }

            // Remove a stale output directory so the job does not abort on startup.
            FileUtil.deleteDir(args[1]);

            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            // Locate the jar containing this driver.
            job.setJarByClass(FlowDriver.class);

            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Block until the job finishes; exit non-zero on failure.
            boolean bResult = job.waitForCompletion(true);
            System.exit(bResult ? 0 : 1);
        }
    }
    

    程序运行结果如下:

    13480253104 180 180 360
    13502468823 110349  7335    117684
    13560436666 954 1116    2070
    13560439658 5892    2034    7926
    13602846565 2910    1938    4848
    13660577991 690 6960    7650
    13719199419 0   240 240
    13726230503 24681   2481    27162
    13726238888 24681   2481    27162
    13760778710 120 120 240
    13826544101 0   264 264
    13922314466 3720    3008    6728
    13925057413 48243   11058   59301
    13926251106 0   240 240
    13926435656 1512    132 1644
    15013685858 3538    3659    7197
    15920133257 2936    3156    6092
    15989002119 180 1938    2118
    18211575961 2106    1527    3633
    18320173382 2412    9531    11943
    84138413    1432    4116    5548
    

    第二步,将汇总好的文件按流量的使用情况进行排序

    前面我们已经将bean所需要依据的属性指定好了,现在可以直接使用这个bean

    SortFlowMapper:
    package com.neusoft;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Reads one line of the summarized flow file ("phone up down total") and
     * emits &lt;FlowBean, phone&gt;, so the shuffle phase sorts records by the
     * bean's compareTo (descending total flow).
     */
    public class SortFlowMapper extends Mapper<LongWritable,Text,FlowBean,Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\\s+");
            long up = Long.parseLong(fields[1]);
            long down = Long.parseLong(fields[2]);
            // The bean becomes the key; the phone number rides along as the value.
            context.write(new FlowBean(up, down), new Text(fields[0]));
        }
    }
    
    SortFlowReducer:
    package com.neusoft;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Writes the records back as &lt;phone, FlowBean&gt; in the order produced by
     * the shuffle sort (FlowBean.compareTo: descending total flow).
     */
    public class SortFlowReducer extends Reducer<FlowBean,Text,Text,FlowBean>
    {
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // BUG FIX: phones with the SAME total flow compare equal and are
            // grouped into one reduce call. The original wrote only
            // values.iterator().next(), silently dropping the rest (e.g.
            // 13726238888 is missing from the sample sorted output). Emit
            // every phone number in the group.
            for (Text phone : values) {
                context.write(phone, key);
            }
        }
    }
    
    SortFlowDriver:
    package com.neusoft;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Client-side driver for the sort step: configures the MR job that re-reads
     * the summarized flow file and orders it by total flow, then submits it to
     * YARN.
     *
     * Usage: SortFlowDriver &lt;input path&gt; &lt;output path&gt;
     */
    public class SortFlowDriver {

        public static void main(String[] args) throws Exception {
            System.setProperty("HADOOP_USER_NAME", "root");
            System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
            // BUG FIX: both args[0] (input) and args[1] (output) are used below;
            // the original guard only rejected an empty array, so running with a
            // single argument crashed with ArrayIndexOutOfBoundsException.
            if (args == null || args.length < 2) {
                System.err.println("Usage: SortFlowDriver <input path> <output path>");
                return;
            }

            // Remove a stale output directory so the job does not abort on startup.
            FileUtil.deleteDir(args[1]);

            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            // Locate the jar containing this driver.
            job.setJarByClass(SortFlowDriver.class);

            job.setMapperClass(SortFlowMapper.class);
            job.setReducerClass(SortFlowReducer.class);

            // Map output is keyed by the bean so the framework sorts on it.
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            /*job.setNumReduceTasks(5);
            job.setPartitionerClass(FlowPartitioner.class);*/

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Block until the job finishes; exit non-zero on failure.
            boolean bResult = job.waitForCompletion(true);
            System.exit(bResult ? 0 : 1);
        }
    }
    

    结果:

    13502468823 110349  7335    117684
    13925057413 48243   11058   59301
    13726230503 24681   2481    27162
    18320173382 2412    9531    11943
    13560439658 5892    2034    7926
    13660577991 690 6960    7650
    15013685858 3538    3659    7197
    13922314466 3720    3008    6728
    15920133257 2936    3156    6092
    84138413    1432    4116    5548
    13602846565 2910    1938    4848
    18211575961 2106    1527    3633
    15989002119 180 1938    2118
    13560436666 954 1116    2070
    13926435656 1512    132 1644
    13480253104 180 180 360
    13826544101 0   264 264
    13719199419 0   240 240
    
    

    相关文章

      网友评论

          本文标题:实现流量汇总排序

          本文链接:https://www.haomeiwen.com/subject/dedzlqtx.html