美文网首页
序列化案例实操

序列化案例实操

作者: bullion | 来源:发表于2019-03-09 16:54 被阅读0次

    需求


    需求分析

    FlowBean

    // 1 实现 Writable 接口

    public class FlowBean implements Writable {

        private long upFlow;

        private long downFlow;

        private long sumFlow;

        //2  反序列化时,需要反射调用空参构造函数,所以必须有

        public FlowBean() {

            super();

        }

        public FlowBean(long upFlow, long downFlow) {

            super();

            this.upFlow = upFlow;

            this.downFlow = downFlow;

            this.sumFlow = upFlow + downFlow;

        }

        //3  写序列化方法

        @Override

        public void write(DataOutput out) throws IOException {

            out.writeLong(upFlow);

            out.writeLong(downFlow);

            out.writeLong(sumFlow);

        }

        //4 反序列化方法

        //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致

        @Override

        public void readFields(DataInput in) throws IOException {

            this.upFlow = in.readLong();

            this.downFlow = in.readLong();

            this.sumFlow = in.readLong();

        }

        // 6 编写toString方法,方便后续打印到文本

        @Override

        public String toString() {

            return upFlow + "\t" + downFlow + "\t" + sumFlow;

        }

        public long getUpFlow() {

            return upFlow;

        }

        public void setUpFlow(long upFlow) {

            this.upFlow = upFlow;

        }

        public long getDownFlow() {

            return downFlow;

        }

        public void setDownFlow(long downFlow) {

            this.downFlow = downFlow;

        }

        public long getSumFlow() {

            return sumFlow;

        }

        public void setSumFlow(long sumFlow) {

            this.sumFlow = sumFlow;

        }

    }

    FlowCountMapper

    public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        FlowBean v = new FlowBean();

        Text k = new Text();

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // 1 获取一行

            String line = value.toString();

            // 2 切割字段

            String[] fields = line.split("\t");

            // 3 封装对象

            // 取出手机号码

            String phoneNum = fields[1];

            // 取出上行流量和下行流量

            long upFlow = Long.parseLong(fields[fields.length - 3]);

            long downFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNum);

            v.setDownFlow(downFlow);

            v.setUpFlow(upFlow);

            // 4 写出

            context.write(k, v);

        }

    }

    FlowCountReducer

    public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override

        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

            long sum_upFlow = 0;

            long sum_downFlow = 0;

            // 1 遍历所用bean,将其中的上行流量,下行流量分别累加

            for (FlowBean flowBean : values) {

                sum_upFlow += flowBean.getUpFlow();

                sum_downFlow += flowBean.getDownFlow();

            }

            // 2 封装对象

            FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);

            // 3 写出

            context.write(key, resultBean);

        }

    }

    FlowsumDriver

    public class FlowsumDriver {

        public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

            // 输入输出路径需要根据自己电脑上实际的输入输出路径设置

            args = new String[]{"e:/input/inputflow", "e:/output1"};

            // 1 获取配置信息,或者job对象实例

            Configuration configuration = new Configuration();

            Job job = Job.getInstance(configuration);

            // 6 指定本程序的jar包所在的本地路径

            job.setJarByClass(FlowsumDriver.class);

            // 2 指定本业务job要使用的mapper/Reducer业务类

            job.setMapperClass(FlowCountMapper.class);

            job.setReducerClass(FlowCountReducer.class);

            // 3 指定mapper输出数据的kv类型

            job.setMapOutputKeyClass(Text.class);

            job.setMapOutputValueClass(FlowBean.class);

            // 4 指定最终输出的数据的kv类型

            job.setOutputKeyClass(Text.class);

            job.setOutputValueClass(FlowBean.class);

            // 5 指定job的输入原始文件所在目录

            FileInputFormat.setInputPaths(job, new Path(args[0]));

            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行

            boolean result = job.waitForCompletion(true);

            System.exit(result ? 0 : 1);

        }

    }

    相关文章

      网友评论

          本文标题:序列化案例实操

          本文链接:https://www.haomeiwen.com/subject/dvfepqtx.html