1. Requirements
Record each user's traffic usage. The input data looks like this:
Phone number    Upstream    Downstream
13726230501 200 1100
13396230502 300 1200
13897230503 400 1300
13897230503 100 300
13597230534 500 1400
13597230534 300 1200
Aggregate the records that share the same phone number. The output format is
<key,value> = <phoneNum, <upFlowSum, downFlowSum, sumFlow>>
so we need to define a custom data type that holds <upFlowSum, downFlowSum, sumFlow>.
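For the sample data above, the job should therefore produce output like the following (phone number, then upstream, downstream, and total, as formatted by FlowBean.toString() below):
```
13396230502	300	1200	1500
13597230534	800	2600	3400
13726230501	200	1100	1300
13897230503	500	1600	2100
```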
2. Code Implementation
Define the new data type with serialization support, FlowBean.java:
```
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // A no-arg constructor is required so Hadoop can create the bean
    // by reflection when deserializing it.
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    // Serialize the fields to the output stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialize the fields in exactly the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
```
To support serialization, the class must implement the Writable interface and override the write() and readFields() methods. Note that the fields must be read back with the same types, and in the same order, as they were written.
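As a quick sanity check, here is a minimal sketch (the class name FlowBeanRoundTrip is made up for illustration) that serializes a FlowBean to a byte array and reads it back, showing why the read order must mirror the write order:
```
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean(200, 1100);

        // Write the bean the same way Hadoop would during the shuffle.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Read it back; readFields() consumes the longs in the same order.
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // prints: 200	1100	1300
    }
}
```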
Implement the MapReduce classes, FlowCount.java:
```
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    // Input key:   LongWritable (the byte offset of the line, not the phone number)
    // Input value: Text (one line of the input file)
    // Output key:  Text (the phone number)
    // Output value: FlowBean
    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String phoneNum = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);
            context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sumUpFlow = 0;
            long sumDownFlow = 0;
            // All values belonging to the same phone number arrive grouped together.
            for (FlowBean bean : values) {
                sumUpFlow += bean.getUpFlow();
                sumDownFlow += bean.getDownFlow();
            }
            context.write(key, new FlowBean(sumUpFlow, sumDownFlow));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flow count");
        job.setJarByClass(FlowCount.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
```
A note on the code: the context parameter acts as the bridge between map and reduce, which is why both the map and reduce functions take it. The reduce function receives an Iterable of values because, after the map and shuffle phases, the input becomes <key, <value1, value2, value3, ...>>, i.e. all values with the same key are grouped together, so the reducer has to iterate over every value for a given key.
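To run the job, package the two classes into a jar and submit it with the hadoop command. The jar name and the HDFS input/output paths below are placeholders; substitute your own:
```
hadoop jar flowcount.jar FlowCount /input/flowdata /output/flowcount
```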