美文网首页hadoop
MapReduce(二):MapReduce序列化

MapReduce(二):MapReduce序列化

作者: codeMover | 来源:发表于2021-12-08 23:32 被阅读0次

    序列化概述

    什么是序列化

    序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络。
    
    反序列化就是将字节序列(或其他数据传输协议)或是从磁盘持久化数据,转换成内存的对象。
    

    为什么要序列化

    一般来说,活的对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储或的对象,可以将或的对象发送到远程计算机。
    

    为什么不用Java序列化

    Java序列化是一个重量级序列化框架(Serizlizable),一个对象被序列化后,会发你很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。
    

    hadoop序列化优势

    • 结构紧凑,存储空间少
    • 传输快速
    • 互操作性,支持多语言使用

    自定义Bean对象实现序列化接口 WritableComparable

    企业开发中的基本类型不能满足所有需求。

    Java类型 Hadoop Writable类型
    Boolean BooleanWritable
    Byte ByteWritable
    Integer IntWritable
    Float FloatWritable
    Double DoubleWritable
    String Text
    Map MapWritable
    Array MapWritable
    Null NullWritable
    • 必须实现Writable接口
    • 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
    • 重写序列化方法
    • 重写反序列化方法
    • 反序列化的顺序和序列化的顺序完全一致
    • 要想把结果展示在文件中,需要重写toString(),可用"\t"分开,方便后续用
    • 如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为MapReduce框架中的shuffle过程要求对key必须能排序。

    案例

    统计一个文件中的上行流量、下行流量以及总流量。

    FlowBean.java

    public class FlowBean implements Writable {
    
        private long upFlow;
        private long downFlow;
        private long sumFlow;
    
        public FlowBean() {
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow() {
            this.sumFlow = this.upFlow + this.downFlow;
        }
    
        @Override
        public String toString() {
            return upFlow +
                "\t" + downFlow +
                "\t" + sumFlow;
        }
    }
    

    FlowMapper.java

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * @Description TODO
     * @Author magaowei
     * @Date 2021/11/21 11:38 下午
     * @Version 1.0
     */
    public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
        private Text outK = new Text();
        private FlowBean outV = new FlowBean();
        @Override
        protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
            // 1 获取一行
            String line = value.toString();
            // 2 切割
            String[] split = line.split(" ");
            System.out.println(split.length);
            // 3 抓取数据
            String phone = split[0];
            String upFlow = split[split.length-3];
            String downFlow = split[split.length-2];
            // 4 封装
            outK.set(phone);
            outV.setUpFlow(Long.parseLong(upFlow));
            outV.setDownFlow(Long.parseLong(downFlow));
            outV.setSumFlow();
    
            context.write(outK,outV);
        }
    }
    
    

    FlowReducer.java

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * @Description TODO
     * @Author magaowei
     * @Date 2021/11/21 11:47 下午
     * @Version 1.0
     */
    public class FlowReducer extends Reducer<Text,FlowBean, Text,FlowBean> {
        private FlowBean outV = new FlowBean();
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values,
            Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
            // 1 遍历集合类价值
            long totalUp = 0;
            long totalDown = 0;
            for (FlowBean value : values) {
                totalUp += value.getUpFlow();
                totalDown += value.getDownFlow();
            }
            // 3 封装
            outV.setUpFlow(totalUp);
            outV.setDownFlow(totalDown);
            outV.setSumFlow();
            // 4 写出
            context.write(key,outV);
    
        }
    }
    

    FlowDriver.java

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * @Description TODO
     * @Author magaowei
     * @Date 2021/11/21 11:51 下午
     * @Version 1.0
     */
    public class FlowDriver {
    
        public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
            // 1 获取job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // 2 设置jar
            job.setJarByClass(FlowDriver.class);
            // 3 关联mapper和reducer
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
            // 4 设置mapper输出的key和value类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            // 5 设置最终输出的key和value类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            // 6 设置数据的输入路径和输出路径
            FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir")+"/input/writable"));
            FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir")+"/output/writable"));
            // 7 提交job
            Boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    小结

    本节复习了前一节hadoop类型,并和Java中原有数据类型做对比。Java(Serializable)序列化时一个重量级序列化框架,不便在网络传输;而hadoop(Writable)具有紧凑、快速等优势。最后通过案例实战方式统计输入数据。

    相关文章

      网友评论

        本文标题:MapReduce(二):MapReduce序列化

        本文链接:https://www.haomeiwen.com/subject/cwpxfrtx.html