美文网首页
Hadoop实现自定义对象序列化

Hadoop实现自定义对象序列化

作者: 总有人被感动 | 来源:发表于2018-08-01 10:00 被阅读0次

1.需求
记录用户的流量使用情况
电话号码 上行 下行
13726230501 200 1100
13396230502 300 1200
13897230503 400 1300
13897230503 100 300
13597230534 500 1400
13597230534 300 1200
整合号码相同的情况,输出格式为
<key,value>=<phoneNum,<upFlowSum,downFlowSum,Sum>>
所以这里需要自定义一个数据类型,保存<upFlowSum,downFlowSum,Sum>
2.代码实现
构造新的数据类型,并支持序列化,BeanFlow.java
```
public class FlowBean implements Writable {

private long upFlow;
private long downFlow;
private long sumFlow;

FlowBean(){
    
}

FlowBean(long upFlow,long downFlow){
    this.upFlow = upFlow;
    this.downFlow = downFlow;
    this.sumFlow = upFlow+downFlow;
}

public long getUpFlow() {
    return upFlow;
}

public void setUpFlow(long upFlow) {
    this.upFlow = upFlow;
}

public long getDownFlow() {
    return downFlow;
}

public void setDownFlow(long downFlow) {
    this.downFlow = downFlow;
}

public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
    
}

public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
    
}

public String toString() {
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
}

}
```
如果要支持序列化的话,需要实现Writable类,并且重写write()和readFileds()这两个方法。注意读和些的数据类型要一致,且要注意顺序。

实现MapReduce类

public class FlowCount {
    static class FlowCountMapper extends Mapper<LongWritable,Text,Text,FlowBean>{
    //参数说明:输入的key的类型为LongWritable(这里为phoneNum)
    //                 value的类型为Text
    //         输出的key为Text
    //               value为FlowBean对象
        
        protected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,FlowBean>.Context context)
                                  throws IOException,InterruptedException{
            String line = value.toString();
            
            String[] fields = line.split("\t");
            
            String phoneNum = fields[0];
            
            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);
            
            context.write(new Text(phoneNum), new FlowBean(upFlow,downFlow));
            
        }
    }
        
        static class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean>{
            
            protected void reduce(Text key,Iterable<FlowBean> values, Reducer<Text,FlowBean,Text,FlowBean>
                                .Context context) throws IOException,InterruptedException{
                long sum_upFlow = 0;
                long sum_dFlow = 0;
                
                for(FlowBean bean : values) {
                    sum_upFlow += bean.getUpFlow();
                    sum_dFlow+=bean.getDownFlow();
                }
                
                FlowBean resultBean = new FlowBean(sum_upFlow,sum_dFlow);
                context.write(key, resultBean);;
            }
        }
        
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();;
            Job job = Job.getInstance(conf,"flow count");
    
            job.setJarByClass(FlowCount.class);
    
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
    
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
            boolean res = job.waitForCompletion(true);
            System.exit(res?0:1);
            }
}

这里说明一下,context这个参数相当与是map和reduce中间的桥梁,所以不论是map还是reduce函数,都会有这个参数。reduce函数里面要传一个iterater的参数,原因是在经过map和shuffle阶段之后,输入的值会变成<Ukey,<vaue1,value2,value3...>>,即相同的key的value被整合到了一起,所以处理的时候要遍历相同key的所有value。

相关文章

  • Hadoop实现自定义对象序列化

    1.需求记录用户的流量使用情况电话号码 上行 下行13726230501 200 11001...

  • Hadoop序列化-自定义Bean对象

    自定义bean对象实现序列化接口(Writable)1.在企业开发中往往常用的基本序列化类型不能满足所有需求,比如...

  • NSCoding协议

    什么叫对象序列化? 对象序列化就是把对象写入到输出流中,用来存储或者传输,如果不序列化,(自定义对象是无法存储的)...

  • 大数据技术之MapReduce(二)

    Hadoop 序列化 2.1 序列化概述 1) 什么是序列化 序列化就是把内存中的对象,转换成字节序列(或其他数据...

  • Java常用序列化方式

    常用序列化方式 jdk自带对象序列化 对象需要实现Serializable接口 通过ObjectOutputStr...

  • hadoop序列化和反序列化

    hadoop序列化和反序列化 1 什么是序列化和反序列化 序列化就是将内存中的对象或数据,转换成字节数组,以便于存...

  • iOS应用数据储存学习-归档

    一、简单说明 归档:是指另一中形式的序列化,他是任何对象都可以实现的更常规的类型,归档可以实现把自定义的对象放在文...

  • 大话java面试-java基础-4

    1、序列化和反序列化的底层实现 序列化是指把对象转化成字节流,反序列化是指把字节流转化成对象。 实现了Serial...

  • Xstream反序列化漏洞

    1. Xtream实现序列化 XStream是Java类库,可以将对象序列化为XML格式或将XML反序列化为对象。...

  • 序列化和反序列化对象

    被序列化的对象需要实现Serializable接口例子: 序列化和反序列化操作 结果:

网友评论

      本文标题:Hadoop实现自定义对象序列化

      本文链接:https://www.haomeiwen.com/subject/zcllvftx.html