hadoop(6) Serialization

Author: Tomy_Jx_Li | Published 2018-10-15 21:27

    1 Hadoop's own serialization

    Java's built-in serialization carries a lot of redundant metadata (stream headers, class descriptors, inheritance information, and so on), so Hadoop uses its own, more compact serialization mechanism.
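
    As a rough illustration of that overhead, here is a small sketch that is not from the original article (the class name SerializationSizeDemo and the sample value are made up). It compares how many bytes Java's ObjectOutputStream produces for a single long with the raw DataOutput.writeLong() that Hadoop's Writable types use:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.ObjectOutputStream;

    public class SerializationSizeDemo {
        public static void main(String[] args) throws Exception {
            // Java serialization: besides the value itself it writes a stream header
            // and a full class descriptor for java.lang.Long.
            ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(javaBytes)) {
                oos.writeObject(Long.valueOf(13800000000L));
            }

            // Writable-style serialization: exactly the 8 bytes of the long,
            // which is what DataBean.write() below does with DataOutput.writeLong().
            ByteArrayOutputStream rawBytes = new ByteArrayOutputStream();
            try (DataOutputStream dos = new DataOutputStream(rawBytes)) {
                dos.writeLong(13800000000L);
            }

            System.out.println("ObjectOutputStream: " + javaBytes.size() + " bytes"); // dozens of bytes
            System.out.println("DataOutput:         " + rawBytes.size() + " bytes");  // 8 bytes
        }
    }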

    2 Hadoop implementation

    To make a class serializable in Hadoop, just implement the interface org.apache.hadoop.io.Writable and override its two methods, write(DataOutput) and readFields(DataInput). For example:

    package com.jiyx.test.mapred.flowStatistics.bo;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * @author jiyx
     * @create 2018-10-15-19:22
     */
    public class DataBean implements Writable {
    
        private long phoneNum;
    
        private long upFlow;
    
        private long downFlow;
    
        private long totalFlow;
    
        public DataBean() {
        }
    
        public DataBean(long phoneNum, long upFlow, long downFlow) {
            this.phoneNum = phoneNum;
            this.downFlow = downFlow;
            this.upFlow = upFlow;
            this.totalFlow = upFlow + downFlow;
        }
    
        /**
         * Serialization: write each field to the output stream.
         * The write order here must match the read order in readFields.
         * @param dataOutput
         * @throws IOException
         */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(phoneNum);
            dataOutput.writeLong(upFlow);
            dataOutput.writeLong(downFlow);
            dataOutput.writeLong(totalFlow);
        }
    
        /**
         * Deserialization: read the fields back in the same order they were written.
         * @param dataInput
         * @throws IOException
         */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            phoneNum = dataInput.readLong();
            upFlow = dataInput.readLong();
            downFlow = dataInput.readLong();
            totalFlow = dataInput.readLong();
        }
    
        /**
         * toString is overridden mainly so the bean is written to the output file
         * in a readable, tab-separated form.
         * @return
         */
        @Override
        public String toString() {
            return this.upFlow + "\t" + this.downFlow + "\t" + this.totalFlow;
        }
    
        public long getPhoneNum() {
            return phoneNum;
        }
    
        public void setPhoneNum(long phoneNum) {
            this.phoneNum = phoneNum;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getTotalFlow() {
            return totalFlow;
        }
    
        public void setTotalFlow(long totalFlow) {
            this.totalFlow = totalFlow;
        }
    }
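
    Before wiring DataBean into a job, it can help to check that write() and readFields() are symmetric. The following standalone round-trip sketch is not part of the original article (the class name DataBeanRoundTrip and the sample values are made up); Hadoop itself does essentially the same thing, creating the bean through its no-arg constructor and then calling readFields():

    import com.jiyx.test.mapred.flowStatistics.bo.DataBean;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class DataBeanRoundTrip {
        public static void main(String[] args) throws Exception {
            DataBean original = new DataBean(13800000000L, 100L, 200L);

            // Serialize: four longs -> 32 bytes.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.write(out);
            }
            System.out.println("serialized size = " + buffer.size() + " bytes");

            // Deserialize: no-arg constructor plus readFields(), the same way
            // the MapReduce framework reconstructs the object.
            DataBean copy = new DataBean();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            System.out.println(copy); // 100	200	300 (upFlow, downFlow, totalFlow)
        }
    }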
    
    package com.jiyx.test.mapred.flowStatistics;
    
    import com.jiyx.test.mapred.flowStatistics.bo.DataBean;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * Job
     * @author jiyx
     * @create 2018-10-15-19:21
     */
    public class FlowStatistics {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance();
    
            job.setJarByClass(FlowStatistics.class);
    
            job.setMapperClass(FlowStatisticsMapper.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
    
            job.setReducerClass(FlowStatisticsReducer.class);
        // Watch out here: I once tripped over a pitfall by swapping the key and value classes,
        // which fails with: java.io.IOException: Initialization of all the collectors failed.
        // Error in last collector was: java.lang.ClassCastException: class com.jiyx.test.mapred.flowStatistics.bo.DataBean
        // So make sure these match the Mapper/Reducer output types.
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(DataBean.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.waitForCompletion(true);
        }
    }
    
    package com.jiyx.test.mapred.flowStatistics;
    
    import com.jiyx.test.mapred.flowStatistics.bo.DataBean;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Map
     * @author jiyx
     * @create 2018-10-15-19:42
     */
    public class FlowStatisticsMapper extends Mapper<LongWritable, Text, LongWritable, DataBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The input is a tab-separated log line; in this sample data column 1 holds
            // the phone number and columns 8 and 9 the upstream and downstream traffic.
            String[] datas = value.toString().split("\t");
            long phoneNum = Long.parseLong(datas[1]);
            long upFlow = Long.parseLong(datas[8]);
            long downFlow = Long.parseLong(datas[9]);
            context.write(new LongWritable(phoneNum), new DataBean(phoneNum, upFlow, downFlow));
        }
    }
    
    package com.jiyx.test.mapred.flowStatistics;
    
    import com.jiyx.test.mapred.flowStatistics.bo.DataBean;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Reduce
     * @author jiyx
     * @create 2018-10-15-19:54
     */
    public class FlowStatisticsReducer extends Reducer<LongWritable, DataBean, LongWritable, DataBean> {
        @Override
        protected void reduce(LongWritable key, Iterable<DataBean> values, Context context) throws IOException, InterruptedException {
            // Sum the upstream and downstream traffic across all records for this phone number.
            long upFlowSum = 0;
            long downFlowSum = 0;
            for (DataBean value : values) {
                upFlowSum += value.getUpFlow();
                downFlowSum += value.getDownFlow();
            }
            context.write(key, new DataBean(key.get(), upFlowSum, downFlowSum));
        }
    }
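
    After packaging these classes into a jar, the job is submitted in the usual way, e.g. hadoop jar flow-statistics.jar com.jiyx.test.mapred.flowStatistics.FlowStatistics <input path> <output path> (the jar name here is made up). args[0] is the input directory and args[1] is the output directory, which must not exist yet, otherwise the job fails at startup.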
    
