美文网首页
实现流量汇总排序

实现流量汇总排序

作者: 小月半会飞 | 来源:发表于2019-01-02 14:17 被阅读0次

要求:将手机流量汇总,并且按照总流量的使用从大到小排序

第一步,将手机以及流量使用个情况汇总:

FlowBean代码:
package com.neusoft;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Administrator on 2018/12/29.
 */
//    通过实现WritableComparable<FlowBean>,对对象进行排序
public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long totalFlow;

    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }

    @Override
    public String toString() {
        return upFlow +
                "\t" + downFlow +
                "\t" + totalFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getTotalFlow() {
        return totalFlow;
    }

    public void setTotalFlow(long totalFlow) {
        this.totalFlow = totalFlow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(totalFlow);
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);

    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        totalFlow = dataInput.readLong();
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
    }
//    这一部分代码,是指定对象排序时,按照这一属性排序
//返回负数,表示当前值小于传过来的值,返回正数,表示当前值大于传过来的值,返回0表示两个值相等
    @Override
    public int compareTo(FlowBean o) {
        return (int)(o.totalFlow - this.totalFlow);
    }
}

FlowMapper代码:
package com.neusoft;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by Administrator on 2018/12/29.
 */
public class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        String[] words = value.toString().split("\\pP|\\s+");
        String[] words = value.toString().split("\\s+");
        String phoneNo = words[1];
        String upFlow = words[words.length - 3];
        String downFlow = words[words.length - 2];
        FlowBean flowBean = new FlowBean(Long.parseLong(upFlow), Long.parseLong(downFlow));
        context.write(new Text(phoneNo), flowBean);

    }
}
FlowReducer代码:
package com.neusoft;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Administrator on 2018/12/29.
 */
public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean>
{
//    <13560439658,List(flowbean1,flowbean2)>
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        System.out.println(key);
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        for(FlowBean flowBean : values)
        {
            sumDownFlow += flowBean.getDownFlow();
            sumUpFlow += flowBean.getUpFlow();
        }

        FlowBean finalFlowBean = new FlowBean(sumDownFlow,sumUpFlow);
        context.write(key,finalFlowBean);

    }
}
FlowDriver代码:
package com.neusoft;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 相当于一个yarn集群的客户端
 * 需要在此封装我们的mr程序的相关运行参数,指定jar包
 * 最后提交给yarn
 */
public class FlowDriver {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root") ;
        System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
        if (args == null || args.length == 0) {
            return;
        }

        FileUtil.deleteDir(args[1]);


        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        //jar
        job.setJarByClass(FlowDriver.class);


        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);


        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        boolean bResult = job.waitForCompletion(true);
        System.out.println("--------------------------------");
        System.exit(bResult ? 0 : 1);

    }
}

程序运行结果如下:

13480253104 180 180 360
13502468823 110349  7335    117684
13560436666 954 1116    2070
13560439658 5892    2034    7926
13602846565 2910    1938    4848
13660577991 690 6960    7650
13719199419 0   240 240
13726230503 24681   2481    27162
13726238888 24681   2481    27162
13760778710 120 120 240
13826544101 0   264 264
13922314466 3720    3008    6728
13925057413 48243   11058   59301
13926251106 0   240 240
13926435656 1512    132 1644
15013685858 3538    3659    7197
15920133257 2936    3156    6092
15989002119 180 1938    2118
18211575961 2106    1527    3633
18320173382 2412    9531    11943
84138413    1432    4116    5548

第二部,将汇总好的文件按流量的使用情况进行排序

前面我们已经将bean所需要依据的属性指定好了,现在可以直接使用这个bean

SortFlowMapper:
package com.neusoft;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by Administrator on 2018/12/29.
 */
public class SortFlowMapper extends Mapper<LongWritable,Text,FlowBean,Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("\\s+");
        String phoneNo = words[0];
        String upFlow = words[1];
        String downFlow = words[2];
        FlowBean flowBean = new FlowBean(Long.parseLong(upFlow), Long.parseLong(downFlow));
        context.write(flowBean,new Text(phoneNo));
    }
}
SortFlowReducer:
package com.neusoft;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Administrator on 2018/12/29.
 */
public class SortFlowReducer extends Reducer<FlowBean,Text,Text,FlowBean>
{
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//     这里reducer有自己的排序,会将输出结果按照返回值的正负来排序
        context.write(values.iterator().next(),key);
    }
}
SortFlowDriver:
package com.neusoft;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 相当于一个yarn集群的客户端
 * 需要在此封装我们的mr程序的相关运行参数,指定jar包
 * 最后提交给yarn
 */
public class SortFlowDriver {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root") ;
        System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
        if (args == null || args.length == 0) {
            return;
        }

        FileUtil.deleteDir(args[1]);


        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        //jar
        job.setJarByClass(SortFlowDriver.class);


        job.setMapperClass(SortFlowMapper.class);
        job.setReducerClass(SortFlowReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        /*job.setNumReduceTasks(5);
        job.setPartitionerClass(FlowPartitioner.class);*/

        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        boolean bResult = job.waitForCompletion(true);
        System.out.println("--------------------------------");
        System.exit(bResult ? 0 : 1);

    }
}

结果:

13502468823 110349  7335    117684
13925057413 48243   11058   59301
13726230503 24681   2481    27162
18320173382 2412    9531    11943
13560439658 5892    2034    7926
13660577991 690 6960    7650
15013685858 3538    3659    7197
13922314466 3720    3008    6728
15920133257 2936    3156    6092
84138413    1432    4116    5548
13602846565 2910    1938    4848
18211575961 2106    1527    3633
15989002119 180 1938    2118
13560436666 954 1116    2070
13926435656 1512    132 1644
13480253104 180 180 360
13826544101 0   264 264
13719199419 0   240 240

相关文章

  • 实现流量汇总排序

    要求:将手机流量汇总,并且按照总流量的使用从大到小排序 第一步,将手机以及流量使用个情况汇总: FlowBean代...

  • mapreduce实现流量汇总排序程序

    流量汇总程序开发,利用生成好的汇总过的文件接着来进行按照总流量由高到低排序。 因为maptask的最终生成文件中的...

  • mapreduce实现流量汇总排序程序

    流量汇总程序开发,利用生成好的汇总过的文件接着来进行按照总流量由高到低排序。 因为maptask的最终生成文件中的...

  • mapreduce实现流量汇总排序程序

    流量汇总程序开发,利用生成好的汇总过的文件接着来进行按照总流量由高到低排序。 因为maptask的最终生成文件中的...

  • 数据结构与算法

    常见排序算法 堆排序 算法大全 算法大汇总

  • Java 实现汇总排序

    排序在系统中经常能用到,一般可以在数据库做排序,也可以在服务端做排序。在数据库一般使用 order by 排序。而...

  • 数据处理|R-dplyr

    dplyr包实现数据的清洗处理,包括数据整合、关联、排序、筛选、汇总、分组等。 1)安装、加载dplyr包、准备数...

  • 16、分类汇总和数据有效性

    分类汇总: 按什么分类 把什么汇总 做什么运算 使用分类汇总前必须先进行排序:...

  • 简单排序

    1、选择排序 实现 2、冒泡排序 实现 3、插入排序 实现

  • 学习office——Excel 分类汇总和数据有效性

    一、分类汇总工具 1、认识分类汇总 2、使用分列汇总【先排序!】 删除分类选择【全部删除】即可 3、分类汇总的嵌套...

网友评论

      本文标题:实现流量汇总排序

      本文链接:https://www.haomeiwen.com/subject/dedzlqtx.html