美文网首页
GroupingComparator分组案例实操

GroupingComparator分组案例实操

作者: bullion | 来源:发表于2019-01-30 14:43 被阅读0次

OrderBean

public class OrderBean implements WritableComparable<OrderBean> {

    private int order_id;  //订单id

    private double price;  //价格

    public OrderBean() {

        super();

    }

    public OrderBean(int order_id, double price) {

        super();

        this.order_id = order_id;

        this.price = price;

    }

    @Override

    public int compareTo(OrderBean bean) {

        //先按照id升序排序,如果相同按照价格降序排序

        int result;

        if (order_id > bean.getOrder_id()) {

            result = 1;

        } else if (order_id < bean.getOrder_id()) {

            result = -1;

        } else {

            if (price > bean.getPrice()) {

                result = -1;

            } else if (price < bean.getPrice()) {

                result = 1;

            } else {

                result = 0;

            }

        }

        return result;

    }

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeInt(order_id);

        out.writeDouble(price);

    }

    @Override

    public void readFields(DataInput in) throws IOException {

        order_id = in.readInt();

        price = in.readDouble();

    }

    public int getOrder_id() {

        return order_id;

    }

    public void setOrder_id(int order_id) {

        this.order_id = order_id;

    }

    public double getPrice() {

        return price;

    }

    public void setPrice(double price) {

        this.price = price;

    }

    @Override

    public String toString() {

        return order_id + "\t" + price;

    }

}

OrderMapper

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{

    OrderBean k = new OrderBean();

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 获取一行

        String line = value.toString();

        // 2 切割

        String[] fields = line.split(" ");

        // 3 封装对象

        k.setOrder_id(Integer.parseInt(fields[0]));

        k.setPrice(Double.parseDouble(fields[2]));

        // 4 写出

        context.write(k, NullWritable.get());

    }

}

OrderReducer

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{

    @Override

    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        context.write(key, NullWritable.get());

    }

}

OrderDirver

public class OrderDirver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //输入输出路径需要根据自己电脑上的实际的输入输出路径设置

        args = new String[]{"e:/input/inputorder", "e:/output1"};

        // 1 获取配置信息

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        // 2 设置jar包加载路径

        job.setJarByClass(OrderDirver.class);

        // 3 加载map/reduce类

        job.setMapperClass(OrderMapper.class);

        job.setReducerClass(OrderReducer.class);

        // 4 设置map输出数据kv类型

        job.setMapOutputKeyClass(OrderBean.class);

        job.setMapOutputValueClass(NullWritable.class);

        // 5 设置最终输出数据的kv类型

        job.setOutputKeyClass(OrderBean.class);

        job.setOutputValueClass(NullWritable.class);

        // 6 设置输入数据和输出数据路径

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8 设置reduce端的分组

        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        // 7 提交job

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

OrderGroupingComparator

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {

        super(OrderBean.class, true);

    }

    @Override

    public int compare(WritableComparable a, WritableComparable b) {

        //要求只要id相同,就认为是相同的key

        OrderBean aBean = (OrderBean) a;

        OrderBean bBean = (OrderBean) b;

        int result;

        if (aBean.getOrder_id() > bBean.getOrder_id()) {

            result = 1;

        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {

            result = -1;

        } else {

            result = 0;

        }

        return result;

    }

}

PS:如果要显示top3可修改Reducer

OrderReducer

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override

    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        //循环的时候设置循环次数为3即可

        for (NullWritable nullWritable : values) {

            context.write(key, NullWritable.get());

        }

    }

}

相关文章

网友评论

      本文标题:GroupingComparator分组案例实操

      本文链接:https://www.haomeiwen.com/subject/hfccsqtx.html