Counting Mobile User Traffic Logs

Author: Sophie12138 | Published 2019-06-25 16:01
    Contents
    Preface
    Word Count
    Counting Mobile User Traffic Logs
    Coming soon...

    Requirements Analysis

    We need to aggregate mobile user traffic logs. Each log line records a phone number, its upstream traffic, and its downstream traffic, tab-separated (see the flowdata.log sample in the Run section below).

    For each user, we must accumulate the upstream and downstream traffic and compute the total.

    For example, 13897230503 appears in two records; accumulating them gives:

    Phone number    Upstream    Downstream    Total
    13897230503     500         1600          2100

    Implementation Approach

    • map

    The mapper receives one line of the log at a time; the key is the byte offset of the line in the file, and the value is the line's content.

    On output, the key should be the phone number, and the value should be a single object holding the upstream traffic, downstream traffic, and total traffic.

    The phone number fits the built-in Text type, but the composite value cannot be expressed with a basic type, so we define a custom bean object and make it serializable by implementing Writable.

    key: 13897230503

    value: < upFlow:100, dFlow:300, sumFlow:400 >

    • reduce

    The reducer receives a phone-number key together with the collection of bean objects emitted for that phone number.

    For example:

    key:

    13897230503

    value:

    < upFlow:400, dFlow:1300, sumFlow:1700 >,

    < upFlow:100, dFlow:300, sumFlow:400 >

    Iterate over the bean collection, accumulating each field to build a new bean, for example:

    < upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >

    The final output:

    key: 13897230503

    value: < upFlow:500, dFlow:1600, sumFlow:2100 >

    Code in Practice

    • Project structure


    • pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>hadoop</groupId>
      <artifactId>hadoop</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <name>hadoop</name>
      <!-- FIXME change it to the project's website -->
      <url>http://www.example.com</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>commons-beanutils</groupId>
          <artifactId>commons-beanutils</artifactId>
          <version>1.9.3</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>2.7.3</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>2.7.3</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-common</artifactId>
          <version>2.7.3</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
          <version>2.7.3</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
      </dependencies>
    
      <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
          <plugins>
            <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
            <plugin>
              <artifactId>maven-clean-plugin</artifactId>
              <version>3.1.0</version>
            </plugin>
            <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
            <plugin>
              <artifactId>maven-resources-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.8.0</version>
            </plugin>
            <plugin>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>2.22.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-jar-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-install-plugin</artifactId>
              <version>2.5.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-deploy-plugin</artifactId>
              <version>2.8.2</version>
            </plugin>
            <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
            <plugin>
              <artifactId>maven-site-plugin</artifactId>
              <version>3.7.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-project-info-reports-plugin</artifactId>
              <version>3.0.0</version>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    </project>
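    Building the project with the standard Maven command below should produce target/hadoop-1.0-SNAPSHOT.jar (the name follows from the artifactId and version above); that is the jar used in the Run section.

      mvn clean package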
    
    • FlowBean.java
    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Composite value: upstream, downstream, and total traffic for one record.
    // Hadoop requires the no-arg constructor and Writable for (de)serialization.
    public class FlowBean implements Writable {
        private long upFlow;
        private long dFlow;
        private long sumFlow;
    
        public FlowBean() {
        }
    
        public FlowBean(long upFlow, long dFlow) {
            this.upFlow = upFlow;
            this.dFlow = dFlow;
            this.sumFlow = upFlow + dFlow;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getdFlow() {
            return dFlow;
        }
    
        public void setdFlow(long dFlow) {
            this.dFlow = dFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    
        // Serialization: write the fields in a fixed order.
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(upFlow);
            dataOutput.writeLong(dFlow);
            dataOutput.writeLong(sumFlow);
        }
    
        // Deserialization: read the fields in exactly the order they were written.
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            upFlow = dataInput.readLong();
            dFlow = dataInput.readLong();
            sumFlow = dataInput.readLong();
        }
    
        // TextOutputFormat writes this string as the output value column.
        @Override
        public String toString() {
            return upFlow + "\t" + dFlow + "\t" + sumFlow;
        }
    }
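
    Before wiring the bean into a job, it can help to sanity-check the Writable round trip locally. A minimal sketch (the FlowBeanTest class is mine, not part of the original project):

    import java.io.*;

    public class FlowBeanTest {
        public static void main(String[] args) throws IOException {
            FlowBean in = new FlowBean(100, 300);

            // Serialize via write().
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bos));

            // Deserialize via readFields().
            FlowBean out = new FlowBean();
            out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

            System.out.println(out); // expected: 100	300	400
        }
    }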
    
    • FlowCount.java
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;
    import java.util.Iterator;

    public class FlowCount {
        // In: <byte offset, log line>; out: <phone number, FlowBean>.
        static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] fields = line.split("\t");
                String phoneNbr = fields[0];
                long upFlow = Long.parseLong(fields[1]);
                long dFlow = Long.parseLong(fields[2]);
                context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
            }
        }
    
        // In: <phone number, all FlowBeans for that phone>; out: <phone number, summed FlowBean>.
        static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
            @Override
            protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                    throws IOException, InterruptedException {
                long sumUpFlow = 0;
                long sumDownFlow = 0;
                Iterator<FlowBean> iterator = values.iterator();
                while (iterator.hasNext()) {
                    FlowBean flowBean = iterator.next();
                    sumUpFlow += flowBean.getUpFlow();
                    sumDownFlow += flowBean.getdFlow();
                }
    
                FlowBean flowBean = new FlowBean(sumUpFlow, sumDownFlow);
                context.write(key, flowBean);
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(FlowCount.class);
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            // Declare the final (reducer) output types as well.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean result = job.waitForCompletion(true);
            if (!result) {
                System.out.println("Task fail!");
            }
        }
    }
    

    Run

    • Upload flowdata.log to HDFS (sample content below; a sketch of the upload commands follows it):
    13726230501     200     1100
    13396230502     300     1200
    13897230503     400     1300
    13897230503     100     300
    13597230534     500     1400
    13597230534     300     1200
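
    A minimal sketch of the upload, assuming flowdata.log is in the current directory and the paths match the run command below:

      hdfs dfs -mkdir -p /flowcount/input
      hdfs dfs -put flowdata.log /flowcount/input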
    
    • Run
      hadoop jar hadoop-1.0-SNAPSHOT.jar flowcount.FlowCount /flowcount/input /flowcount/output

    • View the results
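
    Based on the sample input, the single output file (hdfs dfs -cat /flowcount/output/part-r-00000) should contain, with tab-separated columns:

      13396230502    300    1200    1500
      13597230534    800    2600    3400
      13726230501    200    1100    1300
      13897230503    500    1600    2100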


    Extension

    Right now, number segments from different provinces are all aggregated together. Suppose each 3-digit prefix maps to a province, e.g. 137 belongs to Hebei and 138 to Henan; we want each province's numbers to land in its own output file.

    This requires two additions on top of the original code:

    • Define a custom Partitioner
    • Register the custom Partitioner in the main program (see the updated FlowCount.java below)

    • ProvincePartitioner.java
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    import java.util.HashMap;

    // Routes each record to a reduce task based on the phone number's 3-digit prefix.
    public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
        private static HashMap<String, Integer> provinceDict = new HashMap<>();
        static {
            provinceDict.put("137", 0);
            provinceDict.put("133", 1);
            provinceDict.put("138", 2);
            provinceDict.put("135", 3);
        }
        @Override
        public int getPartition(Text key, FlowBean value, int numPartitions) {
            String prefix = key.toString().substring(0, 3);
            Integer provinceId = provinceDict.get(prefix);
            // Unknown prefixes fall into the catch-all partition 4;
            // returned values must lie in [0, numReduceTasks).
            return provinceId == null ? 4 : provinceId;
        }
    }
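
    A quick, hypothetical way to sanity-check the prefix mapping (the value argument is unused by getPartition, so null suffices here):

      ProvincePartitioner p = new ProvincePartitioner();
      System.out.println(p.getPartition(new Text("13897230503"), null, 5)); // 2 (prefix 138)
      System.out.println(p.getPartition(new Text("13999230599"), null, 5)); // 4 (unknown prefix 139)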
    

    • FlowCount.java (updated to use the partitioner)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;
    import java.util.Iterator;

    public class FlowCount {
        static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] fields = line.split("\t");
                String phoneNbr = fields[0];
                long upFlow = Long.parseLong(fields[1]);
                long dFlow = Long.parseLong(fields[2]);
                context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
            }
        }
    
        static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
            @Override
            protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                    throws IOException, InterruptedException {
                long sumUpFlow = 0;
                long sumDownFlow = 0;
                Iterator<FlowBean> iterator = values.iterator();
                while (iterator.hasNext()) {
                    FlowBean flowBean = iterator.next();
                    sumUpFlow += flowBean.getUpFlow();
                    sumDownFlow += flowBean.getdFlow();
                }
    
                FlowBean flowBean = new FlowBean(sumUpFlow, sumDownFlow);
                context.write(key, flowBean);
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(FlowCount.class);
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);
    
            // Use the custom partitioner; the reduce-task count must cover
            // every partition number (0-4) that ProvincePartitioner can return.
            job.setPartitionerClass(ProvincePartitioner.class);
            job.setNumReduceTasks(5);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean result = job.waitForCompletion(true);
            if (!result) {
                System.out.println("Task fail!");
            }
        }
    }
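
    With five reduce tasks, the output directory now holds five files, part-r-00000 through part-r-00004. Given the sample input, they should contain:

      part-r-00000 (prefix 137): 13726230501    200    1100    1300
      part-r-00001 (prefix 133): 13396230502    300    1200    1500
      part-r-00002 (prefix 138): 13897230503    500    1600    2100
      part-r-00003 (prefix 135): 13597230534    800    2600    3400
      part-r-00004 (other prefixes): empty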
    
