Requirements Analysis
We need to compute per-user statistics from mobile user traffic logs. A sample of the log content:
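The two lines below come from the flowdata.log used in the Run section; fields are tab-separated: phone number, upstream traffic, downstream traffic.

```
13897230503	400	1300
13897230503	100	300
```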
For each user, the upstream and downstream traffic must be accumulated and the total computed. For example, 13897230503 above has two records; summing them gives:
| Phone number | Upstream traffic | Downstream traffic | Total traffic |
| --- | --- | --- | --- |
| 13897230503 | 500 | 1600 | 2100 |
Implementation Approach
- map
Receives one line of the log: the key is the byte offset of the line within the file, and the value is the content of the line.
On output, the key should be the phone number, and the value should be a single object bundling the upstream traffic, downstream traffic, and total traffic.
The phone number is a string, so Text serves as the key type; the bundled value cannot be represented by a basic type, so we define a custom bean object for it and make it serializable. For example:
key: 13897230503
value: < upFlow:100, dFlow:300, sumFlow:400 >
- reduce
Receives a phone-number key and the collection of bean objects emitted for that phone number (the sketch after this list shows the same logic in plain Java). For example:
key:
13897230503
value:
< upFlow:400, dFlow:1300, sumFlow:1700 >,
< upFlow:100, dFlow:300, sumFlow:400 >
Iterating over the bean collection and accumulating each field yields a new bean:
< upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >
which is finally output as:
key: 13897230503
value: < upFlow:500, dFlow:1600, sumFlow:2100 >
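The code that follows implements exactly this on Hadoop, but the accumulate step can be sanity-checked locally first. A minimal plain-Java sketch (the class name AccumulateSketch and the hard-coded lines are ours, for illustration only):

```java
import java.util.HashMap;
import java.util.Map;

// Local sketch of the reduce logic: group records by phone number, then sum.
public class AccumulateSketch {
    public static void main(String[] args) {
        String[] lines = {"13897230503\t400\t1300", "13897230503\t100\t300"};
        Map<String, long[]> totals = new HashMap<>(); // phone -> {upFlow, dFlow}
        for (String line : lines) {
            String[] fields = line.split("\t");
            long[] t = totals.computeIfAbsent(fields[0], k -> new long[2]);
            t[0] += Long.parseLong(fields[1]);
            t[1] += Long.parseLong(fields[2]);
        }
        // Prints: 13897230503  500  1600  2100
        totals.forEach((phone, t) ->
                System.out.println(phone + "\t" + t[0] + "\t" + t[1] + "\t" + (t[0] + t[1])));
    }
}
```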
Code Walkthrough
- Project structure
- pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hadoop</groupId>
<artifactId>hadoop</artifactId>
<version>1.0-SNAPSHOT</version>
<name>hadoop</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
```
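Building with the standard Maven lifecycle produces the jar used in the Run section below (target/ is Maven's default output directory):

```
mvn clean package
# produces target/hadoop-1.0-SNAPSHOT.jar
```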
- FlowBean.java
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Bean carrying upstream, downstream and total traffic.
 * Implements Writable so Hadoop can serialize it between map and reduce.
 */
public class FlowBean implements Writable {

    private long upFlow;
    private long dFlow;
    private long sumFlow;

    // A no-argument constructor is required for Hadoop deserialization.
    public FlowBean() {
    }

    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(dFlow);
        dataOutput.writeLong(sumFlow);
    }

    // Fields must be read back in the same order they were written.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        dFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }
}
```
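Because write() and readFields() must agree on field order, a quick local round-trip makes mismatches easy to catch. A minimal sketch (the class FlowBeanRoundTrip is ours, not part of the project):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean(100, 300);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf));

        // Hadoop also deserializes via the no-arg constructor + readFields().
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(out); // 100  300  400 (tab-separated)
    }
}
```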
- FlowCount.java
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // One input line: phone number, upstream traffic, downstream traffic.
            String line = value.toString();
            String[] fields = line.split("\t");
            String phoneNbr = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long dFlow = Long.parseLong(fields[2]);
            context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            // Accumulate all records for this phone number.
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            for (FlowBean flowBean : values) {
                sumUpFlow += flowBean.getUpFlow();
                sumDownFlow += flowBean.getdFlow();
            }
            context.write(key, new FlowBean(sumUpFlow, sumDownFlow));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowCount.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        if (!result) {
            System.out.println("Task fail!");
        }
    }
}
```
Run
- Upload flowdata.log to HDFS. The file content (fields are tab-separated):

```
13726230501	200	1100
13396230502	300	1200
13897230503	400	1300
13897230503	100	300
13597230534	500	1400
13597230534	300	1200
```
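Assuming flowdata.log is in the current local directory, the upload can be done with the standard HDFS shell (paths match the job invocation below):

```
hdfs dfs -mkdir -p /flowcount/input
hdfs dfs -put flowdata.log /flowcount/input
```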
- Run the job:

```
hadoop jar hadoop-1.0-SNAPSHOT.jar flowcount/FlowCount /flowcount/input /flowcount/output
```
- View the results:
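The expected output follows from the sample data: with a single reducer, /flowcount/output/part-r-00000 should hold one line per phone number (phone, upstream, downstream, total, tab-separated, keys in lexicographic order), viewable with hdfs dfs -cat:

```
13396230502	300	1200	1500
13597230534	800	2600	3400
13726230501	200	1100	1300
13897230503	500	1600	2100
```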
Extension
As it stands, the job lumps the number segments of different provinces together; suppose the 137 prefix belongs to Hebei, 138 to Henan, and so on, and we want each province's numbers in a separate output file. Two additions to the original code are needed:
- Define a custom Partitioner
- Register the custom Partitioner on the job in main
- ProvincePartitioner.java

```java
import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    // Map a 3-digit phone prefix to a partition (reducer) index.
    private static HashMap<String, Integer> provinceDict = new HashMap<>();

    static {
        provinceDict.put("137", 0);
        provinceDict.put("133", 1);
        provinceDict.put("138", 2);
        provinceDict.put("135", 3);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String prefix = key.toString().substring(0, 3);
        Integer provinceId = provinceDict.get(prefix);
        // Unknown prefixes all fall into the last partition, 4.
        return provinceId == null ? 4 : provinceId;
    }
}
```
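Note that getPartition must return a value in [0, numReduceTasks): the four known prefixes map to partitions 0-3 and everything else lands in partition 4, so the job needs exactly 5 reduce tasks, which is why main below calls job.setNumReduceTasks(5). With more than one but fewer than five reduce tasks the job fails at runtime with an illegal-partition error; with more than five, the extra output files are simply empty.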
- FlowCount.java

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    // Mapper and Reducer are unchanged from the previous version.
    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String phoneNbr = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long dFlow = Long.parseLong(fields[2]);
            context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            for (FlowBean flowBean : values) {
                sumUpFlow += flowBean.getUpFlow();
                sumDownFlow += flowBean.getdFlow();
            }
            context.write(key, new FlowBean(sumUpFlow, sumDownFlow));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowCount.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // New: route keys to reducers by province prefix, one reducer per partition.
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(5);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        if (!result) {
            System.out.println("Task fail!");
        }
    }
}
```
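With the sample data, the output splits across the five files as follows (derived by applying the prefix table above; part-r-00004 receives unmatched prefixes and is empty here):

```
part-r-00000 (prefix 137): 13726230501	200	1100	1300
part-r-00001 (prefix 133): 13396230502	300	1200	1500
part-r-00002 (prefix 138): 13897230503	500	1600	2100
part-r-00003 (prefix 135): 13597230534	800	2600	3400
part-r-00004 (other):      (empty)
```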